ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ThreadPoolExecutor.java
Revision: 1.24
Committed: Sat Sep 13 18:51:11 2003 UTC (20 years, 8 months ago) by dl
Branch: MAIN
Changes since 1.23: +44 -23 lines
Log Message:
Proofreading pass -- many minor adjustments

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
11 /**
12 * An {@link ExecutorService} that executes each submitted task using
13 * one of possibly several pooled threads.
14 *
15 * <p>Thread pools address two different problems: they usually
16 * provide improved performance when executing large numbers of
17 * asynchronous tasks, due to reduced per-task invocation overhead,
18 * and they provide a means of bounding and managing the resources,
19 * including threads, consumed when executing a collection of tasks.
20 * Each <tt>ThreadPoolExecutor</tt> also maintains some basic
21 * statistics, such as the number of completed tasks.
22 *
23 * <p>To be useful across a wide range of contexts, this class
24 * provides many adjustable parameters and extensibility
25 * hooks. However, programmers are urged to use the more convenient
26 * {@link Executors} factory methods {@link
27 * Executors#newCachedThreadPool} (unbounded thread pool, with
28 * automatic thread reclamation), {@link Executors#newFixedThreadPool}
29 * (fixed size thread pool) and {@link
30 * Executors#newSingleThreadExecutor} (single background thread), that
31 * preconfigure settings for the most common usage
32 * scenarios. Otherwise, use the following guide when manually
33 * configuring and tuning this class:
34 *
35 * <dl>
36 *
37 * <dt>Core and maximum pool sizes</dt>
38 *
39 * <dd>A <tt>ThreadPoolExecutor</tt> will automatically adjust the
40 * pool size
41 * (see {@link ThreadPoolExecutor#getPoolSize})
42 * according to the bounds set by corePoolSize
43 * (see {@link ThreadPoolExecutor#getCorePoolSize})
44 * and
45 * maximumPoolSize
46 * (see {@link ThreadPoolExecutor#getMaximumPoolSize}).
47 * When a new task is submitted in method {@link
48 * ThreadPoolExecutor#execute}, and fewer than corePoolSize threads
49 * are running, a new thread is created to handle the request, even if
50 * other worker threads are idle. If there are more than
51 * corePoolSize but less than maximumPoolSize threads running, a new
52 * thread will be created only if the queue is full. By setting
53 * corePoolSize and maximumPoolSize the same, you create a fixed-size
54 * thread pool. By setting maximumPoolSize to an essentially unbounded
55 * value such as <tt>Integer.MAX_VALUE</tt>, you allow the pool to
56 * accomodate an arbitrary number of concurrent tasks. Most typically,
57 * core and maximum pool sizes are set only upon construction, but they
58 * may also be changed dynamically using {@link
59 * ThreadPoolExecutor#setCorePoolSize} and {@link
60 * ThreadPoolExecutor#setMaximumPoolSize}. <dd>
61 *
62 * <dt> On-demand construction
63 *
64 * <dd> By default, even core threads are initially created and
65 * started only when needed by new tasks, but this can be overridden
66 * dynamically using method {@link
67 * ThreadPoolExecutor#prestartCoreThread} or
68 * {@link ThreadPoolExecutor#prestartAllCoreThreads}. </dd>
69 *
70 * <dt>Creating new threads</dt>
71 *
72 * <dd>New threads are created using a {@link ThreadFactory}. By
73 * default, threads are created simply with the <tt>new
74 * Thread(Runnable)</tt> constructor, but by supplying a different
75 * ThreadFactory, you can alter the thread's name, thread group,
76 * priority, daemon status, etc. </dd>
77 *
78 * <dt>Keep-alive times</dt>
79 *
80 * <dd>If the pool currently has more than corePoolSize threads,
81 * excess threads will be terminated if they have been idle for more
82 * than the keepAliveTime (see {@link
83 * ThreadPoolExecutor#getKeepAliveTime}). This provides a means of
84 * reducing resource consumption when the pool is not being actively
85 * used. If the pool becomes more active later, new threads will be
86 * constructed. This parameter can also be changed dynamically
87 * using method {@link ThreadPoolExecutor#setKeepAliveTime}. Using
88 * a value of <tt>Long.MAX_VALUE</tt> {@link TimeUnit#NANOSECONDS}
89 * effectively disables idle threads from ever terminating prior
90 * to shut down.
91 * </dd>
92 *
93 * <dt>Queueing</dt>
94 *
95 * <dd>Any {@link BlockingQueue} may be used to transfer and hold
96 * submitted tasks. The use of this queue interacts with pool sizing:
97 *
98 * <ul>
99 *
100 * <li> If fewer than corePoolSize threads are running, the Executor
101 * always prefers adding a new thread
102 * rather than queueing.</li>
103 *
104 * <li> If corePoolSize or more threads are running, the Executor
105 * always prefers queuing a request rather than adding a new
106 * thread.</li>
107 *
108 * <li> If a request cannot be queued, a new thread is created unless
109 * this would exceed maximumPoolSize, in which case, the task will be
110 * rejected.</li>
111 *
112 * </ul>
113 *
114 * There are three general strategies for queuing:
115 * <ol>
116 *
117 * <li> <em> Direct handoffs.</em> A good default choice for a work
118 * queue is a {@link SynchronousQueue} that hands off tasks to threads
119 * without otherwise holding them. Here, an attempt to queue a task
120 * will fail if no threads are immediately available to run it, so a
121 * new thread will be constructed. This policy avoids lockups when
122 * handling sets of requests that might have internal dependencies.
123 * Direct handoffs generally require unbounded maximumPoolSizes to
124 * avoid rejection of new submitted tasks. This in turn admits the
125 * possibility of unbounded thread growth when commands continue to
126 * arrive on average faster than they can be processed. </li>
127 *
128 * <li><em> Unbounded queues.</em> Using an unbounded queue (for
129 * example a {@link LinkedBlockingQueue} without a predefined
130 * capacity) will cause new tasks to be queued in cases where all
131 * corePoolSize threads are busy. Thus, no more than corePoolSize
132 * threads will ever be created. (And the value of the maximumPoolSize
133 * therefore doesn't have any effect.) This may be appropriate when
134 * each task is completely independent of others, so tasks cannot
135 * affect each others execution; for example, in a web page server.
136 * While this style of queuing can be useful in smoothing out
137 * transient bursts of requests, it admits the possibility of
138 * unbounded work queue growth when commands continue to arrive on
139 * average faster than they can be processed. </li>
140 *
141 * <li><em>Bounded queues.</em> A bounded queue (for example, an
142 * {@link ArrayBlockingQueue}) helps prevent resource exhaustion when
143 * used with finite maximumPoolSizes, but can be more difficult to
144 * tune and control. Queue sizes and maximum pool sizes may be traded
145 * off for each other: Using large queues and small pools minimizes
146 * CPU usage, OS resources, and context-switching overhead, but can
147 * lead to artifically low throughput. If tasks frequently block (for
148 * example if they are I/O bound), a system may be able to schedule
149 * time for more threads than you otherwise allow. Use of small queues
150 * generally requires larger pool sizes, which keeps CPUs busier but
151 * may encounter unacceptable scheduling overhead, which also
152 * decreases throughput. </li>
153 *
154 * </ol>
155 *
156 * </dd>
157 *
158 * <dt>Rejected tasks</dt>
159 *
160 * <dd> New tasks submitted in method {@link
161 * ThreadPoolExecutor#execute} will be <em>rejected</em> when the
162 * Executor has been shut down, and also when the Executor uses finite
163 * bounds for both maximum threads and work queue capacity, and is
164 * saturated. In either case, the <tt>execute</tt> method invokes the
165 * {@link RejectedExecutionHandler#rejectedExecution} method of its
166 * {@link RejectedExecutionHandler}. Four predefined handler policies
167 * are provided:
168 *
169 * <ol>
170 *
171 * <li> In the
172 * default {@link ThreadPoolExecutor.AbortPolicy}, the handler throws a
173 * runtime {@link RejectedExecutionException} upon rejection. </li>
174 *
175 * <li> In {@link
176 * ThreadPoolExecutor.CallerRunsPolicy}, the thread that invokes
177 * <tt>execute</tt> itself runs the task. This provides a simple
178 * feedback control mechanism that will slow down the rate that new
179 * tasks are submitted. </li>
180 *
181 * <li> In {@link ThreadPoolExecutor.DiscardPolicy},
182 * a task that cannot be executed is simply dropped. </li>
183 *
184 * <li>In {@link
185 * ThreadPoolExecutor.DiscardOldestPolicy}, if the executor is not
186 * shut down, the task at the head of the work queue is dropped, and
187 * then execution is retried (which can fail again, causing this to be
188 * repeated.) </li>
189 *
190 * </ol>
191 *
192 * It is possible to define and use other kinds of {@link
193 * RejectedExecutionHandler} classes. Doing so requires some care
194 * especially when policies are designed to work only under particular
195 * capacity or queueing policies. </dd>
196 *
197 * <dt>Hook methods</dt>
198 *
199 * <dd>This class provides <tt>protected</tt> overridable {@link
200 * ThreadPoolExecutor#beforeExecute} and {@link
201 * ThreadPoolExecutor#afterExecute} methods that are called before and
202 * after execution of each task. These can be used to manipulate the
203 * execution environment, for example, reinitializing ThreadLocals,
204 * gathering statistics, or adding log entries. Additionally, method
205 * {@link ThreadPoolExecutor#terminated} can be overridden to perform
206 * any special processing that needs to be done once the Executor has
207 * fully terminated.</dd>
208 *
209 * <dt>Queue maintenance</dt>
210 *
211 * <dd> Method {@link ThreadPoolExecutor#getQueue} allows access to
212 * the work queue for purposes of monitoring and debugging. Use of
213 * this method for any other purpose is strongly discouraged. Two
214 * supplied methods, {@link ThreadPoolExecutor#remove} and {@link
215 * ThreadPoolExecutor#purge} are available to assist in storage
216 * reclamation when large numbers of queued tasks become
217 * cancelled.</dd> </dl>
218 *
219 * @since 1.5
220 * @author Doug Lea
221 */
222 public class ThreadPoolExecutor implements ExecutorService {
223 /**
224 * Queue used for holding tasks and handing off to worker threads.
225 */
226 private final BlockingQueue<Runnable> workQueue;
227
228 /**
229 * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and
230 * workers set.
231 */
232 private final ReentrantLock mainLock = new ReentrantLock();
233
234 /**
235 * Wait condition to support awaitTermination
236 */
237 private final Condition termination = mainLock.newCondition();
238
239 /**
240 * Set containing all worker threads in pool.
241 */
242 private final HashSet<Worker> workers = new HashSet<Worker>();
243
244 /**
245 * Timeout in nanosecods for idle threads waiting for work.
246 * Threads use this timeout only when there are more than
247 * corePoolSize present. Otherwise they wait forever for new work.
248 */
249 private volatile long keepAliveTime;
250
251 /**
252 * Core pool size, updated only while holding mainLock,
253 * but volatile to allow concurrent readability even
254 * during updates.
255 */
256 private volatile int corePoolSize;
257
258 /**
259 * Maximum pool size, updated only while holding mainLock
260 * but volatile to allow concurrent readability even
261 * during updates.
262 */
263 private volatile int maximumPoolSize;
264
265 /**
266 * Current pool size, updated only while holding mainLock
267 * but volatile to allow concurrent readability even
268 * during updates.
269 */
270 private volatile int poolSize;
271
272 /**
273 * Lifecycle state
274 */
275 private volatile int runState;
276
277 // Special values for runState
278 /** Normal, not-shutdown mode */
279 private static final int RUNNING = 0;
280 /** Controlled shutdown mode */
281 private static final int SHUTDOWN = 1;
282 /** Immediate shutdown mode */
283 private static final int STOP = 2;
284 /** Final state */
285 private static final int TERMINATED = 3;
286
287 /**
288 * Handler called when saturated or shutdown in execute.
289 */
290 private volatile RejectedExecutionHandler handler = defaultHandler;
291
292 /**
293 * Factory for new threads.
294 */
295 private volatile ThreadFactory threadFactory = defaultThreadFactory;
296
297 /**
298 * Tracks largest attained pool size.
299 */
300 private int largestPoolSize;
301
302 /**
303 * Counter for completed tasks. Updated only on termination of
304 * worker threads.
305 */
306 private long completedTaskCount;
307
308 /**
309 * The default thread factory
310 */
311 private static final ThreadFactory defaultThreadFactory =
312 new ThreadFactory() {
313 public Thread newThread(Runnable r) {
314 return new Thread(r);
315 }
316 };
317
318 /**
319 * The default rejectect execution handler
320 */
321 private static final RejectedExecutionHandler defaultHandler =
322 new AbortPolicy();
323
324 /**
325 * Invoke the rejected execution handler for the given command.
326 */
327 void reject(Runnable command) {
328 handler.rejectedExecution(command, this);
329 }
330
331 /**
332 * Create and return a new thread running firstTask as its first
333 * task. Call only while holding mainLock
334 * @param firstTask the task the new thread should run first (or
335 * null if none)
336 * @return the new thread
337 */
338 private Thread addThread(Runnable firstTask) {
339 Worker w = new Worker(firstTask);
340 Thread t = threadFactory.newThread(w);
341 w.thread = t;
342 workers.add(w);
343 int nt = ++poolSize;
344 if (nt > largestPoolSize)
345 largestPoolSize = nt;
346 return t;
347 }
348
349
350
351 /**
352 * Create and start a new thread running firstTask as its first
353 * task, only if less than corePoolSize threads are running.
354 * @param firstTask the task the new thread should run first (or
355 * null if none)
356 * @return true if successful.
357 */
358 private boolean addIfUnderCorePoolSize(Runnable firstTask) {
359 Thread t = null;
360 mainLock.lock();
361 try {
362 if (poolSize < corePoolSize)
363 t = addThread(firstTask);
364 } finally {
365 mainLock.unlock();
366 }
367 if (t == null)
368 return false;
369 t.start();
370 return true;
371 }
372
373 /**
374 * Create and start a new thread only if less than maximumPoolSize
375 * threads are running. The new thread runs as its first task the
376 * next task in queue, or if there is none, the given task.
377 * @param firstTask the task the new thread should run first (or
378 * null if none)
379 * @return null on failure, else the first task to be run by new thread.
380 */
381 private Runnable addIfUnderMaximumPoolSize(Runnable firstTask) {
382 Thread t = null;
383 Runnable next = null;
384 mainLock.lock();
385 try {
386 if (poolSize < maximumPoolSize) {
387 next = workQueue.poll();
388 if (next == null)
389 next = firstTask;
390 t = addThread(next);
391 }
392 } finally {
393 mainLock.unlock();
394 }
395 if (t == null)
396 return null;
397 t.start();
398 return next;
399 }
400
401
402 /**
403 * Get the next task for a worker thread to run.
404 * @return the task
405 * @throws InterruptedException if interrupted while waiting for task
406 */
407 private Runnable getTask() throws InterruptedException {
408 for (;;) {
409 switch(runState) {
410 case RUNNING: {
411 if (poolSize <= corePoolSize) // untimed wait if core
412 return workQueue.take();
413
414 long timeout = keepAliveTime;
415 if (timeout <= 0) // die immediately for 0 timeout
416 return null;
417 Runnable r = workQueue.poll(timeout, TimeUnit.NANOSECONDS);
418 if (r != null)
419 return r;
420 if (poolSize > corePoolSize) // timed out
421 return null;
422 // else, after timeout, pool shrank so shouldn't die, so retry
423 break;
424 }
425
426 case SHUTDOWN: {
427 // Help drain queue
428 Runnable r = workQueue.poll();
429 if (r != null)
430 return r;
431
432 // Check if can terminate
433 if (workQueue.isEmpty()) {
434 interruptIdleWorkers();
435 return null;
436 }
437
438 // There could still be delayed tasks in queue.
439 // Wait for one, re-checking state upon interruption
440 try {
441 return workQueue.take();
442 }
443 catch(InterruptedException ignore) {
444 }
445 break;
446 }
447
448 case STOP:
449 return null;
450 default:
451 assert false;
452 }
453 }
454 }
455
456 /**
457 * Wake up all threads that might be waiting for tasks.
458 */
459 void interruptIdleWorkers() {
460 mainLock.lock();
461 try {
462 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
463 it.next().interruptIfIdle();
464 } finally {
465 mainLock.unlock();
466 }
467 }
468
469 /**
470 * Perform bookkeeping for a terminated worker thread.
471 * @param w the worker
472 */
473 private void workerDone(Worker w) {
474 mainLock.lock();
475 try {
476 completedTaskCount += w.completedTasks;
477 workers.remove(w);
478 if (--poolSize > 0)
479 return;
480
481 // Else, this is the last thread. Deal with potential shutdown.
482
483 int state = runState;
484 assert state != TERMINATED;
485
486 if (state != STOP) {
487 // If there are queued tasks but no threads, create
488 // replacement.
489 Runnable r = workQueue.poll();
490 if (r != null) {
491 addThread(r).start();
492 return;
493 }
494
495 // If there are some (presumably delayed) tasks but
496 // none pollable, create an idle replacement to wait.
497 if (!workQueue.isEmpty()) {
498 addThread(null).start();
499 return;
500 }
501
502 // Otherwise, we can exit without replacement
503 if (state == RUNNING)
504 return;
505 }
506
507 // Either state is STOP, or state is SHUTDOWN and there is
508 // no work to do. So we can terminate.
509 runState = TERMINATED;
510 termination.signalAll();
511 // fall through to call terminate() outside of lock.
512 } finally {
513 mainLock.unlock();
514 }
515
516 assert runState == TERMINATED;
517 terminated();
518 }
519
520 /**
521 * Worker threads
522 */
523 private class Worker implements Runnable {
524
525 /**
526 * The runLock is acquired and released surrounding each task
527 * execution. It mainly protects against interrupts that are
528 * intended to cancel the worker thread from instead
529 * interrupting the task being run.
530 */
531 private final ReentrantLock runLock = new ReentrantLock();
532
533 /**
534 * Initial task to run before entering run loop
535 */
536 private Runnable firstTask;
537
538 /**
539 * Per thread completed task counter; accumulated
540 * into completedTaskCount upon termination.
541 */
542 volatile long completedTasks;
543
544 /**
545 * Thread this worker is running in. Acts as a final field,
546 * but cannot be set until thread is created.
547 */
548 Thread thread;
549
550 Worker(Runnable firstTask) {
551 this.firstTask = firstTask;
552 }
553
554 boolean isActive() {
555 return runLock.isLocked();
556 }
557
558 /**
559 * Interrupt thread if not running a task
560 */
561 void interruptIfIdle() {
562 if (runLock.tryLock()) {
563 try {
564 thread.interrupt();
565 } finally {
566 runLock.unlock();
567 }
568 }
569 }
570
571 /**
572 * Cause thread to die even if running a task.
573 */
574 void interruptNow() {
575 thread.interrupt();
576 }
577
578 /**
579 * Run a single task between before/after methods.
580 */
581 private void runTask(Runnable task) {
582 runLock.lock();
583 try {
584 // Abort now if immediate cancel. Otherwise, we have
585 // committed to run this task.
586 if (runState == STOP)
587 return;
588
589 Thread.interrupted(); // clear interrupt status on entry
590 boolean ran = false;
591 beforeExecute(thread, task);
592 try {
593 task.run();
594 ran = true;
595 afterExecute(task, null);
596 ++completedTasks;
597 } catch(RuntimeException ex) {
598 if (!ran)
599 afterExecute(task, ex);
600 // Else the exception occurred within
601 // afterExecute itself in which case we don't
602 // want to call it again.
603 throw ex;
604 }
605 } finally {
606 runLock.unlock();
607 }
608 }
609
610 /**
611 * Main run loop
612 */
613 public void run() {
614 try {
615 for (;;) {
616 Runnable task;
617 if (firstTask != null) {
618 task = firstTask;
619 firstTask = null;
620 } else {
621 task = getTask();
622 if (task == null)
623 break;
624 }
625 runTask(task);
626 task = null; // unnecessary but can help GC
627 }
628 } catch(InterruptedException ie) {
629 // fall through
630 } finally {
631 workerDone(this);
632 }
633 }
634 }
635
636 // Public methods
637
638 /**
639 * Creates a new <tt>ThreadPoolExecutor</tt> with the given
640 * initial parameters. It may be more convenient to use one of
641 * the {@link Executors} factory methods instead of this general
642 * purpose constructor.
643 *
644 * @param corePoolSize the number of threads to keep in the
645 * pool, even if they are idle.
646 * @param maximumPoolSize the maximum number of threads to allow in the
647 * pool.
648 * @param keepAliveTime when the number of threads is greater than
649 * the core, this is the maximum time that excess idle threads
650 * will wait for new tasks before terminating.
651 * @param unit the time unit for the keepAliveTime
652 * argument.
653 * @param workQueue the queue to use for holding tasks before the
654 * are executed. This queue will hold only the <tt>Runnable</tt>
655 * tasks submitted by the <tt>execute</tt> method.
656 * @throws IllegalArgumentException if corePoolSize, or
657 * keepAliveTime less than zero, or if maximumPoolSize less than or
658 * equal to zero, or if corePoolSize greater than maximumPoolSize.
659 * @throws NullPointerException if <tt>workQueue</tt> is null
660 */
661 public ThreadPoolExecutor(int corePoolSize,
662 int maximumPoolSize,
663 long keepAliveTime,
664 TimeUnit unit,
665 BlockingQueue<Runnable> workQueue) {
666 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
667 defaultThreadFactory, defaultHandler);
668 }
669
670 /**
671 * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
672 * parameters.
673 *
674 * @param corePoolSize the number of threads to keep in the
675 * pool, even if they are idle.
676 * @param maximumPoolSize the maximum number of threads to allow in the
677 * pool.
678 * @param keepAliveTime when the number of threads is greater than
679 * the core, this is the maximum time that excess idle threads
680 * will wait for new tasks before terminating.
681 * @param unit the time unit for the keepAliveTime
682 * argument.
683 * @param workQueue the queue to use for holding tasks before the
684 * are executed. This queue will hold only the <tt>Runnable</tt>
685 * tasks submitted by the <tt>execute</tt> method.
686 * @param threadFactory the factory to use when the executor
687 * creates a new thread.
688 * @throws IllegalArgumentException if corePoolSize, or
689 * keepAliveTime less than zero, or if maximumPoolSize less than or
690 * equal to zero, or if corePoolSize greater than maximumPoolSize.
691 * @throws NullPointerException if <tt>workQueue</tt>
692 * or <tt>threadFactory</tt> are null.
693 */
694 public ThreadPoolExecutor(int corePoolSize,
695 int maximumPoolSize,
696 long keepAliveTime,
697 TimeUnit unit,
698 BlockingQueue<Runnable> workQueue,
699 ThreadFactory threadFactory) {
700
701 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
702 threadFactory, defaultHandler);
703 }
704
705 /**
706 * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
707 * parameters.
708 *
709 * @param corePoolSize the number of threads to keep in the
710 * pool, even if they are idle.
711 * @param maximumPoolSize the maximum number of threads to allow in the
712 * pool.
713 * @param keepAliveTime when the number of threads is greater than
714 * the core, this is the maximum time that excess idle threads
715 * will wait for new tasks before terminating.
716 * @param unit the time unit for the keepAliveTime
717 * argument.
718 * @param workQueue the queue to use for holding tasks before the
719 * are executed. This queue will hold only the <tt>Runnable</tt>
720 * tasks submitted by the <tt>execute</tt> method.
721 * @param handler the handler to use when execution is blocked
722 * because the thread bounds and queue capacities are reached.
723 * @throws IllegalArgumentException if corePoolSize, or
724 * keepAliveTime less than zero, or if maximumPoolSize less than or
725 * equal to zero, or if corePoolSize greater than maximumPoolSize.
726 * @throws NullPointerException if <tt>workQueue</tt>
727 * or <tt>handler</tt> are null.
728 */
729 public ThreadPoolExecutor(int corePoolSize,
730 int maximumPoolSize,
731 long keepAliveTime,
732 TimeUnit unit,
733 BlockingQueue<Runnable> workQueue,
734 RejectedExecutionHandler handler) {
735 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
736 defaultThreadFactory, handler);
737 }
738
739 /**
740 * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
741 * parameters.
742 *
743 * @param corePoolSize the number of threads to keep in the
744 * pool, even if they are idle.
745 * @param maximumPoolSize the maximum number of threads to allow in the
746 * pool.
747 * @param keepAliveTime when the number of threads is greater than
748 * the core, this is the maximum time that excess idle threads
749 * will wait for new tasks before terminating.
750 * @param unit the time unit for the keepAliveTime
751 * argument.
752 * @param workQueue the queue to use for holding tasks before the
753 * are executed. This queue will hold only the <tt>Runnable</tt>
754 * tasks submitted by the <tt>execute</tt> method.
755 * @param threadFactory the factory to use when the executor
756 * creates a new thread.
757 * @param handler the handler to use when execution is blocked
758 * because the thread bounds and queue capacities are reached.
759 * @throws IllegalArgumentException if corePoolSize, or
760 * keepAliveTime less than zero, or if maximumPoolSize less than or
761 * equal to zero, or if corePoolSize greater than maximumPoolSize.
762 * @throws NullPointerException if <tt>workQueue</tt>
763 * or <tt>threadFactory</tt> or <tt>handler</tt> are null.
764 */
765 public ThreadPoolExecutor(int corePoolSize,
766 int maximumPoolSize,
767 long keepAliveTime,
768 TimeUnit unit,
769 BlockingQueue<Runnable> workQueue,
770 ThreadFactory threadFactory,
771 RejectedExecutionHandler handler) {
772 if (corePoolSize < 0 ||
773 maximumPoolSize <= 0 ||
774 maximumPoolSize < corePoolSize ||
775 keepAliveTime < 0)
776 throw new IllegalArgumentException();
777 if (workQueue == null || threadFactory == null || handler == null)
778 throw new NullPointerException();
779 this.corePoolSize = corePoolSize;
780 this.maximumPoolSize = maximumPoolSize;
781 this.workQueue = workQueue;
782 this.keepAliveTime = unit.toNanos(keepAliveTime);
783 this.threadFactory = threadFactory;
784 this.handler = handler;
785 }
786
787
788 /**
789 * Executes the given task sometime in the future. The task
790 * may execute in a new thread or in an existing pooled thread.
791 *
792 * If the task cannot be submitted for execution, either because this
793 * executor has been shutdown or because its capacity has been reached,
794 * the task is handled by the current <tt>RejectedExecutionHandler</tt>.
795 *
796 * @param command the task to execute
797 * @throws RejectedExecutionException at discretion of
798 * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
799 * for execution
800 */
801 public void execute(Runnable command) {
802 for (;;) {
803 if (runState != RUNNING) {
804 reject(command);
805 return;
806 }
807 if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
808 return;
809 if (workQueue.offer(command))
810 return;
811 Runnable r = addIfUnderMaximumPoolSize(command);
812 if (r == command)
813 return;
814 if (r == null) {
815 reject(command);
816 return;
817 }
818 // else retry
819 }
820 }
821
822 public void shutdown() {
823 mainLock.lock();
824 try {
825 if (runState == RUNNING) // don't override shutdownNow
826 runState = SHUTDOWN;
827 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
828 it.next().interruptIfIdle();
829 } finally {
830 mainLock.unlock();
831 }
832 }
833
834
835 public List shutdownNow() {
836 mainLock.lock();
837 try {
838 if (runState != TERMINATED)
839 runState = STOP;
840 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
841 it.next().interruptNow();
842 } finally {
843 mainLock.unlock();
844 }
845 return Arrays.asList(workQueue.toArray());
846 }
847
848 public boolean isShutdown() {
849 return runState != RUNNING;
850 }
851
852 /**
853 * Return true if this executor is in the process of terminating
854 * after <tt>shutdown</tt> or <tt>shutdownNow</tt> but has not
855 * completely terminated. This method may be useful for
856 * debugging. A return of <tt>true</tt> reported a sufficient
857 * period after shutdown may indicate that submitted tasks have
858 * ignored or suppressed interruption, causing this executor not
859 * to properly terminate.
860 * @return true if terminating but not yet terminated.
861 */
862 public boolean isTerminating() {
863 return runState == STOP;
864 }
865
866 public boolean isTerminated() {
867 return runState == TERMINATED;
868 }
869
870 public boolean awaitTermination(long timeout, TimeUnit unit)
871 throws InterruptedException {
872 mainLock.lock();
873 try {
874 return termination.await(timeout, unit);
875 } finally {
876 mainLock.unlock();
877 }
878 }
879
880 /**
881 * Invokes <tt>shutdown</tt> when this executor is no longer
882 * referenced.
883 */
884 protected void finalize() {
885 shutdown();
886 }
887
888 /**
889 * Sets the thread factory used to create new threads.
890 *
891 * @param threadFactory the new thread factory
892 * @see #getThreadFactory
893 */
894 public void setThreadFactory(ThreadFactory threadFactory) {
895 this.threadFactory = threadFactory;
896 }
897
898 /**
899 * Returns the thread factory used to create new threads.
900 *
901 * @return the current thread factory
902 * @see #setThreadFactory
903 */
904 public ThreadFactory getThreadFactory() {
905 return threadFactory;
906 }
907
908 /**
909 * Sets a new handler for unexecutable tasks.
910 *
911 * @param handler the new handler
912 * @see #getRejectedExecutionHandler
913 */
914 public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
915 this.handler = handler;
916 }
917
918 /**
919 * Returns the current handler for unexecutable tasks.
920 *
921 * @return the current handler
922 * @see #setRejectedExecutionHandler
923 */
924 public RejectedExecutionHandler getRejectedExecutionHandler() {
925 return handler;
926 }
927
928 /**
929 * Returns the task queue used by this executor. Access to the
930 * task queue is intended primarily for debugging and monitoring.
931 * This queue may be in active use. Retrieveing the task queue
932 * does not prevent queued tasks from executing.
933 *
934 * @return the task queue
935 */
936 public BlockingQueue<Runnable> getQueue() {
937 return workQueue;
938 }
939
940 /**
941 * Removes this task from internal queue if it is present, thus
942 * causing it not to be run if it has not already started. This
943 * method may be useful as one part of a cancellation scheme.
944 *
945 * @param task the task to remove
946 * @return true if the task was removed
947 */
948 public boolean remove(Runnable task) {
949 return getQueue().remove(task);
950 }
951
952
953 /**
954 * Tries to remove from the work queue all {@link Cancellable}
955 * tasks that have been cancelled. This method can be useful as a
956 * storage reclamation operation, that has no other impact on
957 * functionality. Cancelled tasks are never executed, but may
958 * accumulate in work queues until worker threads can actively
959 * remove them. Invoking this method instead tries to remove them now.
960 * However, this method may fail to remove tasks in
961 * the presence of interference by other threads.
962 */
963
964 public void purge() {
965 // Fail if we encounter interference during traversal
966 try {
967 Iterator<Runnable> it = getQueue().iterator();
968 while (it.hasNext()) {
969 Runnable r = it.next();
970 if (r instanceof Cancellable) {
971 Cancellable c = (Cancellable)r;
972 if (c.isCancelled())
973 it.remove();
974 }
975 }
976 }
977 catch(ConcurrentModificationException ex) {
978 return;
979 }
980 }
981
982 /**
983 * Sets the core number of threads. This overrides any value set
984 * in the constructor. If the new value is smaller than the
985 * current value, excess existing threads will be terminated when
986 * they next become idle.
987 *
988 * @param corePoolSize the new core size
989 * @throws IllegalArgumentException if <tt>corePoolSize</tt>
990 * less than zero
991 * @see #getCorePoolSize
992 */
993 public void setCorePoolSize(int corePoolSize) {
994 if (corePoolSize < 0)
995 throw new IllegalArgumentException();
996 mainLock.lock();
997 try {
998 int extra = this.corePoolSize - corePoolSize;
999 this.corePoolSize = corePoolSize;
1000 if (extra > 0 && poolSize > corePoolSize) {
1001 Iterator<Worker> it = workers.iterator();
1002 while (it.hasNext() &&
1003 extra > 0 &&
1004 poolSize > corePoolSize &&
1005 workQueue.remainingCapacity() == 0) {
1006 it.next().interruptIfIdle();
1007 --extra;
1008 }
1009 }
1010
1011 } finally {
1012 mainLock.unlock();
1013 }
1014 }
1015
1016 /**
1017 * Returns the core number of threads.
1018 *
1019 * @return the core number of threads
1020 * @see #setCorePoolSize
1021 */
1022 public int getCorePoolSize() {
1023 return corePoolSize;
1024 }
1025
1026 /**
1027 * Start a core thread, causing it to idly wait for work. This
1028 * overrides the default policy of starting core threads only when
1029 * new tasks are executed. This method will return <tt>false</tt>
1030 * if all core threads have already been started.
1031 * @return true if a thread was started
1032 */
1033 public boolean prestartCoreThread() {
1034 return addIfUnderCorePoolSize(null);
1035 }
1036
1037 /**
1038 * Start all core threads, causing them to idly wait for work. This
1039 * overrides the default policy of starting core threads only when
1040 * new tasks are executed.
1041 * @return the number of threads started.
1042 */
1043 public int prestartAllCoreThreads() {
1044 int n = 0;
1045 while (addIfUnderCorePoolSize(null))
1046 ++n;
1047 return n;
1048 }
1049
1050 /**
1051 * Sets the maximum allowed number of threads. This overrides any
1052 * value set in the constructor. If the new value is smaller than
1053 * the current value, excess existing threads will be
1054 * terminated when they next become idle.
1055 *
1056 * @param maximumPoolSize the new maximum
1057 * @throws IllegalArgumentException if maximumPoolSize less than zero or
1058 * the {@link #getCorePoolSize core pool size}
1059 * @see #getMaximumPoolSize
1060 */
1061 public void setMaximumPoolSize(int maximumPoolSize) {
1062 if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
1063 throw new IllegalArgumentException();
1064 mainLock.lock();
1065 try {
1066 int extra = this.maximumPoolSize - maximumPoolSize;
1067 this.maximumPoolSize = maximumPoolSize;
1068 if (extra > 0 && poolSize > maximumPoolSize) {
1069 Iterator<Worker> it = workers.iterator();
1070 while (it.hasNext() &&
1071 extra > 0 &&
1072 poolSize > maximumPoolSize) {
1073 it.next().interruptIfIdle();
1074 --extra;
1075 }
1076 }
1077 } finally {
1078 mainLock.unlock();
1079 }
1080 }
1081
1082 /**
1083 * Returns the maximum allowed number of threads.
1084 *
1085 * @return the maximum allowed number of threads
1086 * @see #setMaximumPoolSize
1087 */
1088 public int getMaximumPoolSize() {
1089 return maximumPoolSize;
1090 }
1091
1092 /**
1093 * Sets the time limit for which threads may remain idle before
1094 * being terminated. If there are more than the core number of
1095 * threads currently in the pool, after waiting this amount of
1096 * time without processing a task, excess threads will be
1097 * terminated. This overrides any value set in the constructor.
1098 * @param time the time to wait. A time value of zero will cause
1099 * excess threads to terminate immediately after executing tasks.
1100 * @param unit the time unit of the time argument
1101 * @throws IllegalArgumentException if time less than zero
1102 * @see #getKeepAliveTime
1103 */
1104 public void setKeepAliveTime(long time, TimeUnit unit) {
1105 if (time < 0)
1106 throw new IllegalArgumentException();
1107 this.keepAliveTime = unit.toNanos(time);
1108 }
1109
1110 /**
1111 * Returns the thread keep-alive time, which is the amount of time
1112 * which threads in excess of the core pool size may remain
1113 * idle before being terminated.
1114 *
1115 * @param unit the desired time unit of the result
1116 * @return the time limit
1117 * @see #setKeepAliveTime
1118 */
1119 public long getKeepAliveTime(TimeUnit unit) {
1120 return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
1121 }
1122
1123 /* Statistics */
1124
1125 /**
1126 * Returns the current number of threads in the pool.
1127 *
1128 * @return the number of threads
1129 */
1130 public int getPoolSize() {
1131 return poolSize;
1132 }
1133
1134 /**
1135 * Returns the approximate number of threads that are actively
1136 * executing tasks.
1137 *
1138 * @return the number of threads
1139 */
1140 public int getActiveCount() {
1141 mainLock.lock();
1142 try {
1143 int n = 0;
1144 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); ) {
1145 if (it.next().isActive())
1146 ++n;
1147 }
1148 return n;
1149 } finally {
1150 mainLock.unlock();
1151 }
1152 }
1153
1154 /**
1155 * Returns the largest number of threads that have ever
1156 * simultaneously been in the pool.
1157 *
1158 * @return the number of threads
1159 */
1160 public int getLargestPoolSize() {
1161 mainLock.lock();
1162 try {
1163 return largestPoolSize;
1164 } finally {
1165 mainLock.unlock();
1166 }
1167 }
1168
1169 /**
1170 * Returns the approximate total number of tasks that have been
1171 * scheduled for execution. Because the states of tasks and
1172 * threads may change dynamically during computation, the returned
1173 * value is only an approximation, but one that does not ever
1174 * decrease across successive calls.
1175 *
1176 * @return the number of tasks
1177 */
1178 public long getTaskCount() {
1179 mainLock.lock();
1180 try {
1181 long n = completedTaskCount;
1182 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); ) {
1183 Worker w = it.next();
1184 n += w.completedTasks;
1185 if (w.isActive())
1186 ++n;
1187 }
1188 return n + workQueue.size();
1189 } finally {
1190 mainLock.unlock();
1191 }
1192 }
1193
1194 /**
1195 * Returns the approximate total number of tasks that have
1196 * completed execution. Because the states of tasks and threads
1197 * may change dynamically during computation, the returned value
1198 * is only an approximation, but one that does not ever decrease
1199 * across successive calls.
1200 *
1201 * @return the number of tasks
1202 */
1203 public long getCompletedTaskCount() {
1204 mainLock.lock();
1205 try {
1206 long n = completedTaskCount;
1207 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
1208 n += it.next().completedTasks;
1209 return n;
1210 } finally {
1211 mainLock.unlock();
1212 }
1213 }
1214
1215 /**
1216 * Method invoked prior to executing the given Runnable in the
1217 * given thread. This method may be used to re-initialize
1218 * ThreadLocals, or to perform logging. Note: To properly nest
1219 * multiple overridings, subclasses should generally invoke
1220 * <tt>super.beforeExecute</tt> at the end of this method.
1221 *
1222 * @param t the thread that will run task r.
1223 * @param r the task that will be executed.
1224 */
1225 protected void beforeExecute(Thread t, Runnable r) { }
1226
1227 /**
1228 * Method invoked upon completion of execution of the given
1229 * Runnable. If non-null, the Throwable is the uncaught exception
1230 * that caused execution to terminate abruptly. Note: To properly
1231 * nest multiple overridings, subclasses should generally invoke
1232 * <tt>super.afterExecute</tt> at the beginning of this method.
1233 *
1234 * @param r the runnable that has completed.
1235 * @param t the exception that caused termination, or null if
1236 * execution completed normally.
1237 */
1238 protected void afterExecute(Runnable r, Throwable t) { }
1239
1240 /**
1241 * Method invoked when the Executor has terminated. Default
1242 * implementation does nothing. Note: To properly nest multiple
1243 * overridings, subclasses should generally invoke
1244 * <tt>super.terminated</tt> within this method.
1245 */
1246 protected void terminated() { }
1247
1248 /**
1249 * A handler for rejected tasks that runs the rejected task
1250 * directly in the calling thread of the <tt>execute</tt> method,
1251 * unless the executor has been shut down, in which case the task
1252 * is discarded.
1253 */
1254 public static class CallerRunsPolicy implements RejectedExecutionHandler {
1255
1256 /**
1257 * Creates a <tt>CallerRunsPolicy</tt>.
1258 */
1259 public CallerRunsPolicy() { }
1260
1261 /**
1262 * Executes task r in the caller's thread, unless the executor
1263 * has been shut down, in which case the task is discarded.
1264 * @param r the runnable task requested to be executed
1265 * @param e the executor attempting to execute this task
1266 */
1267 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1268 if (!e.isShutdown()) {
1269 r.run();
1270 }
1271 }
1272 }
1273
1274 /**
1275 * A handler for rejected tasks that throws a
1276 * <tt>RejectedExecutionException</tt>.
1277 */
1278 public static class AbortPolicy implements RejectedExecutionHandler {
1279
1280 /**
1281 * Creates a <tt>AbortPolicy</tt>.
1282 */
1283 public AbortPolicy() { }
1284
1285 /**
1286 * Always throws RejectedExecutionException
1287 * @param r the runnable task requested to be executed
1288 * @param e the executor attempting to execute this task
1289 * @throws RejectedExecutionException always.
1290 */
1291 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1292 throw new RejectedExecutionException();
1293 }
1294 }
1295
1296 /**
1297 * A handler for rejected tasks that silently discards the
1298 * rejected task.
1299 */
1300 public static class DiscardPolicy implements RejectedExecutionHandler {
1301
1302 /**
1303 * Creates <tt>DiscardPolicy</tt>.
1304 */
1305 public DiscardPolicy() { }
1306
1307 /**
1308 * Does nothing, which has the effect of discarding task r.
1309 * @param r the runnable task requested to be executed
1310 * @param e the executor attempting to execute this task
1311 */
1312 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1313 }
1314 }
1315
1316 /**
1317 * A handler for rejected tasks that discards the oldest unhandled
1318 * request and then retries <tt>execute</tt>, unless the executor
1319 * is shut down, in which case the task is discarded.
1320 */
1321 public static class DiscardOldestPolicy implements RejectedExecutionHandler {
1322 /**
1323 * Creates a <tt>DiscardOldestPolicy</tt> for the given executor.
1324 */
1325 public DiscardOldestPolicy() { }
1326
1327 /**
1328 * Obtains and ignores the next task that the executor
1329 * would otherwise execute, if one is immediately available,
1330 * and then retries execution of task r, unless the executor
1331 * is shut down, in which case task r is instead discarded.
1332 * @param r the runnable task requested to be executed
1333 * @param e the executor attempting to execute this task
1334 */
1335 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1336 if (!e.isShutdown()) {
1337 e.getQueue().poll();
1338 e.execute(r);
1339 }
1340 }
1341 }
1342 }