root/jsr166/jsr166/src/main/java/util/concurrent/ThreadPoolExecutor.java
Revision: 1.56
Committed: Sat Apr 10 14:22:38 2004 UTC by dl
Branch: MAIN
Changes since 1.55: +36 -22 lines
Log Message:
Allow and cope with ThreadFactory.newThread failing or returning null
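
To make the log message concrete, here is a minimal sketch of what "coping" with a failing factory can look like from the caller's side. It is not part of this revision: the class name NullFactoryDemo, the FAILING factory, and the helper method are hypothetical, it uses lambda syntax that postdates this file, and it runs against whatever java.util.concurrent the installed JDK ships rather than revision 1.56. On recent JDKs the expectation is that execute() returns normally, no worker thread is added, and the task stays in the queue without ever running, which matches the class documentation's warning that the executor "will continue, but might not be able to execute any tasks".

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class NullFactoryDemo {
    /** Hypothetical factory that always "fails" by returning null. */
    static final ThreadFactory FAILING = r -> null;

    /**
     * Submits one task to a pool whose factory never yields a thread.
     * Returns {poolSize, queuedTasks} as observed right after execute().
     */
    static int[] submitWithFailingFactory() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(), FAILING);
        // No thread can be created, so the task can only be queued.
        pool.execute(() -> { });
        int[] observed = { pool.getPoolSize(), pool.getQueue().size() };
        pool.shutdownNow(); // discard the stranded task and release the pool
        return observed;
    }

    public static void main(String[] args) {
        int[] r = submitWithFailingFactory();
        System.out.println("poolSize=" + r[0] + " queued=" + r[1]);
    }
}
```

With a SynchronousQueue instead of an unbounded queue, the same task could not be queued either, so it would be expected to go to the RejectedExecutionHandler rather than sit stranded.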

File Contents

# User Rev Content
1 tim 1.1 /*
2 dl 1.2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.47 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 tim 1.1 */
6    
7     package java.util.concurrent;
8 dl 1.9 import java.util.concurrent.locks.*;
9 dl 1.2 import java.util.*;
10 tim 1.1
11     /**
12 dl 1.17 * An {@link ExecutorService} that executes each submitted task using
13 dl 1.28 * one of possibly several pooled threads, normally configured
14     * using {@link Executors} factory methods.
15 tim 1.1 *
16 dl 1.17 * <p>Thread pools address two different problems: they usually
17     * provide improved performance when executing large numbers of
18     * asynchronous tasks, due to reduced per-task invocation overhead,
19     * and they provide a means of bounding and managing the resources,
20     * including threads, consumed when executing a collection of tasks.
21 dl 1.20 * Each <tt>ThreadPoolExecutor</tt> also maintains some basic
22 dl 1.22 * statistics, such as the number of completed tasks.
23 dl 1.17 *
24 tim 1.1 * <p>To be useful across a wide range of contexts, this class
25 dl 1.24 * provides many adjustable parameters and extensibility
26     * hooks. However, programmers are urged to use the more convenient
27 dl 1.20 * {@link Executors} factory methods {@link
28     * Executors#newCachedThreadPool} (unbounded thread pool, with
29     * automatic thread reclamation), {@link Executors#newFixedThreadPool}
30     * (fixed size thread pool) and {@link
31     * Executors#newSingleThreadExecutor} (single background thread), that
32 dl 1.22 * preconfigure settings for the most common usage
33     * scenarios. Otherwise, use the following guide when manually
34 dl 1.24 * configuring and tuning this class:
35 dl 1.17 *
36 tim 1.1 * <dl>
37 dl 1.2 *
38 dl 1.21 * <dt>Core and maximum pool sizes</dt>
39 dl 1.2 *
40 dl 1.19 * <dd>A <tt>ThreadPoolExecutor</tt> will automatically adjust the
41 dl 1.21 * pool size
42     * (see {@link ThreadPoolExecutor#getPoolSize})
43     * according to the bounds set by corePoolSize
44     * (see {@link ThreadPoolExecutor#getCorePoolSize})
45     * and
46     * maximumPoolSize
47     * (see {@link ThreadPoolExecutor#getMaximumPoolSize}).
48     * When a new task is submitted in method {@link
49     * ThreadPoolExecutor#execute}, and fewer than corePoolSize threads
50     * are running, a new thread is created to handle the request, even if
51     * other worker threads are idle. If there are more than
52     * corePoolSize but fewer than maximumPoolSize threads running, a new
53     * thread will be created only if the queue is full. By setting
54     * corePoolSize and maximumPoolSize the same, you create a fixed-size
55     * thread pool. By setting maximumPoolSize to an essentially unbounded
56     * value such as <tt>Integer.MAX_VALUE</tt>, you allow the pool to
57 dl 1.27 * accommodate an arbitrary number of concurrent tasks. Most typically,
58 dl 1.21 * core and maximum pool sizes are set only upon construction, but they
59     * may also be changed dynamically using {@link
60     * ThreadPoolExecutor#setCorePoolSize} and {@link
61     * ThreadPoolExecutor#setMaximumPoolSize}. </dd>
62 dl 1.2 *
63 dl 1.21 * <dt>On-demand construction</dt>
64 dl 1.2 *
65 dl 1.21 * <dd> By default, even core threads are initially created and
66     * started only when needed by new tasks, but this can be overridden
67     * dynamically using method {@link
68     * ThreadPoolExecutor#prestartCoreThread} or
69     * {@link ThreadPoolExecutor#prestartAllCoreThreads}. </dd>
70 dl 1.2 *
71 tim 1.1 * <dt>Creating new threads</dt>
72 dl 1.2 *
73 dl 1.33 * <dd>New threads are created using a {@link
74     * java.util.concurrent.ThreadFactory}. If not otherwise specified, a
75 dl 1.34 * {@link Executors#defaultThreadFactory} is used, which creates threads that are
76 dl 1.33 * all in the same {@link ThreadGroup} and with the same
77     * <tt>NORM_PRIORITY</tt> priority and non-daemon status. By supplying
78     * a different ThreadFactory, you can alter the thread's name, thread
79 dl 1.56 * group, priority, daemon status, etc. If a ThreadFactory fails to create
80     * a thread when asked (i.e., if it returns <tt>null</tt> or throws
81     * a <tt>RuntimeException</tt>), the executor will continue, but might
82     * not be able to execute any tasks. </dd>
83 dl 1.2 *
84 dl 1.21 * <dt>Keep-alive times</dt>
85     *
86     * <dd>If the pool currently has more than corePoolSize threads,
87     * excess threads will be terminated if they have been idle for more
88     * than the keepAliveTime (see {@link
89     * ThreadPoolExecutor#getKeepAliveTime}). This provides a means of
90     * reducing resource consumption when the pool is not being actively
91     * used. If the pool becomes more active later, new threads will be
92     * constructed. This parameter can also be changed dynamically
93     * using method {@link ThreadPoolExecutor#setKeepAliveTime}. Using
94     * a value of <tt>Long.MAX_VALUE</tt> {@link TimeUnit#NANOSECONDS}
95     * effectively disables idle threads from ever terminating prior
96     * to shut down.
97     * </dd>
98     *
99 dl 1.48 * <dt>Queuing</dt>
100 dl 1.21 *
101     * <dd>Any {@link BlockingQueue} may be used to transfer and hold
102     * submitted tasks. The use of this queue interacts with pool sizing:
103 dl 1.2 *
104 dl 1.21 * <ul>
105     *
106 dl 1.23 * <li> If fewer than corePoolSize threads are running, the Executor
107     * always prefers adding a new thread
108 dl 1.48 * rather than queuing.</li>
109 dl 1.21 *
110 dl 1.23 * <li> If corePoolSize or more threads are running, the Executor
111     * always prefers queuing a request rather than adding a new
112     * thread.</li>
113 dl 1.21 *
114     * <li> If a request cannot be queued, a new thread is created unless
115     * this would exceed maximumPoolSize, in which case, the task will be
116     * rejected.</li>
117     *
118     * </ul>
119     *
120     * There are three general strategies for queuing:
121     * <ol>
122     *
123     * <li> <em> Direct handoffs.</em> A good default choice for a work
124     * queue is a {@link SynchronousQueue} that hands off tasks to threads
125     * without otherwise holding them. Here, an attempt to queue a task
126     * will fail if no threads are immediately available to run it, so a
127     * new thread will be constructed. This policy avoids lockups when
128     * handling sets of requests that might have internal dependencies.
129     * Direct handoffs generally require unbounded maximumPoolSizes to
130 dl 1.24 * avoid rejection of newly submitted tasks. This in turn admits the
131 dl 1.21 * possibility of unbounded thread growth when commands continue to
132     * arrive on average faster than they can be processed. </li>
133     *
134     * <li><em> Unbounded queues.</em> Using an unbounded queue (for
135     * example a {@link LinkedBlockingQueue} without a predefined
136     * capacity) will cause new tasks to be queued in cases where all
137 dl 1.22 * corePoolSize threads are busy. Thus, no more than corePoolSize
138     * threads will ever be created. (And the value of the maximumPoolSize
139     * therefore doesn't have any effect.) This may be appropriate when
140     * each task is completely independent of others, so tasks cannot
141     * affect each other's execution; for example, in a web page server.
142     * While this style of queuing can be useful in smoothing out
143     * transient bursts of requests, it admits the possibility of
144     * unbounded work queue growth when commands continue to arrive on
145     * average faster than they can be processed. </li>
146 dl 1.21 *
147     * <li><em>Bounded queues.</em> A bounded queue (for example, an
148     * {@link ArrayBlockingQueue}) helps prevent resource exhaustion when
149     * used with finite maximumPoolSizes, but can be more difficult to
150     * tune and control. Queue sizes and maximum pool sizes may be traded
151     * off for each other: Using large queues and small pools minimizes
152     * CPU usage, OS resources, and context-switching overhead, but can
153 dl 1.27 * lead to artificially low throughput. If tasks frequently block (for
154 dl 1.21 * example if they are I/O bound), a system may be able to schedule
155     * time for more threads than you otherwise allow. Use of small queues
156 dl 1.24 * generally requires larger pool sizes, which keeps CPUs busier but
157     * may encounter unacceptable scheduling overhead, which also
158     * decreases throughput. </li>
159 dl 1.21 *
160     * </ol>
161     *
162     * </dd>
163     *
164     * <dt>Rejected tasks</dt>
165     *
166     * <dd> New tasks submitted in method {@link
167     * ThreadPoolExecutor#execute} will be <em>rejected</em> when the
168     * Executor has been shut down, and also when the Executor uses finite
169     * bounds for both maximum threads and work queue capacity, and is
170 dl 1.22 * saturated. In either case, the <tt>execute</tt> method invokes the
171     * {@link RejectedExecutionHandler#rejectedExecution} method of its
172     * {@link RejectedExecutionHandler}. Four predefined handler policies
173     * are provided:
174 dl 1.21 *
175     * <ol>
176     *
177     * <li> In the
178     * default {@link ThreadPoolExecutor.AbortPolicy}, the handler throws a
179     * runtime {@link RejectedExecutionException} upon rejection. </li>
180     *
181     * <li> In {@link
182     * ThreadPoolExecutor.CallerRunsPolicy}, the thread that invokes
183     * <tt>execute</tt> itself runs the task. This provides a simple
184     * feedback control mechanism that will slow down the rate at which new
185     * tasks are submitted. </li>
186     *
187     * <li> In {@link ThreadPoolExecutor.DiscardPolicy},
188     * a task that cannot be executed is simply dropped. </li>
189     *
190     * <li>In {@link
191     * ThreadPoolExecutor.DiscardOldestPolicy}, if the executor is not
192     * shut down, the task at the head of the work queue is dropped, and
193     * then execution is retried (which can fail again, causing this to be
194     * repeated). </li>
195     *
196     * </ol>
197     *
198     * It is possible to define and use other kinds of {@link
199     * RejectedExecutionHandler} classes. Doing so requires some care
200     * especially when policies are designed to work only under particular
201 dl 1.48 * capacity or queuing policies. </dd>
202 dl 1.21 *
203     * <dt>Hook methods</dt>
204     *
205 dl 1.23 * <dd>This class provides <tt>protected</tt> overridable {@link
206 dl 1.21 * ThreadPoolExecutor#beforeExecute} and {@link
207     * ThreadPoolExecutor#afterExecute} methods that are called before and
208 dl 1.19 * after execution of each task. These can be used to manipulate the
209     * execution environment, for example, reinitializing ThreadLocals,
210 dl 1.21 * gathering statistics, or adding log entries. Additionally, method
211     * {@link ThreadPoolExecutor#terminated} can be overridden to perform
212     * any special processing that needs to be done once the Executor has
213     * fully terminated.</dd>
214 dl 1.2 *
215 dl 1.21 * <dt>Queue maintenance</dt>
216 dl 1.2 *
217 dl 1.24 * <dd> Method {@link ThreadPoolExecutor#getQueue} allows access to
218     * the work queue for purposes of monitoring and debugging. Use of
219     * this method for any other purpose is strongly discouraged. Two
220     * supplied methods, {@link ThreadPoolExecutor#remove} and {@link
221     * ThreadPoolExecutor#purge} are available to assist in storage
222     * reclamation when large numbers of queued tasks become
223     * cancelled.</dd> </dl>
224 tim 1.1 *
225 dl 1.43 * <p> <b>Extension example</b>. Most extensions of this class
226     * override one or more of the protected hook methods. For example,
227     * here is a subclass that adds a simple pause/resume feature:
228     *
229     * <pre>
230     * class PausableThreadPoolExecutor extends ThreadPoolExecutor {
231     * private boolean isPaused;
232     * private ReentrantLock pauseLock = new ReentrantLock();
233     * private Condition unpaused = pauseLock.newCondition();
234     *
235     * public PausableThreadPoolExecutor(...) { super(...); }
236     *
237     * protected void beforeExecute(Thread t, Runnable r) {
238     * super.beforeExecute(t, r);
239     * pauseLock.lock();
240     * try {
241     * while (isPaused) unpaused.await();
242     * } catch(InterruptedException ie) {
243 dl 1.53 * t.interrupt();
244 dl 1.43 * } finally {
245 dl 1.53 * pauseLock.unlock();
246 dl 1.43 * }
247     * }
248     *
249     * public void pause() {
250     * pauseLock.lock();
251     * try {
252     * isPaused = true;
253     * } finally {
254 dl 1.53 * pauseLock.unlock();
255 dl 1.43 * }
256     * }
257     *
258     * public void resume() {
259     * pauseLock.lock();
260     * try {
261     * isPaused = false;
262     * unpaused.signalAll();
263     * } finally {
264 dl 1.53 * pauseLock.unlock();
265 dl 1.43 * }
266     * }
267     * }
268     * </pre>
269 tim 1.1 * @since 1.5
270 dl 1.8 * @author Doug Lea
271 tim 1.1 */
272 tim 1.38 public class ThreadPoolExecutor extends AbstractExecutorService {
273 dl 1.2 /**
274 tim 1.41 * Only used to force toArray() to produce a Runnable[].
275     */
276     private static final Runnable[] EMPTY_RUNNABLE_ARRAY = new Runnable[0];
277    
278     /**
279 dl 1.43 * Permission for checking shutdown
280     */
281     private static final RuntimePermission shutdownPerm =
282     new RuntimePermission("modifyThread");
283    
284     /**
285 dl 1.2 * Queue used for holding tasks and handing off to worker threads.
286 tim 1.10 */
287 dl 1.2 private final BlockingQueue<Runnable> workQueue;
288    
289     /**
290     * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and
291     * workers set.
292 tim 1.10 */
293 dl 1.2 private final ReentrantLock mainLock = new ReentrantLock();
294    
295     /**
296     * Wait condition to support awaitTermination
297 tim 1.10 */
298 dl 1.46 private final Condition termination = mainLock.newCondition();
299 dl 1.2
300     /**
301     * Set containing all worker threads in pool.
302 tim 1.10 */
303 dl 1.17 private final HashSet<Worker> workers = new HashSet<Worker>();
304 dl 1.2
305     /**
306 dl 1.35 * Timeout in nanoseconds for idle threads waiting for work.
307 dl 1.2 * Threads use this timeout only when there are more than
308     * corePoolSize present. Otherwise they wait forever for new work.
309 tim 1.10 */
310 dl 1.2 private volatile long keepAliveTime;
311    
312     /**
313     * Core pool size, updated only while holding mainLock,
314     * but volatile to allow concurrent readability even
315     * during updates.
316 tim 1.10 */
317 dl 1.2 private volatile int corePoolSize;
318    
319     /**
320     * Maximum pool size, updated only while holding mainLock
321     * but volatile to allow concurrent readability even
322     * during updates.
323 tim 1.10 */
324 dl 1.2 private volatile int maximumPoolSize;
325    
326     /**
327     * Current pool size, updated only while holding mainLock
328     * but volatile to allow concurrent readability even
329     * during updates.
330 tim 1.10 */
331 dl 1.2 private volatile int poolSize;
332    
333     /**
334 dl 1.16 * Lifecycle state
335 tim 1.10 */
336 dl 1.52 volatile int runState;
337 dl 1.2
338 dl 1.16 // Special values for runState
339 dl 1.8 /** Normal, not-shutdown mode */
340 dl 1.52 static final int RUNNING = 0;
341 dl 1.8 /** Controlled shutdown mode */
342 dl 1.52 static final int SHUTDOWN = 1;
343 dl 1.16 /** Immediate shutdown mode */
344 dl 1.52 static final int STOP = 2;
345 dl 1.16 /** Final state */
346 dl 1.52 static final int TERMINATED = 3;
347 dl 1.2
348     /**
349     * Handler called when saturated or shutdown in execute.
350 tim 1.10 */
351 dl 1.33 private volatile RejectedExecutionHandler handler;
352 dl 1.2
353     /**
354     * Factory for new threads.
355 tim 1.10 */
356 dl 1.33 private volatile ThreadFactory threadFactory;
357 dl 1.2
358     /**
359     * Tracks largest attained pool size.
360 tim 1.10 */
361 dl 1.2 private int largestPoolSize;
362    
363     /**
364     * Counter for completed tasks. Updated only on termination of
365     * worker threads.
366 tim 1.10 */
367 dl 1.2 private long completedTaskCount;
368 tim 1.41
369 dl 1.8 /**
370 dl 1.35 * The default rejected execution handler
371 dl 1.8 */
372 tim 1.10 private static final RejectedExecutionHandler defaultHandler =
373 dl 1.2 new AbortPolicy();
374    
375     /**
376 dl 1.17 * Invoke the rejected execution handler for the given command.
377 dl 1.13 */
378     void reject(Runnable command) {
379     handler.rejectedExecution(command, this);
380     }
381    
382 dl 1.33 /**
383 dl 1.2 * Create and return a new thread running firstTask as its first
384     * task. Call only while holding mainLock
385 dl 1.8 * @param firstTask the task the new thread should run first (or
386     * null if none)
387 dl 1.56 * @return the new thread, or null if threadFactory fails to create thread
388 dl 1.2 */
389     private Thread addThread(Runnable firstTask) {
390     Worker w = new Worker(firstTask);
391 dl 1.56 Thread t = null;
392     try {
393     t = threadFactory.newThread(w);
394     }
395     catch(RuntimeException ex) { // fall through
396     }
397     if (t != null) {
398     w.thread = t;
399     workers.add(w);
400     int nt = ++poolSize;
401     if (nt > largestPoolSize)
402     largestPoolSize = nt;
403     }
404 dl 1.2 return t;
405     }
406 dl 1.15
407 dl 1.2 /**
408     * Create and start a new thread running firstTask as its first
409 dl 1.50 * task, only if fewer than corePoolSize threads are running.
410 dl 1.8 * @param firstTask the task the new thread should run first (or
411     * null if none)
412 dl 1.2 * @return true if successful.
413     */
414 dl 1.16 private boolean addIfUnderCorePoolSize(Runnable firstTask) {
415 dl 1.2 Thread t = null;
416 dl 1.45 final ReentrantLock mainLock = this.mainLock;
417 dl 1.2 mainLock.lock();
418     try {
419 tim 1.10 if (poolSize < corePoolSize)
420 dl 1.8 t = addThread(firstTask);
421 tim 1.14 } finally {
422 dl 1.2 mainLock.unlock();
423     }
424     if (t == null)
425     return false;
426     t.start();
427     return true;
428     }
429    
430     /**
431 dl 1.50 * Create and start a new thread only if fewer than maximumPoolSize
432 dl 1.2 * threads are running. The new thread runs as its first task the
433     * next task in queue, or if there is none, the given task.
434 dl 1.8 * @param firstTask the task the new thread should run first (or
435     * null if none)
436 dl 1.2 * @return null on failure, else the first task to be run by new thread.
437     */
438 dl 1.8 private Runnable addIfUnderMaximumPoolSize(Runnable firstTask) {
439 dl 1.2 Thread t = null;
440     Runnable next = null;
441 dl 1.45 final ReentrantLock mainLock = this.mainLock;
442 dl 1.2 mainLock.lock();
443     try {
444     if (poolSize < maximumPoolSize) {
445     next = workQueue.poll();
446     if (next == null)
447 dl 1.8 next = firstTask;
448 dl 1.2 t = addThread(next);
449     }
450 tim 1.14 } finally {
451 dl 1.2 mainLock.unlock();
452     }
453     if (t == null)
454     return null;
455     t.start();
456     return next;
457     }
458    
459    
460     /**
461     * Get the next task for a worker thread to run.
462 dl 1.8 * @return the task
463     * @throws InterruptedException if interrupted while waiting for task
464 dl 1.2 */
465 dl 1.52 Runnable getTask() throws InterruptedException {
466 dl 1.2 for (;;) {
467 dl 1.16 switch(runState) {
468     case RUNNING: {
469     if (poolSize <= corePoolSize) // untimed wait if core
470     return workQueue.take();
471    
472     long timeout = keepAliveTime;
473     if (timeout <= 0) // die immediately for 0 timeout
474     return null;
475     Runnable r = workQueue.poll(timeout, TimeUnit.NANOSECONDS);
476     if (r != null)
477     return r;
478     if (poolSize > corePoolSize) // timed out
479     return null;
480     // else, after timeout, pool shrank so shouldn't die, so retry
481     break;
482     }
483    
484     case SHUTDOWN: {
485     // Help drain queue
486     Runnable r = workQueue.poll();
487     if (r != null)
488     return r;
489    
490     // Check if can terminate
491     if (workQueue.isEmpty()) {
492     interruptIdleWorkers();
493     return null;
494     }
495    
496     // There could still be delayed tasks in queue.
497     // Wait for one, re-checking state upon interruption
498     try {
499     return workQueue.take();
500 dl 1.50 } catch(InterruptedException ignore) {}
501 dl 1.16 break;
502     }
503    
504     case STOP:
505 dl 1.2 return null;
506 dl 1.16 default:
507     assert false;
508     }
509     }
510     }
511    
512     /**
513     * Wake up all threads that might be waiting for tasks.
514     */
515     void interruptIdleWorkers() {
516 dl 1.45 final ReentrantLock mainLock = this.mainLock;
517 dl 1.16 mainLock.lock();
518     try {
519 tim 1.39 for (Worker w : workers)
520     w.interruptIfIdle();
521 dl 1.16 } finally {
522     mainLock.unlock();
523 dl 1.2 }
524     }
525    
526     /**
527     * Perform bookkeeping for a terminated worker thread.
528 tim 1.10 * @param w the worker
529 dl 1.2 */
530 dl 1.52 void workerDone(Worker w) {
531 dl 1.45 final ReentrantLock mainLock = this.mainLock;
532 dl 1.2 mainLock.lock();
533     try {
534     completedTaskCount += w.completedTasks;
535     workers.remove(w);
536 tim 1.10 if (--poolSize > 0)
537 dl 1.2 return;
538    
539 dl 1.16 // Else, this is the last thread. Deal with potential shutdown.
540    
541     int state = runState;
542     assert state != TERMINATED;
543 tim 1.10
544 dl 1.16 if (state != STOP) {
545     // If there are queued tasks but no threads, create
546 dl 1.56 // replacement thread. We must create it initially
547     // idle to avoid orphaned tasks in case addThread
548     // fails. This also handles case of delayed tasks
549     // that will sometime later become runnable.
550 dl 1.16 if (!workQueue.isEmpty()) {
551 dl 1.56 Thread t = addThread(null);
552     if (t != null)
553     t.start();
554 dl 1.16 return;
555     }
556    
557     // Otherwise, we can exit without replacement
558     if (state == RUNNING)
559     return;
560 dl 1.2 }
561    
562 dl 1.16 // Either state is STOP, or state is SHUTDOWN and there is
563     // no work to do. So we can terminate.
564 dl 1.45 termination.signalAll();
565 dl 1.16 runState = TERMINATED;
566     // fall through to call terminated() outside of lock.
567 tim 1.14 } finally {
568 dl 1.2 mainLock.unlock();
569     }
570    
571 dl 1.16 assert runState == TERMINATED;
572     terminated();
573 dl 1.2 }
574    
575     /**
576 tim 1.10 * Worker threads
577 dl 1.2 */
578     private class Worker implements Runnable {
579    
580     /**
581     * The runLock is acquired and released surrounding each task
582     * execution. It mainly protects against interrupts that are
583     * intended to cancel the worker thread from instead
584     * interrupting the task being run.
585     */
586     private final ReentrantLock runLock = new ReentrantLock();
587    
588     /**
589     * Initial task to run before entering run loop
590     */
591     private Runnable firstTask;
592    
593     /**
594     * Per thread completed task counter; accumulated
595     * into completedTaskCount upon termination.
596     */
597     volatile long completedTasks;
598    
599     /**
600     * Thread this worker is running in. Acts as a final field,
601     * but cannot be set until thread is created.
602     */
603     Thread thread;
604    
605     Worker(Runnable firstTask) {
606     this.firstTask = firstTask;
607     }
608    
609     boolean isActive() {
610     return runLock.isLocked();
611     }
612    
613     /**
614     * Interrupt thread if not running a task
615 tim 1.10 */
616 dl 1.2 void interruptIfIdle() {
617 dl 1.45 final ReentrantLock runLock = this.runLock;
618 dl 1.2 if (runLock.tryLock()) {
619     try {
620     thread.interrupt();
621 tim 1.14 } finally {
622 dl 1.2 runLock.unlock();
623     }
624     }
625     }
626    
627     /**
628     * Cause thread to die even if running a task.
629 tim 1.10 */
630 dl 1.2 void interruptNow() {
631     thread.interrupt();
632     }
633    
634     /**
635     * Run a single task between before/after methods.
636     */
637     private void runTask(Runnable task) {
638 dl 1.45 final ReentrantLock runLock = this.runLock;
639 dl 1.2 runLock.lock();
640     try {
641     // Abort now if immediate cancel. Otherwise, we have
642     // committed to run this task.
643 dl 1.16 if (runState == STOP)
644 dl 1.2 return;
645    
646     Thread.interrupted(); // clear interrupt status on entry
647     boolean ran = false;
648     beforeExecute(thread, task);
649     try {
650     task.run();
651     ran = true;
652     afterExecute(task, null);
653     ++completedTasks;
654 tim 1.14 } catch(RuntimeException ex) {
655 dl 1.2 if (!ran)
656     afterExecute(task, ex);
657 dl 1.17 // Else the exception occurred within
658 dl 1.2 // afterExecute itself in which case we don't
659     // want to call it again.
660     throw ex;
661     }
662 tim 1.14 } finally {
663 dl 1.2 runLock.unlock();
664     }
665     }
666    
667     /**
668     * Main run loop
669     */
670     public void run() {
671     try {
672 dl 1.50 Runnable task = firstTask;
673     firstTask = null;
674     while (task != null || (task = getTask()) != null) {
675 dl 1.2 runTask(task);
676     task = null; // unnecessary but can help GC
677     }
678 tim 1.14 } catch(InterruptedException ie) {
679 dl 1.2 // fall through
680 tim 1.14 } finally {
681 dl 1.2 workerDone(this);
682     }
683     }
684     }
685 tim 1.1
686 dl 1.17 // Public methods
687    
688 tim 1.1 /**
689 dl 1.17 * Creates a new <tt>ThreadPoolExecutor</tt> with the given
690 dl 1.34 * initial parameters and default thread factory and handler. It
691     * may be more convenient to use one of the {@link Executors}
692     * factory methods instead of this general purpose constructor.
693 tim 1.1 *
694 dl 1.2 * @param corePoolSize the number of threads to keep in the
695 tim 1.1 * pool, even if they are idle.
696 dl 1.2 * @param maximumPoolSize the maximum number of threads to allow in the
697 tim 1.1 * pool.
698     * @param keepAliveTime when the number of threads is greater than
699 dl 1.2 * the core, this is the maximum time that excess idle threads
700 tim 1.1 * will wait for new tasks before terminating.
701 dl 1.2 * @param unit the time unit for the keepAliveTime
702 tim 1.1 * argument.
703 dl 1.36 * @param workQueue the queue to use for holding tasks before they
704 tim 1.1 * are executed. This queue will hold only the <tt>Runnable</tt>
705     * tasks submitted by the <tt>execute</tt> method.
706 dl 1.2 * @throws IllegalArgumentException if corePoolSize or
707     * keepAliveTime is less than zero, if maximumPoolSize is less than or
708     * equal to zero, or if corePoolSize is greater than maximumPoolSize.
709 tim 1.1 * @throws NullPointerException if <tt>workQueue</tt> is null
710     */
711 dl 1.2 public ThreadPoolExecutor(int corePoolSize,
712     int maximumPoolSize,
713 tim 1.1 long keepAliveTime,
714 dl 1.2 TimeUnit unit,
715     BlockingQueue<Runnable> workQueue) {
716 tim 1.10 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
717 dl 1.34 Executors.defaultThreadFactory(), defaultHandler);
718 dl 1.2 }
719 tim 1.1
720 dl 1.2 /**
721     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
722     * parameters.
723     *
724     * @param corePoolSize the number of threads to keep in the
725     * pool, even if they are idle.
726     * @param maximumPoolSize the maximum number of threads to allow in the
727     * pool.
728     * @param keepAliveTime when the number of threads is greater than
729     * the core, this is the maximum time that excess idle threads
730     * will wait for new tasks before terminating.
731     * @param unit the time unit for the keepAliveTime
732     * argument.
733 dl 1.36 * @param workQueue the queue to use for holding tasks before they
734 dl 1.2 * are executed. This queue will hold only the <tt>Runnable</tt>
735     * tasks submitted by the <tt>execute</tt> method.
736     * @param threadFactory the factory to use when the executor
737 tim 1.10 * creates a new thread.
738 dl 1.2 * @throws IllegalArgumentException if corePoolSize or
739     * keepAliveTime is less than zero, if maximumPoolSize is less than or
740     * equal to zero, or if corePoolSize is greater than maximumPoolSize.
741 tim 1.10 * @throws NullPointerException if <tt>workQueue</tt>
742 dl 1.2 * or <tt>threadFactory</tt> is null.
743     */
744     public ThreadPoolExecutor(int corePoolSize,
745     int maximumPoolSize,
746     long keepAliveTime,
747     TimeUnit unit,
748     BlockingQueue<Runnable> workQueue,
749     ThreadFactory threadFactory) {
750 tim 1.10 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
751 dl 1.2 threadFactory, defaultHandler);
752     }
753 tim 1.1
754 dl 1.2 /**
755     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
756     * parameters.
757     *
758     * @param corePoolSize the number of threads to keep in the
759     * pool, even if they are idle.
760     * @param maximumPoolSize the maximum number of threads to allow in the
761     * pool.
762     * @param keepAliveTime when the number of threads is greater than
763     * the core, this is the maximum time that excess idle threads
764     * will wait for new tasks before terminating.
765     * @param unit the time unit for the keepAliveTime
766     * argument.
767 dl 1.36 * @param workQueue the queue to use for holding tasks before they
768 dl 1.2 * are executed. This queue will hold only the <tt>Runnable</tt>
769     * tasks submitted by the <tt>execute</tt> method.
770     * @param handler the handler to use when execution is blocked
771     * because the thread bounds and queue capacities are reached.
772     * @throws IllegalArgumentException if corePoolSize or
773     * keepAliveTime is less than zero, if maximumPoolSize is less than or
774     * equal to zero, or if corePoolSize is greater than maximumPoolSize.
775 tim 1.10 * @throws NullPointerException if <tt>workQueue</tt>
776 dl 1.2 * or <tt>handler</tt> is null.
777     */
778     public ThreadPoolExecutor(int corePoolSize,
779     int maximumPoolSize,
780     long keepAliveTime,
781     TimeUnit unit,
782     BlockingQueue<Runnable> workQueue,
783     RejectedExecutionHandler handler) {
784 tim 1.10 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
785 dl 1.34 Executors.defaultThreadFactory(), handler);
786 dl 1.2 }
787 tim 1.1
788 dl 1.2 /**
789     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
790     * parameters.
791     *
792     * @param corePoolSize the number of threads to keep in the
793     * pool, even if they are idle.
794     * @param maximumPoolSize the maximum number of threads to allow in the
795     * pool.
796     * @param keepAliveTime when the number of threads is greater than
797     * the core, this is the maximum time that excess idle threads
798     * will wait for new tasks before terminating.
799     * @param unit the time unit for the keepAliveTime
800     * argument.
801 dl 1.36 * @param workQueue the queue to use for holding tasks before they
802 dl 1.2 * are executed. This queue will hold only the <tt>Runnable</tt>
803     * tasks submitted by the <tt>execute</tt> method.
804     * @param threadFactory the factory to use when the executor
805 tim 1.10 * creates a new thread.
806 dl 1.2 * @param handler the handler to use when execution is blocked
807     * because the thread bounds and queue capacities are reached.
808     * @throws IllegalArgumentException if corePoolSize or
809     * keepAliveTime is less than zero, or if maximumPoolSize is less than
810     * or equal to zero, or if corePoolSize is greater than maximumPoolSize.
811 tim 1.10 * @throws NullPointerException if <tt>workQueue</tt>
812 dl 1.2 * or <tt>threadFactory</tt> or <tt>handler</tt> is null.
813     */
814     public ThreadPoolExecutor(int corePoolSize,
815     int maximumPoolSize,
816     long keepAliveTime,
817     TimeUnit unit,
818     BlockingQueue<Runnable> workQueue,
819     ThreadFactory threadFactory,
820     RejectedExecutionHandler handler) {
821 tim 1.10 if (corePoolSize < 0 ||
822 dl 1.2 maximumPoolSize <= 0 ||
823 tim 1.10 maximumPoolSize < corePoolSize ||
824 dl 1.2 keepAliveTime < 0)
825     throw new IllegalArgumentException();
826     if (workQueue == null || threadFactory == null || handler == null)
827     throw new NullPointerException();
828     this.corePoolSize = corePoolSize;
829     this.maximumPoolSize = maximumPoolSize;
830     this.workQueue = workQueue;
831     this.keepAliveTime = unit.toNanos(keepAliveTime);
832     this.threadFactory = threadFactory;
833     this.handler = handler;
834 tim 1.1 }
835    
836 dl 1.2
837     /**
838     * Executes the given task sometime in the future. The task
839     * may execute in a new thread or in an existing pooled thread.
840     *
841     * If the task cannot be submitted for execution, either because this
842     * executor has been shut down or because its capacity has been reached,
843 tim 1.10 * the task is handled by the current <tt>RejectedExecutionHandler</tt>.
844 dl 1.2 *
845     * @param command the task to execute
846     * @throws RejectedExecutionException at discretion of
847 dl 1.8 * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
848     * for execution
849 dl 1.26 * @throws NullPointerException if command is null
850 dl 1.2 */
851 tim 1.10 public void execute(Runnable command) {
852 dl 1.26 if (command == null)
853     throw new NullPointerException();
854 dl 1.2 for (;;) {
855 dl 1.16 if (runState != RUNNING) {
856 dl 1.13 reject(command);
857 dl 1.2 return;
858     }
859     if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
860     return;
861     if (workQueue.offer(command))
862     return;
863     Runnable r = addIfUnderMaximumPoolSize(command);
864     if (r == command)
865     return;
866     if (r == null) {
867 dl 1.13 reject(command);
868 dl 1.2 return;
869     }
870     // else retry
871     }
872 tim 1.1 }
873 dl 1.4
874 dl 1.53 /**
875     * Initiates an orderly shutdown in which previously submitted
876     * tasks are executed, but no new tasks will be
877     * accepted. Invocation has no additional effect if already shut
878     * down.
879     * @throws SecurityException if a security manager exists and
880     * shutting down this ExecutorService may manipulate threads that
881     * the caller is not permitted to modify because it does not hold
882     * {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
883     * or the security manager's <tt>checkAccess</tt> method denies access.
884     */
885 dl 1.2 public void shutdown() {
886 dl 1.43 // Fail if caller doesn't have modifyThread permission
887 dl 1.42 SecurityManager security = System.getSecurityManager();
888     if (security != null)
889 dl 1.43 java.security.AccessController.checkPermission(shutdownPerm);
890 dl 1.42
891 dl 1.25 boolean fullyTerminated = false;
892 dl 1.45 final ReentrantLock mainLock = this.mainLock;
893 dl 1.2 mainLock.lock();
894     try {
895 dl 1.25 if (workers.size() > 0) {
896 dl 1.50 // Check if caller can modify worker threads. This
897     // might not be true even if passed above check, if
898     // the SecurityManager treats some threads specially.
899 dl 1.43 if (security != null) {
900     for (Worker w: workers)
901     security.checkAccess(w.thread);
902     }
903    
904     int state = runState;
905     if (state == RUNNING) // don't override shutdownNow
906 dl 1.25 runState = SHUTDOWN;
907 dl 1.43
908     try {
909     for (Worker w: workers)
910     w.interruptIfIdle();
911     } catch(SecurityException se) {
912 dl 1.50 // If SecurityManager allows above checks, but
913     // then unexpectedly throws exception when
914     // interrupting threads (which it ought not do),
915     // back out as cleanly as we can. Some threads may
916     // have been killed but we remain in non-shutdown
917     // state.
918 dl 1.43 runState = state;
919     throw se;
920     }
921 dl 1.25 }
922     else { // If no workers, trigger full termination now
923     fullyTerminated = true;
924     runState = TERMINATED;
925     termination.signalAll();
926     }
927 tim 1.14 } finally {
928 dl 1.2 mainLock.unlock();
929     }
930 dl 1.25 if (fullyTerminated)
931     terminated();
932 tim 1.1 }
933    
934 dl 1.16
935 dl 1.53 /**
936     * Attempts to stop all actively executing tasks, halts the
937     * processing of waiting tasks, and returns a list of the tasks that were
938     * awaiting execution.
939     *
940     * <p>This implementation cancels tasks via {@link
941     * Thread#interrupt}, so if any tasks mask or fail to respond to
942     * interrupts, they may never terminate.
943     *
944     * @return list of tasks that never commenced execution
945     * @throws SecurityException if a security manager exists and
946     * shutting down this ExecutorService may manipulate threads that
947     * the caller is not permitted to modify because it does not hold
948     * {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
949     * or the security manager's <tt>checkAccess</tt> method denies access.
950     */
951 tim 1.39 public List<Runnable> shutdownNow() {
952 dl 1.43 // Almost the same code as shutdown()
953 dl 1.42 SecurityManager security = System.getSecurityManager();
954     if (security != null)
955 dl 1.43 java.security.AccessController.checkPermission(shutdownPerm);
956    
957 dl 1.25 boolean fullyTerminated = false;
958 dl 1.45 final ReentrantLock mainLock = this.mainLock;
959 dl 1.2 mainLock.lock();
960     try {
961 dl 1.25 if (workers.size() > 0) {
962 dl 1.43 if (security != null) {
963     for (Worker w: workers)
964     security.checkAccess(w.thread);
965     }
966    
967     int state = runState;
968     if (state != TERMINATED)
969 dl 1.25 runState = STOP;
970 dl 1.43 try {
971     for (Worker w : workers)
972     w.interruptNow();
973     } catch(SecurityException se) {
974     runState = state; // back out;
975     throw se;
976     }
977 dl 1.25 }
978     else { // If no workers, trigger full termination now
979     fullyTerminated = true;
980     runState = TERMINATED;
981     termination.signalAll();
982     }
983 tim 1.14 } finally {
984 dl 1.2 mainLock.unlock();
985     }
986 dl 1.25 if (fullyTerminated)
987     terminated();
988 tim 1.41 return Arrays.asList(workQueue.toArray(EMPTY_RUNNABLE_ARRAY));
989 tim 1.1 }
990    
991 dl 1.2 public boolean isShutdown() {
992 dl 1.16 return runState != RUNNING;
993     }
994    
995     /**
996 dl 1.55 * Returns true if this executor is in the process of terminating
997 dl 1.16 * after <tt>shutdown</tt> or <tt>shutdownNow</tt> but has not
998     * completely terminated. This method may be useful for
999     * debugging. A return of <tt>true</tt> reported a sufficient
1000     * period after shutdown may indicate that submitted tasks have
1001     * ignored or suppressed interruption, causing this executor not
1002     * to properly terminate.
1003     * @return true if terminating but not yet terminated.
1004     */
1005     public boolean isTerminating() {
1006     return runState == SHUTDOWN || runState == STOP;
1007 tim 1.1 }
1008    
1009 dl 1.2 public boolean isTerminated() {
1010 dl 1.16 return runState == TERMINATED;
1011 dl 1.2 }
1012 tim 1.1
1013 dl 1.2 public boolean awaitTermination(long timeout, TimeUnit unit)
1014     throws InterruptedException {
1015 dl 1.50 long nanos = unit.toNanos(timeout);
1016 dl 1.45 final ReentrantLock mainLock = this.mainLock;
1017 dl 1.2 mainLock.lock();
1018     try {
1019 dl 1.25 for (;;) {
1020     if (runState == TERMINATED)
1021     return true;
1022     if (nanos <= 0)
1023     return false;
1024     nanos = termination.awaitNanos(nanos);
1025     }
1026 tim 1.14 } finally {
1027 dl 1.2 mainLock.unlock();
1028     }
1029 dl 1.15 }
1030    
1031     /**
1032     * Invokes <tt>shutdown</tt> when this executor is no longer
1033     * referenced.
1034     */
1035     protected void finalize() {
1036     shutdown();
1037 dl 1.2 }
1038 tim 1.10
1039 dl 1.2 /**
1040     * Sets the thread factory used to create new threads.
1041     *
1042     * @param threadFactory the new thread factory
1043 dl 1.30 * @throws NullPointerException if threadFactory is null
1044 tim 1.11 * @see #getThreadFactory
1045 dl 1.2 */
1046     public void setThreadFactory(ThreadFactory threadFactory) {
1047 dl 1.30 if (threadFactory == null)
1048     throw new NullPointerException();
1049 dl 1.2 this.threadFactory = threadFactory;
1050 tim 1.1 }
1051    
1052 dl 1.2 /**
1053     * Returns the thread factory used to create new threads.
1054     *
1055     * @return the current thread factory
1056 tim 1.11 * @see #setThreadFactory
1057 dl 1.2 */
1058     public ThreadFactory getThreadFactory() {
1059     return threadFactory;
1060 tim 1.1 }
1061    
1062 dl 1.2 /**
1063     * Sets a new handler for unexecutable tasks.
1064     *
1065     * @param handler the new handler
1066 dl 1.31 * @throws NullPointerException if handler is null
1067 tim 1.11 * @see #getRejectedExecutionHandler
1068 dl 1.2 */
1069     public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
1070 dl 1.31 if (handler == null)
1071     throw new NullPointerException();
1072 dl 1.2 this.handler = handler;
1073     }
1074 tim 1.1
1075 dl 1.2 /**
1076     * Returns the current handler for unexecutable tasks.
1077     *
1078     * @return the current handler
1079 tim 1.11 * @see #setRejectedExecutionHandler
1080 dl 1.2 */
1081     public RejectedExecutionHandler getRejectedExecutionHandler() {
1082     return handler;
1083 tim 1.1 }
1084    
1085 dl 1.2 /**
1086 dl 1.17 * Returns the task queue used by this executor. Access to the
1087     * task queue is intended primarily for debugging and monitoring.
1088 dl 1.27 * This queue may be in active use. Retrieving the task queue
1089 dl 1.2 * does not prevent queued tasks from executing.
1090     *
1091     * @return the task queue
1092     */
1093     public BlockingQueue<Runnable> getQueue() {
1094     return workQueue;
1095 tim 1.1 }
1096 dl 1.4
1097     /**
1098 dl 1.44 * Removes this task from the executor's internal queue if it is
1099     * present, thus causing it not to be run if it has not already
1100     * started.
1101     *
1102     * <p> This method may be useful as one part of a cancellation
1103     * scheme. It may fail to remove tasks that have been converted
1104     * into other forms before being placed on the internal queue. For
1105     * example, a task entered using <tt>submit</tt> might be
1106     * converted into a form that maintains <tt>Future</tt> status.
1107     * However, in such cases, method {@link ThreadPoolExecutor#purge}
1108     * may be used to remove those Futures that have been cancelled.
1109     *
1110 tim 1.10 *
1111 dl 1.8 * @param task the task to remove
1112     * @return true if the task was removed
1113 dl 1.4 */
1114 dl 1.5 public boolean remove(Runnable task) {
1115 dl 1.4 return getQueue().remove(task);
1116     }
1117    
1118 dl 1.7
1119     /**
1120 dl 1.37 * Tries to remove from the work queue all {@link Future}
1121 dl 1.16 * tasks that have been cancelled. This method can be useful as a
1122     * storage reclamation operation that has no other impact on
1123     * functionality. Cancelled tasks are never executed, but may
1124     * accumulate in work queues until worker threads can actively
1125     * remove them. Invoking this method instead tries to remove them now.
1126 dl 1.23 * However, this method may fail to remove tasks in
1127 dl 1.16 * the presence of interference by other threads.
1128 dl 1.7 */
1129     public void purge() {
1130 dl 1.16 // Fail if we encounter interference during traversal
1131     try {
1132     Iterator<Runnable> it = getQueue().iterator();
1133     while (it.hasNext()) {
1134     Runnable r = it.next();
1135 dl 1.37 if (r instanceof Future<?>) {
1136     Future<?> c = (Future<?>)r;
1137 dl 1.16 if (c.isCancelled())
1138     it.remove();
1139     }
1140 dl 1.7 }
1141     }
1142 dl 1.16 catch(ConcurrentModificationException ex) {
1143     return;
1144     }
1145 dl 1.7 }
1146 tim 1.1
1147     /**
1148 dl 1.2 * Sets the core number of threads. This overrides any value set
1149     * in the constructor. If the new value is smaller than the
1150     * current value, excess existing threads will be terminated when
1151 dl 1.34 * they next become idle. If larger, new threads will, if needed,
1152     * be started to execute any queued tasks.
1153 tim 1.1 *
1154 dl 1.2 * @param corePoolSize the new core size
1155 tim 1.10 * @throws IllegalArgumentException if <tt>corePoolSize</tt>
1156 dl 1.8 * is less than zero
1157 tim 1.11 * @see #getCorePoolSize
1158 tim 1.1 */
1159 dl 1.2 public void setCorePoolSize(int corePoolSize) {
1160     if (corePoolSize < 0)
1161     throw new IllegalArgumentException();
1162 dl 1.45 final ReentrantLock mainLock = this.mainLock;
1163 dl 1.2 mainLock.lock();
1164     try {
1165     int extra = this.corePoolSize - corePoolSize;
1166     this.corePoolSize = corePoolSize;
1167 tim 1.38 if (extra < 0) {
1168 dl 1.56 int n = workQueue.size();
1169     // We have to create initially-idle threads here
1170     // because we otherwise have no recourse about
1171     // what to do with a dequeued task if addThread fails.
1172     while (extra++ < 0 && n-- > 0 && poolSize < corePoolSize ) {
1173     Thread t = addThread(null);
1174     if (t != null)
1175     t.start();
1176     else
1177     break;
1178     }
1179 tim 1.38 }
1180     else if (extra > 0 && poolSize > corePoolSize) {
1181 dl 1.2 Iterator<Worker> it = workers.iterator();
1182 tim 1.10 while (it.hasNext() &&
1183 dl 1.34 extra-- > 0 &&
1184 dl 1.2 poolSize > corePoolSize &&
1185 dl 1.34 workQueue.remainingCapacity() == 0)
1186 dl 1.2 it.next().interruptIfIdle();
1187     }
1188 tim 1.14 } finally {
1189 dl 1.2 mainLock.unlock();
1190     }
1191     }
1192 tim 1.1
1193     /**
1194 dl 1.2 * Returns the core number of threads.
1195 tim 1.1 *
1196 dl 1.2 * @return the core number of threads
1197 tim 1.11 * @see #setCorePoolSize
1198 tim 1.1 */
1199 tim 1.10 public int getCorePoolSize() {
1200 dl 1.2 return corePoolSize;
1201 dl 1.16 }
1202    
1203     /**
1204 dl 1.55 * Starts a core thread, causing it to idly wait for work. This
1205 dl 1.16 * overrides the default policy of starting core threads only when
1206     * new tasks are executed. This method will return <tt>false</tt>
1207     * if all core threads have already been started.
1208     * @return true if a thread was started
1209     */
1210     public boolean prestartCoreThread() {
1211     return addIfUnderCorePoolSize(null);
1212     }
1213    
1214     /**
1215 dl 1.55 * Starts all core threads, causing them to idly wait for work. This
1216 dl 1.16 * overrides the default policy of starting core threads only when
1217     * new tasks are executed.
1218     * @return the number of threads started.
1219     */
1220     public int prestartAllCoreThreads() {
1221     int n = 0;
1222     while (addIfUnderCorePoolSize(null))
1223     ++n;
1224     return n;
1225 dl 1.2 }
1226 tim 1.1
1227     /**
1228     * Sets the maximum allowed number of threads. This overrides any
1229 dl 1.2 * value set in the constructor. If the new value is smaller than
1230     * the current value, excess existing threads will be
1231     * terminated when they next become idle.
1232 tim 1.1 *
1233 dl 1.2 * @param maximumPoolSize the new maximum
1234     * @throws IllegalArgumentException if maximumPoolSize is less than or
1235     * equal to zero, or less than the {@link #getCorePoolSize core pool size}
1236 tim 1.11 * @see #getMaximumPoolSize
1237 dl 1.2 */
1238     public void setMaximumPoolSize(int maximumPoolSize) {
1239     if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
1240     throw new IllegalArgumentException();
1241 dl 1.45 final ReentrantLock mainLock = this.mainLock;
1242 dl 1.2 mainLock.lock();
1243     try {
1244     int extra = this.maximumPoolSize - maximumPoolSize;
1245     this.maximumPoolSize = maximumPoolSize;
1246     if (extra > 0 && poolSize > maximumPoolSize) {
1247     Iterator<Worker> it = workers.iterator();
1248 tim 1.10 while (it.hasNext() &&
1249     extra > 0 &&
1250 dl 1.2 poolSize > maximumPoolSize) {
1251     it.next().interruptIfIdle();
1252     --extra;
1253     }
1254     }
1255 tim 1.14 } finally {
1256 dl 1.2 mainLock.unlock();
1257     }
1258     }
1259 tim 1.1
1260     /**
1261     * Returns the maximum allowed number of threads.
1262     *
1263 dl 1.2 * @return the maximum allowed number of threads
1264 tim 1.11 * @see #setMaximumPoolSize
1265 tim 1.1 */
1266 tim 1.10 public int getMaximumPoolSize() {
1267 dl 1.2 return maximumPoolSize;
1268     }
1269 tim 1.1
1270     /**
1271     * Sets the time limit for which threads may remain idle before
1272 dl 1.2 * being terminated. If there are more than the core number of
1273 tim 1.1 * threads currently in the pool, after waiting this amount of
1274     * time without processing a task, excess threads will be
1275     * terminated. This overrides any value set in the constructor.
1276     * @param time the time to wait. A time value of zero will cause
1277     * excess threads to terminate immediately after executing tasks.
1278 dl 1.2 * @param unit the time unit of the time argument
1279 dl 1.17 * @throws IllegalArgumentException if time is less than zero
1280 tim 1.11 * @see #getKeepAliveTime
1281 tim 1.1 */
1282 dl 1.2 public void setKeepAliveTime(long time, TimeUnit unit) {
1283     if (time < 0)
1284     throw new IllegalArgumentException();
1285     this.keepAliveTime = unit.toNanos(time);
1286     }
1287 tim 1.1
1288     /**
1289     * Returns the thread keep-alive time, which is the amount of time
1290 dl 1.2 * that threads in excess of the core pool size may remain
1291 tim 1.10 * idle before being terminated.
1292 tim 1.1 *
1293 dl 1.2 * @param unit the desired time unit of the result
1294 tim 1.1 * @return the time limit
1295 tim 1.11 * @see #setKeepAliveTime
1296 tim 1.1 */
1297 tim 1.10 public long getKeepAliveTime(TimeUnit unit) {
1298 dl 1.2 return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
1299     }
1300 tim 1.1
1301     /* Statistics */
1302    
1303     /**
1304     * Returns the current number of threads in the pool.
1305     *
1306     * @return the number of threads
1307     */
1308 tim 1.10 public int getPoolSize() {
1309 dl 1.2 return poolSize;
1310     }
1311 tim 1.1
1312     /**
1313 dl 1.2 * Returns the approximate number of threads that are actively
1314 tim 1.1 * executing tasks.
1315     *
1316     * @return the number of threads
1317     */
1318 tim 1.10 public int getActiveCount() {
1319 dl 1.45 final ReentrantLock mainLock = this.mainLock;
1320 dl 1.2 mainLock.lock();
1321     try {
1322     int n = 0;
1323 tim 1.39 for (Worker w : workers) {
1324     if (w.isActive())
1325 dl 1.2 ++n;
1326     }
1327     return n;
1328 tim 1.14 } finally {
1329 dl 1.2 mainLock.unlock();
1330     }
1331     }
1332 tim 1.1
1333     /**
1334 dl 1.2 * Returns the largest number of threads that have ever
1335     * simultaneously been in the pool.
1336 tim 1.1 *
1337     * @return the number of threads
1338     */
1339 tim 1.10 public int getLargestPoolSize() {
1340 dl 1.45 final ReentrantLock mainLock = this.mainLock;
1341 dl 1.2 mainLock.lock();
1342     try {
1343     return largestPoolSize;
1344 tim 1.14 } finally {
1345 dl 1.2 mainLock.unlock();
1346     }
1347     }
1348 tim 1.1
1349     /**
1350 dl 1.2 * Returns the approximate total number of tasks that have been
1351     * scheduled for execution. Because the states of tasks and
1352     * threads may change dynamically during computation, the returned
1353 dl 1.17 * value is only an approximation, but one that does not ever
1354     * decrease across successive calls.
1355 tim 1.1 *
1356     * @return the number of tasks
1357     */
1358 tim 1.10 public long getTaskCount() {
1359 dl 1.45 final ReentrantLock mainLock = this.mainLock;
1360 dl 1.2 mainLock.lock();
1361     try {
1362     long n = completedTaskCount;
1363 tim 1.39 for (Worker w : workers) {
1364 dl 1.2 n += w.completedTasks;
1365     if (w.isActive())
1366     ++n;
1367     }
1368     return n + workQueue.size();
1369 tim 1.14 } finally {
1370 dl 1.2 mainLock.unlock();
1371     }
1372     }
1373 tim 1.1
1374     /**
1375 dl 1.2 * Returns the approximate total number of tasks that have
1376     * completed execution. Because the states of tasks and threads
1377     * may change dynamically during computation, the returned value
1378 dl 1.17 * is only an approximation, but one that does not ever decrease
1379     * across successive calls.
1380 tim 1.1 *
1381     * @return the number of tasks
1382     */
1383 tim 1.10 public long getCompletedTaskCount() {
1384 dl 1.45 final ReentrantLock mainLock = this.mainLock;
1385 dl 1.2 mainLock.lock();
1386     try {
1387     long n = completedTaskCount;
1388 tim 1.39 for (Worker w : workers)
1389     n += w.completedTasks;
1390 dl 1.2 return n;
1391 tim 1.14 } finally {
1392 dl 1.2 mainLock.unlock();
1393     }
1394     }
1395 tim 1.1
1396     /**
1397 dl 1.17 * Method invoked prior to executing the given Runnable in the
1398 dl 1.43 * given thread. This method is invoked by thread <tt>t</tt> that
1399     * will execute task <tt>r</tt>, and may be used to re-initialize
1400 dl 1.17 * ThreadLocals, or to perform logging. Note: To properly nest
1401     * multiple overridings, subclasses should generally invoke
1402 dl 1.5 * <tt>super.beforeExecute</tt> at the end of this method.
1403 tim 1.1 *
1404 dl 1.2 * @param t the thread that will run task r.
1405     * @param r the task that will be executed.
1406 tim 1.1 */
1407 dl 1.2 protected void beforeExecute(Thread t, Runnable r) { }
1408 tim 1.1
1409     /**
1410 dl 1.2 * Method invoked upon completion of execution of the given
1411 dl 1.43 * Runnable. This method is invoked by the thread that executed
1412     * the task. If non-null, the Throwable is the uncaught exception
1413 dl 1.5 * that caused execution to terminate abruptly. Note: To properly
1414     * nest multiple overridings, subclasses should generally invoke
1415     * <tt>super.afterExecute</tt> at the beginning of this method.
1416 tim 1.1 *
1417 dl 1.2 * @param r the runnable that has completed.
1418 dl 1.24 * @param t the exception that caused termination, or null if
1419 dl 1.2 * execution completed normally.
1420 tim 1.1 */
1421 dl 1.2 protected void afterExecute(Runnable r, Throwable t) { }
1422 tim 1.1
1423 dl 1.2 /**
1424     * Method invoked when the Executor has terminated. Default
1425 dl 1.17 * implementation does nothing. Note: To properly nest multiple
1426     * overridings, subclasses should generally invoke
1427     * <tt>super.terminated</tt> within this method.
1428 dl 1.2 */
1429     protected void terminated() { }
1430 tim 1.1
1431     /**
1432 dl 1.21 * A handler for rejected tasks that runs the rejected task
1433     * directly in the calling thread of the <tt>execute</tt> method,
1434     * unless the executor has been shut down, in which case the task
1435     * is discarded.
1436 tim 1.1 */
1437 dl 1.2 public static class CallerRunsPolicy implements RejectedExecutionHandler {
1438 tim 1.1 /**
1439 dl 1.24 * Creates a <tt>CallerRunsPolicy</tt>.
1440 tim 1.1 */
1441     public CallerRunsPolicy() { }
1442    
1443 dl 1.24 /**
1444     * Executes task r in the caller's thread, unless the executor
1445     * has been shut down, in which case the task is discarded.
1446     * @param r the runnable task requested to be executed
1447     * @param e the executor attempting to execute this task
1448     */
1449 dl 1.2 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1450     if (!e.isShutdown()) {
1451 tim 1.1 r.run();
1452     }
1453     }
1454     }
1455    
1456     /**
1457 dl 1.21 * A handler for rejected tasks that throws a
1458 dl 1.8 * <tt>RejectedExecutionException</tt>.
1459 tim 1.1 */
1460 dl 1.2 public static class AbortPolicy implements RejectedExecutionHandler {
1461 tim 1.1 /**
1462 dl 1.29 * Creates an <tt>AbortPolicy</tt>.
1463 tim 1.1 */
1464     public AbortPolicy() { }
1465    
1466 dl 1.24 /**
1467 dl 1.54 * Always throws RejectedExecutionException.
1468 dl 1.24 * @param r the runnable task requested to be executed
1469     * @param e the executor attempting to execute this task
1470     * @throws RejectedExecutionException always.
1471     */
1472 dl 1.2 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1473     throw new RejectedExecutionException();
1474 tim 1.1 }
1475     }
1476    
1477     /**
1478 dl 1.21 * A handler for rejected tasks that silently discards the
1479     * rejected task.
1480 tim 1.1 */
1481 dl 1.2 public static class DiscardPolicy implements RejectedExecutionHandler {
1482 tim 1.1 /**
1483 dl 1.54 * Creates a <tt>DiscardPolicy</tt>.
1484 tim 1.1 */
1485     public DiscardPolicy() { }
1486    
1487 dl 1.24 /**
1488     * Does nothing, which has the effect of discarding task r.
1489     * @param r the runnable task requested to be executed
1490     * @param e the executor attempting to execute this task
1491     */
1492 dl 1.2 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1493 tim 1.1 }
1494     }
1495    
1496     /**
1497 dl 1.21 * A handler for rejected tasks that discards the oldest unhandled
1498     * request and then retries <tt>execute</tt>, unless the executor
1499     * is shut down, in which case the task is discarded.
1500 tim 1.1 */
1501 dl 1.2 public static class DiscardOldestPolicy implements RejectedExecutionHandler {
1502 tim 1.1 /**
1503 dl 1.24 * Creates a <tt>DiscardOldestPolicy</tt>.
1504 tim 1.1 */
1505     public DiscardOldestPolicy() { }
1506    
1507 dl 1.24 /**
1508     * Obtains and ignores the next task that the executor
1509     * would otherwise execute, if one is immediately available,
1510     * and then retries execution of task r, unless the executor
1511     * is shut down, in which case task r is instead discarded.
1512     * @param r the runnable task requested to be executed
1513     * @param e the executor attempting to execute this task
1514     */
1515 dl 1.2 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1516     if (!e.isShutdown()) {
1517     e.getQueue().poll();
1518     e.execute(r);
1519 tim 1.1 }
1520     }
1521     }
1522     }
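
As a rough illustration of how the constructor, execute, shutdown and awaitTermination methods listed above fit together, the following sketch drives a small bounded pool. It is not part of ThreadPoolExecutor.java: the class name BoundedPoolDemo, the helper runTasks and the chosen sizes (2 core threads, 4 maximum, a queue of 8, a 30-second keep-alive) are assumptions made for this example, and the lambda syntax assumes Java 8 or later.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of the listing above.
final class BoundedPoolDemo {

    /** Submits the given number of trivial tasks and returns how many ran. */
    static int runTasks(int tasks) {
        final AtomicInteger ran = new AtomicInteger();

        // Illustrative sizes: 2 core threads, at most 4 threads, and a
        // bounded queue of 8. Overflow is handled by CallerRunsPolicy.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 30L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(8),
                new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> ran.incrementAndGet());
            }
        } finally {
            // Orderly shutdown: already submitted tasks still run.
            pool.shutdown();
        }
        try {
            // Wait for queued work to drain; interrupt workers on timeout.
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException ie) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("tasks run: " + runTasks(100));
    }
}
```

CallerRunsPolicy runs a rejected task in the submitting thread as long as the executor has not been shut down. Since every task here is submitted before shutdown() is called, each one is executed either by a pool thread or by the caller, which also slows submission when the pool is saturated.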