ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ThreadPoolExecutor.java
Revision: 1.26
Committed: Thu Sep 25 11:01:22 2003 UTC (20 years, 8 months ago) by dl
Branch: MAIN
Changes since 1.25: +3 -0 lines
Log Message:
Consistently throw NPE for execute(null)

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
11 /**
12 * An {@link ExecutorService} that executes each submitted task using
13 * one of possibly several pooled threads.
14 *
15 * <p>Thread pools address two different problems: they usually
16 * provide improved performance when executing large numbers of
17 * asynchronous tasks, due to reduced per-task invocation overhead,
18 * and they provide a means of bounding and managing the resources,
19 * including threads, consumed when executing a collection of tasks.
20 * Each <tt>ThreadPoolExecutor</tt> also maintains some basic
21 * statistics, such as the number of completed tasks.
22 *
23 * <p>To be useful across a wide range of contexts, this class
24 * provides many adjustable parameters and extensibility
25 * hooks. However, programmers are urged to use the more convenient
26 * {@link Executors} factory methods {@link
27 * Executors#newCachedThreadPool} (unbounded thread pool, with
28 * automatic thread reclamation), {@link Executors#newFixedThreadPool}
29 * (fixed size thread pool) and {@link
30 * Executors#newSingleThreadExecutor} (single background thread), that
31 * preconfigure settings for the most common usage
32 * scenarios. Otherwise, use the following guide when manually
33 * configuring and tuning this class:
34 *
35 * <dl>
36 *
37 * <dt>Core and maximum pool sizes</dt>
38 *
39 * <dd>A <tt>ThreadPoolExecutor</tt> will automatically adjust the
40 * pool size
41 * (see {@link ThreadPoolExecutor#getPoolSize})
42 * according to the bounds set by corePoolSize
43 * (see {@link ThreadPoolExecutor#getCorePoolSize})
44 * and
45 * maximumPoolSize
46 * (see {@link ThreadPoolExecutor#getMaximumPoolSize}).
47 * When a new task is submitted in method {@link
48 * ThreadPoolExecutor#execute}, and fewer than corePoolSize threads
49 * are running, a new thread is created to handle the request, even if
50 * other worker threads are idle. If there are more than
51 * corePoolSize but less than maximumPoolSize threads running, a new
52 * thread will be created only if the queue is full. By setting
53 * corePoolSize and maximumPoolSize the same, you create a fixed-size
54 * thread pool. By setting maximumPoolSize to an essentially unbounded
55 * value such as <tt>Integer.MAX_VALUE</tt>, you allow the pool to
56 * accomodate an arbitrary number of concurrent tasks. Most typically,
57 * core and maximum pool sizes are set only upon construction, but they
58 * may also be changed dynamically using {@link
59 * ThreadPoolExecutor#setCorePoolSize} and {@link
60 * ThreadPoolExecutor#setMaximumPoolSize}. <dd>
61 *
62 * <dt> On-demand construction
63 *
64 * <dd> By default, even core threads are initially created and
65 * started only when needed by new tasks, but this can be overridden
66 * dynamically using method {@link
67 * ThreadPoolExecutor#prestartCoreThread} or
68 * {@link ThreadPoolExecutor#prestartAllCoreThreads}. </dd>
69 *
70 * <dt>Creating new threads</dt>
71 *
72 * <dd>New threads are created using a {@link ThreadFactory}. By
73 * default, threads are created simply with the <tt>new
74 * Thread(Runnable)</tt> constructor, but by supplying a different
75 * ThreadFactory, you can alter the thread's name, thread group,
76 * priority, daemon status, etc. </dd>
77 *
78 * <dt>Keep-alive times</dt>
79 *
80 * <dd>If the pool currently has more than corePoolSize threads,
81 * excess threads will be terminated if they have been idle for more
82 * than the keepAliveTime (see {@link
83 * ThreadPoolExecutor#getKeepAliveTime}). This provides a means of
84 * reducing resource consumption when the pool is not being actively
85 * used. If the pool becomes more active later, new threads will be
86 * constructed. This parameter can also be changed dynamically
87 * using method {@link ThreadPoolExecutor#setKeepAliveTime}. Using
88 * a value of <tt>Long.MAX_VALUE</tt> {@link TimeUnit#NANOSECONDS}
89 * effectively disables idle threads from ever terminating prior
90 * to shut down.
91 * </dd>
92 *
93 * <dt>Queueing</dt>
94 *
95 * <dd>Any {@link BlockingQueue} may be used to transfer and hold
96 * submitted tasks. The use of this queue interacts with pool sizing:
97 *
98 * <ul>
99 *
100 * <li> If fewer than corePoolSize threads are running, the Executor
101 * always prefers adding a new thread
102 * rather than queueing.</li>
103 *
104 * <li> If corePoolSize or more threads are running, the Executor
105 * always prefers queuing a request rather than adding a new
106 * thread.</li>
107 *
108 * <li> If a request cannot be queued, a new thread is created unless
109 * this would exceed maximumPoolSize, in which case, the task will be
110 * rejected.</li>
111 *
112 * </ul>
113 *
114 * There are three general strategies for queuing:
115 * <ol>
116 *
117 * <li> <em> Direct handoffs.</em> A good default choice for a work
118 * queue is a {@link SynchronousQueue} that hands off tasks to threads
119 * without otherwise holding them. Here, an attempt to queue a task
120 * will fail if no threads are immediately available to run it, so a
121 * new thread will be constructed. This policy avoids lockups when
122 * handling sets of requests that might have internal dependencies.
123 * Direct handoffs generally require unbounded maximumPoolSizes to
124 * avoid rejection of new submitted tasks. This in turn admits the
125 * possibility of unbounded thread growth when commands continue to
126 * arrive on average faster than they can be processed. </li>
127 *
128 * <li><em> Unbounded queues.</em> Using an unbounded queue (for
129 * example a {@link LinkedBlockingQueue} without a predefined
130 * capacity) will cause new tasks to be queued in cases where all
131 * corePoolSize threads are busy. Thus, no more than corePoolSize
132 * threads will ever be created. (And the value of the maximumPoolSize
133 * therefore doesn't have any effect.) This may be appropriate when
134 * each task is completely independent of others, so tasks cannot
135 * affect each others execution; for example, in a web page server.
136 * While this style of queuing can be useful in smoothing out
137 * transient bursts of requests, it admits the possibility of
138 * unbounded work queue growth when commands continue to arrive on
139 * average faster than they can be processed. </li>
140 *
141 * <li><em>Bounded queues.</em> A bounded queue (for example, an
142 * {@link ArrayBlockingQueue}) helps prevent resource exhaustion when
143 * used with finite maximumPoolSizes, but can be more difficult to
144 * tune and control. Queue sizes and maximum pool sizes may be traded
145 * off for each other: Using large queues and small pools minimizes
146 * CPU usage, OS resources, and context-switching overhead, but can
147 * lead to artifically low throughput. If tasks frequently block (for
148 * example if they are I/O bound), a system may be able to schedule
149 * time for more threads than you otherwise allow. Use of small queues
150 * generally requires larger pool sizes, which keeps CPUs busier but
151 * may encounter unacceptable scheduling overhead, which also
152 * decreases throughput. </li>
153 *
154 * </ol>
155 *
156 * </dd>
157 *
158 * <dt>Rejected tasks</dt>
159 *
160 * <dd> New tasks submitted in method {@link
161 * ThreadPoolExecutor#execute} will be <em>rejected</em> when the
162 * Executor has been shut down, and also when the Executor uses finite
163 * bounds for both maximum threads and work queue capacity, and is
164 * saturated. In either case, the <tt>execute</tt> method invokes the
165 * {@link RejectedExecutionHandler#rejectedExecution} method of its
166 * {@link RejectedExecutionHandler}. Four predefined handler policies
167 * are provided:
168 *
169 * <ol>
170 *
171 * <li> In the
172 * default {@link ThreadPoolExecutor.AbortPolicy}, the handler throws a
173 * runtime {@link RejectedExecutionException} upon rejection. </li>
174 *
175 * <li> In {@link
176 * ThreadPoolExecutor.CallerRunsPolicy}, the thread that invokes
177 * <tt>execute</tt> itself runs the task. This provides a simple
178 * feedback control mechanism that will slow down the rate that new
179 * tasks are submitted. </li>
180 *
181 * <li> In {@link ThreadPoolExecutor.DiscardPolicy},
182 * a task that cannot be executed is simply dropped. </li>
183 *
184 * <li>In {@link
185 * ThreadPoolExecutor.DiscardOldestPolicy}, if the executor is not
186 * shut down, the task at the head of the work queue is dropped, and
187 * then execution is retried (which can fail again, causing this to be
188 * repeated.) </li>
189 *
190 * </ol>
191 *
192 * It is possible to define and use other kinds of {@link
193 * RejectedExecutionHandler} classes. Doing so requires some care
194 * especially when policies are designed to work only under particular
195 * capacity or queueing policies. </dd>
196 *
197 * <dt>Hook methods</dt>
198 *
199 * <dd>This class provides <tt>protected</tt> overridable {@link
200 * ThreadPoolExecutor#beforeExecute} and {@link
201 * ThreadPoolExecutor#afterExecute} methods that are called before and
202 * after execution of each task. These can be used to manipulate the
203 * execution environment, for example, reinitializing ThreadLocals,
204 * gathering statistics, or adding log entries. Additionally, method
205 * {@link ThreadPoolExecutor#terminated} can be overridden to perform
206 * any special processing that needs to be done once the Executor has
207 * fully terminated.</dd>
208 *
209 * <dt>Queue maintenance</dt>
210 *
211 * <dd> Method {@link ThreadPoolExecutor#getQueue} allows access to
212 * the work queue for purposes of monitoring and debugging. Use of
213 * this method for any other purpose is strongly discouraged. Two
214 * supplied methods, {@link ThreadPoolExecutor#remove} and {@link
215 * ThreadPoolExecutor#purge} are available to assist in storage
216 * reclamation when large numbers of queued tasks become
217 * cancelled.</dd> </dl>
218 *
219 * @since 1.5
220 * @author Doug Lea
221 */
222 public class ThreadPoolExecutor implements ExecutorService {
223 /**
224 * Queue used for holding tasks and handing off to worker threads.
225 */
226 private final BlockingQueue<Runnable> workQueue;
227
228 /**
229 * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and
230 * workers set.
231 */
232 private final ReentrantLock mainLock = new ReentrantLock();
233
234 /**
235 * Wait condition to support awaitTermination
236 */
237 private final Condition termination = mainLock.newCondition();
238
239 /**
240 * Set containing all worker threads in pool.
241 */
242 private final HashSet<Worker> workers = new HashSet<Worker>();
243
244 /**
245 * Timeout in nanosecods for idle threads waiting for work.
246 * Threads use this timeout only when there are more than
247 * corePoolSize present. Otherwise they wait forever for new work.
248 */
249 private volatile long keepAliveTime;
250
251 /**
252 * Core pool size, updated only while holding mainLock,
253 * but volatile to allow concurrent readability even
254 * during updates.
255 */
256 private volatile int corePoolSize;
257
258 /**
259 * Maximum pool size, updated only while holding mainLock
260 * but volatile to allow concurrent readability even
261 * during updates.
262 */
263 private volatile int maximumPoolSize;
264
265 /**
266 * Current pool size, updated only while holding mainLock
267 * but volatile to allow concurrent readability even
268 * during updates.
269 */
270 private volatile int poolSize;
271
272 /**
273 * Lifecycle state
274 */
275 private volatile int runState;
276
277 // Special values for runState
278 /** Normal, not-shutdown mode */
279 private static final int RUNNING = 0;
280 /** Controlled shutdown mode */
281 private static final int SHUTDOWN = 1;
282 /** Immediate shutdown mode */
283 private static final int STOP = 2;
284 /** Final state */
285 private static final int TERMINATED = 3;
286
287 /**
288 * Handler called when saturated or shutdown in execute.
289 */
290 private volatile RejectedExecutionHandler handler = defaultHandler;
291
292 /**
293 * Factory for new threads.
294 */
295 private volatile ThreadFactory threadFactory = defaultThreadFactory;
296
297 /**
298 * Tracks largest attained pool size.
299 */
300 private int largestPoolSize;
301
302 /**
303 * Counter for completed tasks. Updated only on termination of
304 * worker threads.
305 */
306 private long completedTaskCount;
307
308 /**
309 * The default thread factory
310 */
311 private static final ThreadFactory defaultThreadFactory =
312 new ThreadFactory() {
313 public Thread newThread(Runnable r) {
314 return new Thread(r);
315 }
316 };
317
318 /**
319 * The default rejectect execution handler
320 */
321 private static final RejectedExecutionHandler defaultHandler =
322 new AbortPolicy();
323
324 /**
325 * Invoke the rejected execution handler for the given command.
326 */
327 void reject(Runnable command) {
328 handler.rejectedExecution(command, this);
329 }
330
331 /**
332 * Create and return a new thread running firstTask as its first
333 * task. Call only while holding mainLock
334 * @param firstTask the task the new thread should run first (or
335 * null if none)
336 * @return the new thread
337 */
338 private Thread addThread(Runnable firstTask) {
339 Worker w = new Worker(firstTask);
340 Thread t = threadFactory.newThread(w);
341 w.thread = t;
342 workers.add(w);
343 int nt = ++poolSize;
344 if (nt > largestPoolSize)
345 largestPoolSize = nt;
346 return t;
347 }
348
349
350
351 /**
352 * Create and start a new thread running firstTask as its first
353 * task, only if less than corePoolSize threads are running.
354 * @param firstTask the task the new thread should run first (or
355 * null if none)
356 * @return true if successful.
357 */
358 private boolean addIfUnderCorePoolSize(Runnable firstTask) {
359 Thread t = null;
360 mainLock.lock();
361 try {
362 if (poolSize < corePoolSize)
363 t = addThread(firstTask);
364 } finally {
365 mainLock.unlock();
366 }
367 if (t == null)
368 return false;
369 t.start();
370 return true;
371 }
372
373 /**
374 * Create and start a new thread only if less than maximumPoolSize
375 * threads are running. The new thread runs as its first task the
376 * next task in queue, or if there is none, the given task.
377 * @param firstTask the task the new thread should run first (or
378 * null if none)
379 * @return null on failure, else the first task to be run by new thread.
380 */
381 private Runnable addIfUnderMaximumPoolSize(Runnable firstTask) {
382 Thread t = null;
383 Runnable next = null;
384 mainLock.lock();
385 try {
386 if (poolSize < maximumPoolSize) {
387 next = workQueue.poll();
388 if (next == null)
389 next = firstTask;
390 t = addThread(next);
391 }
392 } finally {
393 mainLock.unlock();
394 }
395 if (t == null)
396 return null;
397 t.start();
398 return next;
399 }
400
401
402 /**
403 * Get the next task for a worker thread to run.
404 * @return the task
405 * @throws InterruptedException if interrupted while waiting for task
406 */
407 private Runnable getTask() throws InterruptedException {
408 for (;;) {
409 switch(runState) {
410 case RUNNING: {
411 if (poolSize <= corePoolSize) // untimed wait if core
412 return workQueue.take();
413
414 long timeout = keepAliveTime;
415 if (timeout <= 0) // die immediately for 0 timeout
416 return null;
417 Runnable r = workQueue.poll(timeout, TimeUnit.NANOSECONDS);
418 if (r != null)
419 return r;
420 if (poolSize > corePoolSize) // timed out
421 return null;
422 // else, after timeout, pool shrank so shouldn't die, so retry
423 break;
424 }
425
426 case SHUTDOWN: {
427 // Help drain queue
428 Runnable r = workQueue.poll();
429 if (r != null)
430 return r;
431
432 // Check if can terminate
433 if (workQueue.isEmpty()) {
434 interruptIdleWorkers();
435 return null;
436 }
437
438 // There could still be delayed tasks in queue.
439 // Wait for one, re-checking state upon interruption
440 try {
441 return workQueue.take();
442 }
443 catch(InterruptedException ignore) {
444 }
445 break;
446 }
447
448 case STOP:
449 return null;
450 default:
451 assert false;
452 }
453 }
454 }
455
456 /**
457 * Wake up all threads that might be waiting for tasks.
458 */
459 void interruptIdleWorkers() {
460 mainLock.lock();
461 try {
462 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
463 it.next().interruptIfIdle();
464 } finally {
465 mainLock.unlock();
466 }
467 }
468
469 /**
470 * Perform bookkeeping for a terminated worker thread.
471 * @param w the worker
472 */
473 private void workerDone(Worker w) {
474 mainLock.lock();
475 try {
476 completedTaskCount += w.completedTasks;
477 workers.remove(w);
478 if (--poolSize > 0)
479 return;
480
481 // Else, this is the last thread. Deal with potential shutdown.
482
483 int state = runState;
484 assert state != TERMINATED;
485
486 if (state != STOP) {
487 // If there are queued tasks but no threads, create
488 // replacement.
489 Runnable r = workQueue.poll();
490 if (r != null) {
491 addThread(r).start();
492 return;
493 }
494
495 // If there are some (presumably delayed) tasks but
496 // none pollable, create an idle replacement to wait.
497 if (!workQueue.isEmpty()) {
498 addThread(null).start();
499 return;
500 }
501
502 // Otherwise, we can exit without replacement
503 if (state == RUNNING)
504 return;
505 }
506
507 // Either state is STOP, or state is SHUTDOWN and there is
508 // no work to do. So we can terminate.
509 runState = TERMINATED;
510 termination.signalAll();
511 // fall through to call terminate() outside of lock.
512 } finally {
513 mainLock.unlock();
514 }
515
516 assert runState == TERMINATED;
517 terminated();
518 }
519
520 /**
521 * Worker threads
522 */
523 private class Worker implements Runnable {
524
525 /**
526 * The runLock is acquired and released surrounding each task
527 * execution. It mainly protects against interrupts that are
528 * intended to cancel the worker thread from instead
529 * interrupting the task being run.
530 */
531 private final ReentrantLock runLock = new ReentrantLock();
532
533 /**
534 * Initial task to run before entering run loop
535 */
536 private Runnable firstTask;
537
538 /**
539 * Per thread completed task counter; accumulated
540 * into completedTaskCount upon termination.
541 */
542 volatile long completedTasks;
543
544 /**
545 * Thread this worker is running in. Acts as a final field,
546 * but cannot be set until thread is created.
547 */
548 Thread thread;
549
550 Worker(Runnable firstTask) {
551 this.firstTask = firstTask;
552 }
553
554 boolean isActive() {
555 return runLock.isLocked();
556 }
557
558 /**
559 * Interrupt thread if not running a task
560 */
561 void interruptIfIdle() {
562 if (runLock.tryLock()) {
563 try {
564 thread.interrupt();
565 } finally {
566 runLock.unlock();
567 }
568 }
569 }
570
571 /**
572 * Cause thread to die even if running a task.
573 */
574 void interruptNow() {
575 thread.interrupt();
576 }
577
578 /**
579 * Run a single task between before/after methods.
580 */
581 private void runTask(Runnable task) {
582 runLock.lock();
583 try {
584 // Abort now if immediate cancel. Otherwise, we have
585 // committed to run this task.
586 if (runState == STOP)
587 return;
588
589 Thread.interrupted(); // clear interrupt status on entry
590 boolean ran = false;
591 beforeExecute(thread, task);
592 try {
593 task.run();
594 ran = true;
595 afterExecute(task, null);
596 ++completedTasks;
597 } catch(RuntimeException ex) {
598 if (!ran)
599 afterExecute(task, ex);
600 // Else the exception occurred within
601 // afterExecute itself in which case we don't
602 // want to call it again.
603 throw ex;
604 }
605 } finally {
606 runLock.unlock();
607 }
608 }
609
610 /**
611 * Main run loop
612 */
613 public void run() {
614 try {
615 for (;;) {
616 Runnable task;
617 if (firstTask != null) {
618 task = firstTask;
619 firstTask = null;
620 } else {
621 task = getTask();
622 if (task == null)
623 break;
624 }
625 runTask(task);
626 task = null; // unnecessary but can help GC
627 }
628 } catch(InterruptedException ie) {
629 // fall through
630 } finally {
631 workerDone(this);
632 }
633 }
634 }
635
636 // Public methods
637
638 /**
639 * Creates a new <tt>ThreadPoolExecutor</tt> with the given
640 * initial parameters. It may be more convenient to use one of
641 * the {@link Executors} factory methods instead of this general
642 * purpose constructor.
643 *
644 * @param corePoolSize the number of threads to keep in the
645 * pool, even if they are idle.
646 * @param maximumPoolSize the maximum number of threads to allow in the
647 * pool.
648 * @param keepAliveTime when the number of threads is greater than
649 * the core, this is the maximum time that excess idle threads
650 * will wait for new tasks before terminating.
651 * @param unit the time unit for the keepAliveTime
652 * argument.
653 * @param workQueue the queue to use for holding tasks before the
654 * are executed. This queue will hold only the <tt>Runnable</tt>
655 * tasks submitted by the <tt>execute</tt> method.
656 * @throws IllegalArgumentException if corePoolSize, or
657 * keepAliveTime less than zero, or if maximumPoolSize less than or
658 * equal to zero, or if corePoolSize greater than maximumPoolSize.
659 * @throws NullPointerException if <tt>workQueue</tt> is null
660 */
661 public ThreadPoolExecutor(int corePoolSize,
662 int maximumPoolSize,
663 long keepAliveTime,
664 TimeUnit unit,
665 BlockingQueue<Runnable> workQueue) {
666 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
667 defaultThreadFactory, defaultHandler);
668 }
669
670 /**
671 * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
672 * parameters.
673 *
674 * @param corePoolSize the number of threads to keep in the
675 * pool, even if they are idle.
676 * @param maximumPoolSize the maximum number of threads to allow in the
677 * pool.
678 * @param keepAliveTime when the number of threads is greater than
679 * the core, this is the maximum time that excess idle threads
680 * will wait for new tasks before terminating.
681 * @param unit the time unit for the keepAliveTime
682 * argument.
683 * @param workQueue the queue to use for holding tasks before the
684 * are executed. This queue will hold only the <tt>Runnable</tt>
685 * tasks submitted by the <tt>execute</tt> method.
686 * @param threadFactory the factory to use when the executor
687 * creates a new thread.
688 * @throws IllegalArgumentException if corePoolSize, or
689 * keepAliveTime less than zero, or if maximumPoolSize less than or
690 * equal to zero, or if corePoolSize greater than maximumPoolSize.
691 * @throws NullPointerException if <tt>workQueue</tt>
692 * or <tt>threadFactory</tt> are null.
693 */
694 public ThreadPoolExecutor(int corePoolSize,
695 int maximumPoolSize,
696 long keepAliveTime,
697 TimeUnit unit,
698 BlockingQueue<Runnable> workQueue,
699 ThreadFactory threadFactory) {
700
701 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
702 threadFactory, defaultHandler);
703 }
704
705 /**
706 * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
707 * parameters.
708 *
709 * @param corePoolSize the number of threads to keep in the
710 * pool, even if they are idle.
711 * @param maximumPoolSize the maximum number of threads to allow in the
712 * pool.
713 * @param keepAliveTime when the number of threads is greater than
714 * the core, this is the maximum time that excess idle threads
715 * will wait for new tasks before terminating.
716 * @param unit the time unit for the keepAliveTime
717 * argument.
718 * @param workQueue the queue to use for holding tasks before the
719 * are executed. This queue will hold only the <tt>Runnable</tt>
720 * tasks submitted by the <tt>execute</tt> method.
721 * @param handler the handler to use when execution is blocked
722 * because the thread bounds and queue capacities are reached.
723 * @throws IllegalArgumentException if corePoolSize, or
724 * keepAliveTime less than zero, or if maximumPoolSize less than or
725 * equal to zero, or if corePoolSize greater than maximumPoolSize.
726 * @throws NullPointerException if <tt>workQueue</tt>
727 * or <tt>handler</tt> are null.
728 */
729 public ThreadPoolExecutor(int corePoolSize,
730 int maximumPoolSize,
731 long keepAliveTime,
732 TimeUnit unit,
733 BlockingQueue<Runnable> workQueue,
734 RejectedExecutionHandler handler) {
735 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
736 defaultThreadFactory, handler);
737 }
738
739 /**
740 * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
741 * parameters.
742 *
743 * @param corePoolSize the number of threads to keep in the
744 * pool, even if they are idle.
745 * @param maximumPoolSize the maximum number of threads to allow in the
746 * pool.
747 * @param keepAliveTime when the number of threads is greater than
748 * the core, this is the maximum time that excess idle threads
749 * will wait for new tasks before terminating.
750 * @param unit the time unit for the keepAliveTime
751 * argument.
752 * @param workQueue the queue to use for holding tasks before the
753 * are executed. This queue will hold only the <tt>Runnable</tt>
754 * tasks submitted by the <tt>execute</tt> method.
755 * @param threadFactory the factory to use when the executor
756 * creates a new thread.
757 * @param handler the handler to use when execution is blocked
758 * because the thread bounds and queue capacities are reached.
759 * @throws IllegalArgumentException if corePoolSize, or
760 * keepAliveTime less than zero, or if maximumPoolSize less than or
761 * equal to zero, or if corePoolSize greater than maximumPoolSize.
762 * @throws NullPointerException if <tt>workQueue</tt>
763 * or <tt>threadFactory</tt> or <tt>handler</tt> are null.
764 */
765 public ThreadPoolExecutor(int corePoolSize,
766 int maximumPoolSize,
767 long keepAliveTime,
768 TimeUnit unit,
769 BlockingQueue<Runnable> workQueue,
770 ThreadFactory threadFactory,
771 RejectedExecutionHandler handler) {
772 if (corePoolSize < 0 ||
773 maximumPoolSize <= 0 ||
774 maximumPoolSize < corePoolSize ||
775 keepAliveTime < 0)
776 throw new IllegalArgumentException();
777 if (workQueue == null || threadFactory == null || handler == null)
778 throw new NullPointerException();
779 this.corePoolSize = corePoolSize;
780 this.maximumPoolSize = maximumPoolSize;
781 this.workQueue = workQueue;
782 this.keepAliveTime = unit.toNanos(keepAliveTime);
783 this.threadFactory = threadFactory;
784 this.handler = handler;
785 }
786
787
788 /**
789 * Executes the given task sometime in the future. The task
790 * may execute in a new thread or in an existing pooled thread.
791 *
792 * If the task cannot be submitted for execution, either because this
793 * executor has been shutdown or because its capacity has been reached,
794 * the task is handled by the current <tt>RejectedExecutionHandler</tt>.
795 *
796 * @param command the task to execute
797 * @throws RejectedExecutionException at discretion of
798 * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
799 * for execution
800 * @throws NullPointerException if command is null
801 */
802 public void execute(Runnable command) {
803 if (command == null)
804 throw new NullPointerException();
805 for (;;) {
806 if (runState != RUNNING) {
807 reject(command);
808 return;
809 }
810 if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
811 return;
812 if (workQueue.offer(command))
813 return;
814 Runnable r = addIfUnderMaximumPoolSize(command);
815 if (r == command)
816 return;
817 if (r == null) {
818 reject(command);
819 return;
820 }
821 // else retry
822 }
823 }
824
825 public void shutdown() {
826 boolean fullyTerminated = false;
827 mainLock.lock();
828 try {
829 if (workers.size() > 0) {
830 if (runState == RUNNING) // don't override shutdownNow
831 runState = SHUTDOWN;
832 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
833 it.next().interruptIfIdle();
834 }
835 else { // If no workers, trigger full termination now
836 fullyTerminated = true;
837 runState = TERMINATED;
838 termination.signalAll();
839 }
840 } finally {
841 mainLock.unlock();
842 }
843 if (fullyTerminated)
844 terminated();
845 }
846
847
848 public List shutdownNow() {
849 boolean fullyTerminated = false;
850 mainLock.lock();
851 try {
852 if (workers.size() > 0) {
853 if (runState != TERMINATED)
854 runState = STOP;
855 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
856 it.next().interruptNow();
857 }
858 else { // If no workers, trigger full termination now
859 fullyTerminated = true;
860 runState = TERMINATED;
861 termination.signalAll();
862 }
863 } finally {
864 mainLock.unlock();
865 }
866 if (fullyTerminated)
867 terminated();
868 return Arrays.asList(workQueue.toArray());
869 }
870
871 public boolean isShutdown() {
872 return runState != RUNNING;
873 }
874
875 /**
876 * Return true if this executor is in the process of terminating
877 * after <tt>shutdown</tt> or <tt>shutdownNow</tt> but has not
878 * completely terminated. This method may be useful for
879 * debugging. A return of <tt>true</tt> reported a sufficient
880 * period after shutdown may indicate that submitted tasks have
881 * ignored or suppressed interruption, causing this executor not
882 * to properly terminate.
883 * @return true if terminating but not yet terminated.
884 */
885 public boolean isTerminating() {
886 return runState == STOP;
887 }
888
889 public boolean isTerminated() {
890 return runState == TERMINATED;
891 }
892
893 public boolean awaitTermination(long timeout, TimeUnit unit)
894 throws InterruptedException {
895 mainLock.lock();
896 try {
897 long nanos = unit.toNanos(timeout);
898 for (;;) {
899 if (runState == TERMINATED)
900 return true;
901 if (nanos <= 0)
902 return false;
903 nanos = termination.awaitNanos(nanos);
904 }
905 } finally {
906 mainLock.unlock();
907 }
908 }
909
910 /**
911 * Invokes <tt>shutdown</tt> when this executor is no longer
912 * referenced.
913 */
914 protected void finalize() {
915 shutdown();
916 }
917
918 /**
919 * Sets the thread factory used to create new threads.
920 *
921 * @param threadFactory the new thread factory
922 * @see #getThreadFactory
923 */
924 public void setThreadFactory(ThreadFactory threadFactory) {
925 this.threadFactory = threadFactory;
926 }
927
928 /**
929 * Returns the thread factory used to create new threads.
930 *
931 * @return the current thread factory
932 * @see #setThreadFactory
933 */
934 public ThreadFactory getThreadFactory() {
935 return threadFactory;
936 }
937
938 /**
939 * Sets a new handler for unexecutable tasks.
940 *
941 * @param handler the new handler
942 * @see #getRejectedExecutionHandler
943 */
944 public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
945 this.handler = handler;
946 }
947
948 /**
949 * Returns the current handler for unexecutable tasks.
950 *
951 * @return the current handler
952 * @see #setRejectedExecutionHandler
953 */
954 public RejectedExecutionHandler getRejectedExecutionHandler() {
955 return handler;
956 }
957
958 /**
959 * Returns the task queue used by this executor. Access to the
960 * task queue is intended primarily for debugging and monitoring.
961 * This queue may be in active use. Retrieveing the task queue
962 * does not prevent queued tasks from executing.
963 *
964 * @return the task queue
965 */
966 public BlockingQueue<Runnable> getQueue() {
967 return workQueue;
968 }
969
970 /**
971 * Removes this task from internal queue if it is present, thus
972 * causing it not to be run if it has not already started. This
973 * method may be useful as one part of a cancellation scheme.
974 *
975 * @param task the task to remove
976 * @return true if the task was removed
977 */
978 public boolean remove(Runnable task) {
979 return getQueue().remove(task);
980 }
981
982
983 /**
984 * Tries to remove from the work queue all {@link Cancellable}
985 * tasks that have been cancelled. This method can be useful as a
986 * storage reclamation operation, that has no other impact on
987 * functionality. Cancelled tasks are never executed, but may
988 * accumulate in work queues until worker threads can actively
989 * remove them. Invoking this method instead tries to remove them now.
990 * However, this method may fail to remove tasks in
991 * the presence of interference by other threads.
992 */
993
994 public void purge() {
995 // Fail if we encounter interference during traversal
996 try {
997 Iterator<Runnable> it = getQueue().iterator();
998 while (it.hasNext()) {
999 Runnable r = it.next();
1000 if (r instanceof Cancellable) {
1001 Cancellable c = (Cancellable)r;
1002 if (c.isCancelled())
1003 it.remove();
1004 }
1005 }
1006 }
1007 catch(ConcurrentModificationException ex) {
1008 return;
1009 }
1010 }
1011
1012 /**
1013 * Sets the core number of threads. This overrides any value set
1014 * in the constructor. If the new value is smaller than the
1015 * current value, excess existing threads will be terminated when
1016 * they next become idle.
1017 *
1018 * @param corePoolSize the new core size
1019 * @throws IllegalArgumentException if <tt>corePoolSize</tt>
1020 * less than zero
1021 * @see #getCorePoolSize
1022 */
1023 public void setCorePoolSize(int corePoolSize) {
1024 if (corePoolSize < 0)
1025 throw new IllegalArgumentException();
1026 mainLock.lock();
1027 try {
1028 int extra = this.corePoolSize - corePoolSize;
1029 this.corePoolSize = corePoolSize;
1030 if (extra > 0 && poolSize > corePoolSize) {
1031 Iterator<Worker> it = workers.iterator();
1032 while (it.hasNext() &&
1033 extra > 0 &&
1034 poolSize > corePoolSize &&
1035 workQueue.remainingCapacity() == 0) {
1036 it.next().interruptIfIdle();
1037 --extra;
1038 }
1039 }
1040
1041 } finally {
1042 mainLock.unlock();
1043 }
1044 }
1045
1046 /**
1047 * Returns the core number of threads.
1048 *
1049 * @return the core number of threads
1050 * @see #setCorePoolSize
1051 */
1052 public int getCorePoolSize() {
1053 return corePoolSize;
1054 }
1055
1056 /**
1057 * Start a core thread, causing it to idly wait for work. This
1058 * overrides the default policy of starting core threads only when
1059 * new tasks are executed. This method will return <tt>false</tt>
1060 * if all core threads have already been started.
1061 * @return true if a thread was started
1062 */
1063 public boolean prestartCoreThread() {
1064 return addIfUnderCorePoolSize(null);
1065 }
1066
1067 /**
1068 * Start all core threads, causing them to idly wait for work. This
1069 * overrides the default policy of starting core threads only when
1070 * new tasks are executed.
1071 * @return the number of threads started.
1072 */
1073 public int prestartAllCoreThreads() {
1074 int n = 0;
1075 while (addIfUnderCorePoolSize(null))
1076 ++n;
1077 return n;
1078 }
1079
1080 /**
1081 * Sets the maximum allowed number of threads. This overrides any
1082 * value set in the constructor. If the new value is smaller than
1083 * the current value, excess existing threads will be
1084 * terminated when they next become idle.
1085 *
1086 * @param maximumPoolSize the new maximum
1087 * @throws IllegalArgumentException if maximumPoolSize less than zero or
1088 * the {@link #getCorePoolSize core pool size}
1089 * @see #getMaximumPoolSize
1090 */
1091 public void setMaximumPoolSize(int maximumPoolSize) {
1092 if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
1093 throw new IllegalArgumentException();
1094 mainLock.lock();
1095 try {
1096 int extra = this.maximumPoolSize - maximumPoolSize;
1097 this.maximumPoolSize = maximumPoolSize;
1098 if (extra > 0 && poolSize > maximumPoolSize) {
1099 Iterator<Worker> it = workers.iterator();
1100 while (it.hasNext() &&
1101 extra > 0 &&
1102 poolSize > maximumPoolSize) {
1103 it.next().interruptIfIdle();
1104 --extra;
1105 }
1106 }
1107 } finally {
1108 mainLock.unlock();
1109 }
1110 }
1111
1112 /**
1113 * Returns the maximum allowed number of threads.
1114 *
1115 * @return the maximum allowed number of threads
1116 * @see #setMaximumPoolSize
1117 */
1118 public int getMaximumPoolSize() {
1119 return maximumPoolSize;
1120 }
1121
1122 /**
1123 * Sets the time limit for which threads may remain idle before
1124 * being terminated. If there are more than the core number of
1125 * threads currently in the pool, after waiting this amount of
1126 * time without processing a task, excess threads will be
1127 * terminated. This overrides any value set in the constructor.
1128 * @param time the time to wait. A time value of zero will cause
1129 * excess threads to terminate immediately after executing tasks.
1130 * @param unit the time unit of the time argument
1131 * @throws IllegalArgumentException if time less than zero
1132 * @see #getKeepAliveTime
1133 */
1134 public void setKeepAliveTime(long time, TimeUnit unit) {
1135 if (time < 0)
1136 throw new IllegalArgumentException();
1137 this.keepAliveTime = unit.toNanos(time);
1138 }
1139
1140 /**
1141 * Returns the thread keep-alive time, which is the amount of time
1142 * which threads in excess of the core pool size may remain
1143 * idle before being terminated.
1144 *
1145 * @param unit the desired time unit of the result
1146 * @return the time limit
1147 * @see #setKeepAliveTime
1148 */
1149 public long getKeepAliveTime(TimeUnit unit) {
1150 return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
1151 }
1152
1153 /* Statistics */
1154
1155 /**
1156 * Returns the current number of threads in the pool.
1157 *
1158 * @return the number of threads
1159 */
1160 public int getPoolSize() {
1161 return poolSize;
1162 }
1163
1164 /**
1165 * Returns the approximate number of threads that are actively
1166 * executing tasks.
1167 *
1168 * @return the number of threads
1169 */
1170 public int getActiveCount() {
1171 mainLock.lock();
1172 try {
1173 int n = 0;
1174 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); ) {
1175 if (it.next().isActive())
1176 ++n;
1177 }
1178 return n;
1179 } finally {
1180 mainLock.unlock();
1181 }
1182 }
1183
1184 /**
1185 * Returns the largest number of threads that have ever
1186 * simultaneously been in the pool.
1187 *
1188 * @return the number of threads
1189 */
1190 public int getLargestPoolSize() {
1191 mainLock.lock();
1192 try {
1193 return largestPoolSize;
1194 } finally {
1195 mainLock.unlock();
1196 }
1197 }
1198
1199 /**
1200 * Returns the approximate total number of tasks that have been
1201 * scheduled for execution. Because the states of tasks and
1202 * threads may change dynamically during computation, the returned
1203 * value is only an approximation, but one that does not ever
1204 * decrease across successive calls.
1205 *
1206 * @return the number of tasks
1207 */
1208 public long getTaskCount() {
1209 mainLock.lock();
1210 try {
1211 long n = completedTaskCount;
1212 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); ) {
1213 Worker w = it.next();
1214 n += w.completedTasks;
1215 if (w.isActive())
1216 ++n;
1217 }
1218 return n + workQueue.size();
1219 } finally {
1220 mainLock.unlock();
1221 }
1222 }
1223
1224 /**
1225 * Returns the approximate total number of tasks that have
1226 * completed execution. Because the states of tasks and threads
1227 * may change dynamically during computation, the returned value
1228 * is only an approximation, but one that does not ever decrease
1229 * across successive calls.
1230 *
1231 * @return the number of tasks
1232 */
1233 public long getCompletedTaskCount() {
1234 mainLock.lock();
1235 try {
1236 long n = completedTaskCount;
1237 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
1238 n += it.next().completedTasks;
1239 return n;
1240 } finally {
1241 mainLock.unlock();
1242 }
1243 }
1244
1245 /**
1246 * Method invoked prior to executing the given Runnable in the
1247 * given thread. This method may be used to re-initialize
1248 * ThreadLocals, or to perform logging. Note: To properly nest
1249 * multiple overridings, subclasses should generally invoke
1250 * <tt>super.beforeExecute</tt> at the end of this method.
1251 *
1252 * @param t the thread that will run task r.
1253 * @param r the task that will be executed.
1254 */
1255 protected void beforeExecute(Thread t, Runnable r) { }
1256
1257 /**
1258 * Method invoked upon completion of execution of the given
1259 * Runnable. If non-null, the Throwable is the uncaught exception
1260 * that caused execution to terminate abruptly. Note: To properly
1261 * nest multiple overridings, subclasses should generally invoke
1262 * <tt>super.afterExecute</tt> at the beginning of this method.
1263 *
1264 * @param r the runnable that has completed.
1265 * @param t the exception that caused termination, or null if
1266 * execution completed normally.
1267 */
1268 protected void afterExecute(Runnable r, Throwable t) { }
1269
1270 /**
1271 * Method invoked when the Executor has terminated. Default
1272 * implementation does nothing. Note: To properly nest multiple
1273 * overridings, subclasses should generally invoke
1274 * <tt>super.terminated</tt> within this method.
1275 */
1276 protected void terminated() { }
1277
1278 /**
1279 * A handler for rejected tasks that runs the rejected task
1280 * directly in the calling thread of the <tt>execute</tt> method,
1281 * unless the executor has been shut down, in which case the task
1282 * is discarded.
1283 */
1284 public static class CallerRunsPolicy implements RejectedExecutionHandler {
1285
1286 /**
1287 * Creates a <tt>CallerRunsPolicy</tt>.
1288 */
1289 public CallerRunsPolicy() { }
1290
1291 /**
1292 * Executes task r in the caller's thread, unless the executor
1293 * has been shut down, in which case the task is discarded.
1294 * @param r the runnable task requested to be executed
1295 * @param e the executor attempting to execute this task
1296 */
1297 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1298 if (!e.isShutdown()) {
1299 r.run();
1300 }
1301 }
1302 }
1303
1304 /**
1305 * A handler for rejected tasks that throws a
1306 * <tt>RejectedExecutionException</tt>.
1307 */
1308 public static class AbortPolicy implements RejectedExecutionHandler {
1309
1310 /**
1311 * Creates a <tt>AbortPolicy</tt>.
1312 */
1313 public AbortPolicy() { }
1314
1315 /**
1316 * Always throws RejectedExecutionException
1317 * @param r the runnable task requested to be executed
1318 * @param e the executor attempting to execute this task
1319 * @throws RejectedExecutionException always.
1320 */
1321 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1322 throw new RejectedExecutionException();
1323 }
1324 }
1325
1326 /**
1327 * A handler for rejected tasks that silently discards the
1328 * rejected task.
1329 */
1330 public static class DiscardPolicy implements RejectedExecutionHandler {
1331
1332 /**
1333 * Creates <tt>DiscardPolicy</tt>.
1334 */
1335 public DiscardPolicy() { }
1336
1337 /**
1338 * Does nothing, which has the effect of discarding task r.
1339 * @param r the runnable task requested to be executed
1340 * @param e the executor attempting to execute this task
1341 */
1342 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1343 }
1344 }
1345
1346 /**
1347 * A handler for rejected tasks that discards the oldest unhandled
1348 * request and then retries <tt>execute</tt>, unless the executor
1349 * is shut down, in which case the task is discarded.
1350 */
1351 public static class DiscardOldestPolicy implements RejectedExecutionHandler {
1352 /**
1353 * Creates a <tt>DiscardOldestPolicy</tt> for the given executor.
1354 */
1355 public DiscardOldestPolicy() { }
1356
1357 /**
1358 * Obtains and ignores the next task that the executor
1359 * would otherwise execute, if one is immediately available,
1360 * and then retries execution of task r, unless the executor
1361 * is shut down, in which case task r is instead discarded.
1362 * @param r the runnable task requested to be executed
1363 * @param e the executor attempting to execute this task
1364 */
1365 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1366 if (!e.isShutdown()) {
1367 e.getQueue().poll();
1368 e.execute(r);
1369 }
1370 }
1371 }
1372 }