ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ThreadPoolExecutor.java
Revision: 1.18
Committed: Mon Aug 25 01:58:50 2003 UTC (20 years, 9 months ago) by dholmes
Branch: MAIN
Changes since 1.17: +2 -2 lines
Log Message:
Fixed @Executors typo

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
11 /**
12 * An {@link ExecutorService} that executes each submitted task using
13 * one of possibly several pooled threads.
14 *
15 * <p>Thread pools address two different problems: they usually
16 * provide improved performance when executing large numbers of
17 * asynchronous tasks, due to reduced per-task invocation overhead,
18 * and they provide a means of bounding and managing the resources,
19 * including threads, consumed when executing a collection of tasks.
20 *
21 * <p>This class is designed to ge highly tunable.
22 *
23 * <p>To be useful across a wide range of contexts, this class
24 * provides many adjustable parameters and extensibility hooks. For
25 * example, it can be configured to create a new thread for each task,
26 * or even to execute tasks sequentially in a single thread, in
27 * addition to its most common configuration, which reuses a pool of
28 * threads. However, programmers are urged to use the more convenient
29 * factory methods <tt>newCachedThreadPool</tt> (unbounded thread
30 * pool, with automatic thread reclamation),
31 * <tt>newFixedThreadPool</tt> (fixed size thread pool),
32 * <tt>newSingleThreadPoolExecutor</tt> (single background thread for
33 * execution of tasks), and <tt>newThreadPerTaskExeceutor</tt>
34 * (execute each task in a new thread), that preconfigure settings for
35 * the most common usage scenarios.
36 *
37 * <p>Each <tt>ThreadPoolExecutor</tt> also maintains some basic
38 * statistics, such as the number of completed tasks, that may be
39 * useful for monitoring and tuning executors.
40 *
41 * <h3>Tuning guide</h3>
42 * <dl>
43 *
44 * <dt>Core and maximum pool size</dt>
45 *
46 * <dd>A ThreadPoolExecutor will automatically adjust the pool size
47 * according to the bounds set by corePoolSize and maximumPoolSize.
48 * When a new task is submitted, and fewer than corePoolSize threads
49 * are running, a new thread is created to handle the request, even if
50 * other worker threads are idle. If there are more than the
51 * corePoolSize but less than maximumPoolSize threads running, a new
52 * thread will be created only if the queue is full. By setting
53 * corePoolSize and maximumPoolSize the same, you create a fixed-size
54 * thread pool. By default, even core threads are only created and
55 * started when needed by new tasks, but this can be overridden
56 * dynamically using method <tt>prestartCoreThread</tt>.
57 * </dd>
58 *
59 * <dt>Keep-alive</dt>
60 *
61 * <dd>The keepAliveTime determines what happens to idle threads. If
62 * the pool currently has more than the core number of threads, excess
63 * threads will be terminated if they have been idle for more than the
64 * keepAliveTime.</dd>
65 *
66 * <dt>Queueing</dt>
67 *
68 * <dd>You are free to specify the queuing mechanism used to handle
69 * submitted tasks. A good default is to use a zero-capacity
70 * <tt>SynchronousQueue</tt> to to hand off work to threads. This is
71 * a safe, conservative policy that avoids lockups when handling sets
72 * of requests that might have internal dependencies. Using an
73 * unbounded queue (for example a <tt>LinkedBlockingQueue</tt>) will
74 * cause new tasks to be queued in cases where all corePoolSize
75 * threads are busy, so no more than corePoolSize threads will be
76 * craated. This may be appropriate when each task is completely
77 * independent of others, so tasks cannot affect each others
78 * execution. For example, in a web page server. When given a choice,
79 * this pool always prefers adding a new thread rather than queueing
80 * if there are currently fewer than the current getCorePoolSize
81 * threads running, but otherwise always prefers queuing a request
82 * rather than adding a new thread.
83 *
84 * <p>While queuing can be useful in smoothing out transient bursts of
85 * requests, especially in socket-based services, it is not very well
86 * behaved when commands continue to arrive on average faster than
87 * they can be processed. Queue sizes and maximum pool sizes can
88 * often be traded off for each other. Using large queues and small
89 * pools minimizes CPU usage, OS resources, and context-switching
90 * overhead, but can lead to artifically low throughput. If tasks
91 * frequently block (for example if they are I/O bound), a system may
92 * be able to schedule time for more threads than you otherwise
93 * allow. Use of small queues or queueless handoffs generally requires
94 * larger pool sizes, which keeps CPUs busier but may encounter
95 * unacceptable scheduling overhead, which also decreases throughput.
96 * </dd>
97 *
98 * <dt>Creating new threads</dt>
99 *
100 * <dd>New threads are created using a ThreadFactory. By default,
101 * threads are created simply with the new Thread(Runnable)
102 * constructor, but by supplying a different ThreadFactory, you can
103 * alter the thread's name, thread group, priority, daemon status,
104 * etc. </dd>
105 *
106 * <dt>Before and after intercepts</dt>
107 *
108 * <dd>This class has overridable methods that which are called before
109 * and after execution of each task. These can be used to manipulate
110 * the execution environment (for example, reinitializing
111 * ThreadLocals), gather statistics, or perform logging. </dd>
112 *
113 * <dt>Rejected tasks</dt>
114 *
115 * <dd>There are a number of factors which can bound the number of
116 * tasks which can execute at once, including the maximum pool size
117 * and the queuing mechanism used. If the executor determines that a
118 * task cannot be executed because it has been refused by the queue
119 * and no threads are available, or because the executor has been shut
120 * down, the RejectedExecutionHandler's rejectedExecution method is
121 * invoked. The default (<tt>AbortPolicy</tt>) handler throws a runtime
122 * {@link RejectedExecutionException} upon rejection. </dd>
123 *
124 * <dt>Termination</dt>
125 *
126 * <dd>ThreadPoolExecutor supports two shutdown options, immediate and
127 * graceful. In an immediate shutdown, any threads currently
128 * executing are interrupted, and any tasks not yet begun are returned
129 * from the shutdownNow call. In a graceful shutdown, all queued
130 * tasks are allowed to run, but new tasks may not be submitted.
131 * </dd>
132 *
133 * </dl>
134 *
135 * @since 1.5
136 * @see RejectedExecutionHandler
137 * @see Executors
138 * @see ThreadFactory
139 *
140 * @spec JSR-166
141 * @revised $Date: 2003/08/24 23:32:25 $
142 * @editor $Author: dl $
143 * @author Doug Lea
144 */
145 public class ThreadPoolExecutor implements ExecutorService {
146 /**
147 * Queue used for holding tasks and handing off to worker threads.
148 */
149 private final BlockingQueue<Runnable> workQueue;
150
151 /**
152 * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and
153 * workers set.
154 */
155 private final ReentrantLock mainLock = new ReentrantLock();
156
157 /**
158 * Wait condition to support awaitTermination
159 */
160 private final Condition termination = mainLock.newCondition();
161
162 /**
163 * Set containing all worker threads in pool.
164 */
165 private final HashSet<Worker> workers = new HashSet<Worker>();
166
167 /**
168 * Timeout in nanosecods for idle threads waiting for work.
169 * Threads use this timeout only when there are more than
170 * corePoolSize present. Otherwise they wait forever for new work.
171 */
172 private volatile long keepAliveTime;
173
174 /**
175 * Core pool size, updated only while holding mainLock,
176 * but volatile to allow concurrent readability even
177 * during updates.
178 */
179 private volatile int corePoolSize;
180
181 /**
182 * Maximum pool size, updated only while holding mainLock
183 * but volatile to allow concurrent readability even
184 * during updates.
185 */
186 private volatile int maximumPoolSize;
187
188 /**
189 * Current pool size, updated only while holding mainLock
190 * but volatile to allow concurrent readability even
191 * during updates.
192 */
193 private volatile int poolSize;
194
195 /**
196 * Lifecycle state
197 */
198 private volatile int runState;
199
200 // Special values for runState
201 /** Normal, not-shutdown mode */
202 private static final int RUNNING = 0;
203 /** Controlled shutdown mode */
204 private static final int SHUTDOWN = 1;
205 /** Immediate shutdown mode */
206 private static final int STOP = 2;
207 /** Final state */
208 private static final int TERMINATED = 3;
209
210 /**
211 * Handler called when saturated or shutdown in execute.
212 */
213 private volatile RejectedExecutionHandler handler = defaultHandler;
214
215 /**
216 * Factory for new threads.
217 */
218 private volatile ThreadFactory threadFactory = defaultThreadFactory;
219
220 /**
221 * Tracks largest attained pool size.
222 */
223 private int largestPoolSize;
224
225 /**
226 * Counter for completed tasks. Updated only on termination of
227 * worker threads.
228 */
229 private long completedTaskCount;
230
231 /**
232 * The default thread factory
233 */
234 private static final ThreadFactory defaultThreadFactory =
235 new ThreadFactory() {
236 public Thread newThread(Runnable r) {
237 return new Thread(r);
238 }
239 };
240
241 /**
242 * The default rejectect execution handler
243 */
244 private static final RejectedExecutionHandler defaultHandler =
245 new AbortPolicy();
246
247 /**
248 * Invoke the rejected execution handler for the given command.
249 */
250 void reject(Runnable command) {
251 handler.rejectedExecution(command, this);
252 }
253
254 /**
255 * Create and return a new thread running firstTask as its first
256 * task. Call only while holding mainLock
257 * @param firstTask the task the new thread should run first (or
258 * null if none)
259 * @return the new thread
260 */
261 private Thread addThread(Runnable firstTask) {
262 Worker w = new Worker(firstTask);
263 Thread t = threadFactory.newThread(w);
264 w.thread = t;
265 workers.add(w);
266 int nt = ++poolSize;
267 if (nt > largestPoolSize)
268 largestPoolSize = nt;
269 return t;
270 }
271
272
273
274 /**
275 * Create and start a new thread running firstTask as its first
276 * task, only if less than corePoolSize threads are running.
277 * @param firstTask the task the new thread should run first (or
278 * null if none)
279 * @return true if successful.
280 */
281 private boolean addIfUnderCorePoolSize(Runnable firstTask) {
282 Thread t = null;
283 mainLock.lock();
284 try {
285 if (poolSize < corePoolSize)
286 t = addThread(firstTask);
287 } finally {
288 mainLock.unlock();
289 }
290 if (t == null)
291 return false;
292 t.start();
293 return true;
294 }
295
296 /**
297 * Create and start a new thread only if less than maximumPoolSize
298 * threads are running. The new thread runs as its first task the
299 * next task in queue, or if there is none, the given task.
300 * @param firstTask the task the new thread should run first (or
301 * null if none)
302 * @return null on failure, else the first task to be run by new thread.
303 */
304 private Runnable addIfUnderMaximumPoolSize(Runnable firstTask) {
305 Thread t = null;
306 Runnable next = null;
307 mainLock.lock();
308 try {
309 if (poolSize < maximumPoolSize) {
310 next = workQueue.poll();
311 if (next == null)
312 next = firstTask;
313 t = addThread(next);
314 }
315 } finally {
316 mainLock.unlock();
317 }
318 if (t == null)
319 return null;
320 t.start();
321 return next;
322 }
323
324
325 /**
326 * Get the next task for a worker thread to run.
327 * @return the task
328 * @throws InterruptedException if interrupted while waiting for task
329 */
330 private Runnable getTask() throws InterruptedException {
331 for (;;) {
332 switch(runState) {
333 case RUNNING: {
334 if (poolSize <= corePoolSize) // untimed wait if core
335 return workQueue.take();
336
337 long timeout = keepAliveTime;
338 if (timeout <= 0) // die immediately for 0 timeout
339 return null;
340 Runnable r = workQueue.poll(timeout, TimeUnit.NANOSECONDS);
341 if (r != null)
342 return r;
343 if (poolSize > corePoolSize) // timed out
344 return null;
345 // else, after timeout, pool shrank so shouldn't die, so retry
346 break;
347 }
348
349 case SHUTDOWN: {
350 // Help drain queue
351 Runnable r = workQueue.poll();
352 if (r != null)
353 return r;
354
355 // Check if can terminate
356 if (workQueue.isEmpty()) {
357 interruptIdleWorkers();
358 return null;
359 }
360
361 // There could still be delayed tasks in queue.
362 // Wait for one, re-checking state upon interruption
363 try {
364 return workQueue.take();
365 }
366 catch(InterruptedException ignore) {
367 }
368 break;
369 }
370
371 case STOP:
372 return null;
373 default:
374 assert false;
375 }
376 }
377 }
378
379 /**
380 * Wake up all threads that might be waiting for tasks.
381 */
382 void interruptIdleWorkers() {
383 mainLock.lock();
384 try {
385 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
386 it.next().interruptIfIdle();
387 } finally {
388 mainLock.unlock();
389 }
390 }
391
392 /**
393 * Perform bookkeeping for a terminated worker thread.
394 * @param w the worker
395 */
396 private void workerDone(Worker w) {
397 mainLock.lock();
398 try {
399 completedTaskCount += w.completedTasks;
400 workers.remove(w);
401 if (--poolSize > 0)
402 return;
403
404 // Else, this is the last thread. Deal with potential shutdown.
405
406 int state = runState;
407 assert state != TERMINATED;
408
409 if (state != STOP) {
410 // If there are queued tasks but no threads, create
411 // replacement.
412 Runnable r = workQueue.poll();
413 if (r != null) {
414 addThread(r).start();
415 return;
416 }
417
418 // If there are some (presumably delayed) tasks but
419 // none pollable, create an idle replacement to wait.
420 if (!workQueue.isEmpty()) {
421 addThread(null).start();
422 return;
423 }
424
425 // Otherwise, we can exit without replacement
426 if (state == RUNNING)
427 return;
428 }
429
430 // Either state is STOP, or state is SHUTDOWN and there is
431 // no work to do. So we can terminate.
432 runState = TERMINATED;
433 termination.signalAll();
434 // fall through to call terminate() outside of lock.
435 } finally {
436 mainLock.unlock();
437 }
438
439 assert runState == TERMINATED;
440 terminated();
441 }
442
443 /**
444 * Worker threads
445 */
446 private class Worker implements Runnable {
447
448 /**
449 * The runLock is acquired and released surrounding each task
450 * execution. It mainly protects against interrupts that are
451 * intended to cancel the worker thread from instead
452 * interrupting the task being run.
453 */
454 private final ReentrantLock runLock = new ReentrantLock();
455
456 /**
457 * Initial task to run before entering run loop
458 */
459 private Runnable firstTask;
460
461 /**
462 * Per thread completed task counter; accumulated
463 * into completedTaskCount upon termination.
464 */
465 volatile long completedTasks;
466
467 /**
468 * Thread this worker is running in. Acts as a final field,
469 * but cannot be set until thread is created.
470 */
471 Thread thread;
472
473 Worker(Runnable firstTask) {
474 this.firstTask = firstTask;
475 }
476
477 boolean isActive() {
478 return runLock.isLocked();
479 }
480
481 /**
482 * Interrupt thread if not running a task
483 */
484 void interruptIfIdle() {
485 if (runLock.tryLock()) {
486 try {
487 thread.interrupt();
488 } finally {
489 runLock.unlock();
490 }
491 }
492 }
493
494 /**
495 * Cause thread to die even if running a task.
496 */
497 void interruptNow() {
498 thread.interrupt();
499 }
500
501 /**
502 * Run a single task between before/after methods.
503 */
504 private void runTask(Runnable task) {
505 runLock.lock();
506 try {
507 // Abort now if immediate cancel. Otherwise, we have
508 // committed to run this task.
509 if (runState == STOP)
510 return;
511
512 Thread.interrupted(); // clear interrupt status on entry
513 boolean ran = false;
514 beforeExecute(thread, task);
515 try {
516 task.run();
517 ran = true;
518 afterExecute(task, null);
519 ++completedTasks;
520 } catch(RuntimeException ex) {
521 if (!ran)
522 afterExecute(task, ex);
523 // Else the exception occurred within
524 // afterExecute itself in which case we don't
525 // want to call it again.
526 throw ex;
527 }
528 } finally {
529 runLock.unlock();
530 }
531 }
532
533 /**
534 * Main run loop
535 */
536 public void run() {
537 try {
538 for (;;) {
539 Runnable task;
540 if (firstTask != null) {
541 task = firstTask;
542 firstTask = null;
543 } else {
544 task = getTask();
545 if (task == null)
546 break;
547 }
548 runTask(task);
549 task = null; // unnecessary but can help GC
550 }
551 } catch(InterruptedException ie) {
552 // fall through
553 } finally {
554 workerDone(this);
555 }
556 }
557 }
558
559 // Public methods
560
561 /**
562 * Creates a new <tt>ThreadPoolExecutor</tt> with the given
563 * initial parameters. It may be more convenient to use one of
564 * the {@link Executors} factory methods instead of this general
565 * purpose constructor.
566 *
567 * @param corePoolSize the number of threads to keep in the
568 * pool, even if they are idle.
569 * @param maximumPoolSize the maximum number of threads to allow in the
570 * pool.
571 * @param keepAliveTime when the number of threads is greater than
572 * the core, this is the maximum time that excess idle threads
573 * will wait for new tasks before terminating.
574 * @param unit the time unit for the keepAliveTime
575 * argument.
576 * @param workQueue the queue to use for holding tasks before the
577 * are executed. This queue will hold only the <tt>Runnable</tt>
578 * tasks submitted by the <tt>execute</tt> method.
579 * @throws IllegalArgumentException if corePoolSize, or
580 * keepAliveTime less than zero, or if maximumPoolSize less than or
581 * equal to zero, or if corePoolSize greater than maximumPoolSize.
582 * @throws NullPointerException if <tt>workQueue</tt> is null
583 */
584 public ThreadPoolExecutor(int corePoolSize,
585 int maximumPoolSize,
586 long keepAliveTime,
587 TimeUnit unit,
588 BlockingQueue<Runnable> workQueue) {
589 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
590 defaultThreadFactory, defaultHandler);
591 }
592
593 /**
594 * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
595 * parameters.
596 *
597 * @param corePoolSize the number of threads to keep in the
598 * pool, even if they are idle.
599 * @param maximumPoolSize the maximum number of threads to allow in the
600 * pool.
601 * @param keepAliveTime when the number of threads is greater than
602 * the core, this is the maximum time that excess idle threads
603 * will wait for new tasks before terminating.
604 * @param unit the time unit for the keepAliveTime
605 * argument.
606 * @param workQueue the queue to use for holding tasks before the
607 * are executed. This queue will hold only the <tt>Runnable</tt>
608 * tasks submitted by the <tt>execute</tt> method.
609 * @param threadFactory the factory to use when the executor
610 * creates a new thread.
611 * @throws IllegalArgumentException if corePoolSize, or
612 * keepAliveTime less than zero, or if maximumPoolSize less than or
613 * equal to zero, or if corePoolSize greater than maximumPoolSize.
614 * @throws NullPointerException if <tt>workQueue</tt>
615 * or <tt>threadFactory</tt> are null.
616 */
617 public ThreadPoolExecutor(int corePoolSize,
618 int maximumPoolSize,
619 long keepAliveTime,
620 TimeUnit unit,
621 BlockingQueue<Runnable> workQueue,
622 ThreadFactory threadFactory) {
623
624 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
625 threadFactory, defaultHandler);
626 }
627
628 /**
629 * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
630 * parameters.
631 *
632 * @param corePoolSize the number of threads to keep in the
633 * pool, even if they are idle.
634 * @param maximumPoolSize the maximum number of threads to allow in the
635 * pool.
636 * @param keepAliveTime when the number of threads is greater than
637 * the core, this is the maximum time that excess idle threads
638 * will wait for new tasks before terminating.
639 * @param unit the time unit for the keepAliveTime
640 * argument.
641 * @param workQueue the queue to use for holding tasks before the
642 * are executed. This queue will hold only the <tt>Runnable</tt>
643 * tasks submitted by the <tt>execute</tt> method.
644 * @param handler the handler to use when execution is blocked
645 * because the thread bounds and queue capacities are reached.
646 * @throws IllegalArgumentException if corePoolSize, or
647 * keepAliveTime less than zero, or if maximumPoolSize less than or
648 * equal to zero, or if corePoolSize greater than maximumPoolSize.
649 * @throws NullPointerException if <tt>workQueue</tt>
650 * or <tt>handler</tt> are null.
651 */
652 public ThreadPoolExecutor(int corePoolSize,
653 int maximumPoolSize,
654 long keepAliveTime,
655 TimeUnit unit,
656 BlockingQueue<Runnable> workQueue,
657 RejectedExecutionHandler handler) {
658 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
659 defaultThreadFactory, handler);
660 }
661
662 /**
663 * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
664 * parameters.
665 *
666 * @param corePoolSize the number of threads to keep in the
667 * pool, even if they are idle.
668 * @param maximumPoolSize the maximum number of threads to allow in the
669 * pool.
670 * @param keepAliveTime when the number of threads is greater than
671 * the core, this is the maximum time that excess idle threads
672 * will wait for new tasks before terminating.
673 * @param unit the time unit for the keepAliveTime
674 * argument.
675 * @param workQueue the queue to use for holding tasks before the
676 * are executed. This queue will hold only the <tt>Runnable</tt>
677 * tasks submitted by the <tt>execute</tt> method.
678 * @param threadFactory the factory to use when the executor
679 * creates a new thread.
680 * @param handler the handler to use when execution is blocked
681 * because the thread bounds and queue capacities are reached.
682 * @throws IllegalArgumentException if corePoolSize, or
683 * keepAliveTime less than zero, or if maximumPoolSize less than or
684 * equal to zero, or if corePoolSize greater than maximumPoolSize.
685 * @throws NullPointerException if <tt>workQueue</tt>
686 * or <tt>threadFactory</tt> or <tt>handler</tt> are null.
687 */
688 public ThreadPoolExecutor(int corePoolSize,
689 int maximumPoolSize,
690 long keepAliveTime,
691 TimeUnit unit,
692 BlockingQueue<Runnable> workQueue,
693 ThreadFactory threadFactory,
694 RejectedExecutionHandler handler) {
695 if (corePoolSize < 0 ||
696 maximumPoolSize <= 0 ||
697 maximumPoolSize < corePoolSize ||
698 keepAliveTime < 0)
699 throw new IllegalArgumentException();
700 if (workQueue == null || threadFactory == null || handler == null)
701 throw new NullPointerException();
702 this.corePoolSize = corePoolSize;
703 this.maximumPoolSize = maximumPoolSize;
704 this.workQueue = workQueue;
705 this.keepAliveTime = unit.toNanos(keepAliveTime);
706 this.threadFactory = threadFactory;
707 this.handler = handler;
708 }
709
710
711 /**
712 * Executes the given task sometime in the future. The task
713 * may execute in a new thread or in an existing pooled thread.
714 *
715 * If the task cannot be submitted for execution, either because this
716 * executor has been shutdown or because its capacity has been reached,
717 * the task is handled by the current <tt>RejectedExecutionHandler</tt>.
718 *
719 * @param command the task to execute
720 * @throws RejectedExecutionException at discretion of
721 * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
722 * for execution
723 */
724 public void execute(Runnable command) {
725 for (;;) {
726 if (runState != RUNNING) {
727 reject(command);
728 return;
729 }
730 if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
731 return;
732 if (workQueue.offer(command))
733 return;
734 Runnable r = addIfUnderMaximumPoolSize(command);
735 if (r == command)
736 return;
737 if (r == null) {
738 reject(command);
739 return;
740 }
741 // else retry
742 }
743 }
744
745 public void shutdown() {
746 mainLock.lock();
747 try {
748 if (runState == RUNNING) // don't override shutdownNow
749 runState = SHUTDOWN;
750 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
751 it.next().interruptIfIdle();
752 } finally {
753 mainLock.unlock();
754 }
755 }
756
757
758 public List shutdownNow() {
759 mainLock.lock();
760 try {
761 if (runState != TERMINATED)
762 runState = STOP;
763 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
764 it.next().interruptNow();
765 } finally {
766 mainLock.unlock();
767 }
768 return Arrays.asList(workQueue.toArray());
769 }
770
771 public boolean isShutdown() {
772 return runState != RUNNING;
773 }
774
775 /**
776 * Return true if this executor is in the process of terminating
777 * after <tt>shutdown</tt> or <tt>shutdownNow</tt> but has not
778 * completely terminated. This method may be useful for
779 * debugging. A return of <tt>true</tt> reported a sufficient
780 * period after shutdown may indicate that submitted tasks have
781 * ignored or suppressed interruption, causing this executor not
782 * to properly terminate.
783 * @return true if terminating but not yet terminated.
784 */
785 public boolean isTerminating() {
786 return runState == STOP;
787 }
788
789 public boolean isTerminated() {
790 return runState == TERMINATED;
791 }
792
793 public boolean awaitTermination(long timeout, TimeUnit unit)
794 throws InterruptedException {
795 mainLock.lock();
796 try {
797 return termination.await(timeout, unit);
798 } finally {
799 mainLock.unlock();
800 }
801 }
802
803 /**
804 * Invokes <tt>shutdown</tt> when this executor is no longer
805 * referenced.
806 */
807 protected void finalize() {
808 shutdown();
809 }
810
811 /**
812 * Sets the thread factory used to create new threads.
813 *
814 * @param threadFactory the new thread factory
815 * @see #getThreadFactory
816 */
817 public void setThreadFactory(ThreadFactory threadFactory) {
818 this.threadFactory = threadFactory;
819 }
820
821 /**
822 * Returns the thread factory used to create new threads.
823 *
824 * @return the current thread factory
825 * @see #setThreadFactory
826 */
827 public ThreadFactory getThreadFactory() {
828 return threadFactory;
829 }
830
831 /**
832 * Sets a new handler for unexecutable tasks.
833 *
834 * @param handler the new handler
835 * @see #getRejectedExecutionHandler
836 */
837 public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
838 this.handler = handler;
839 }
840
841 /**
842 * Returns the current handler for unexecutable tasks.
843 *
844 * @return the current handler
845 * @see #setRejectedExecutionHandler
846 */
847 public RejectedExecutionHandler getRejectedExecutionHandler() {
848 return handler;
849 }
850
851 /**
852 * Returns the task queue used by this executor. Access to the
853 * task queue is intended primarily for debugging and monitoring.
854 * This queue may be in active use. Retrieveing the task queue
855 * does not prevent queued tasks from executing.
856 *
857 * @return the task queue
858 */
859 public BlockingQueue<Runnable> getQueue() {
860 return workQueue;
861 }
862
863 /**
864 * Removes this task from internal queue if it is present, thus
865 * causing it not to be run if it has not already started. This
866 * method may be useful as one part of a cancellation scheme.
867 *
868 * @param task the task to remove
869 * @return true if the task was removed
870 */
871 public boolean remove(Runnable task) {
872 return getQueue().remove(task);
873 }
874
875
876 /**
877 * Tries to remove from the work queue all {@link Cancellable}
878 * tasks that have been cancelled. This method can be useful as a
879 * storage reclamation operation, that has no other impact on
880 * functionality. Cancelled tasks are never executed, but may
881 * accumulate in work queues until worker threads can actively
882 * remove them. Invoking this method instead tries to remove them now.
883 * However, this method may fail to remove all such tasks in
884 * the presence of interference by other threads.
885 */
886
887 public void purge() {
888 // Fail if we encounter interference during traversal
889 try {
890 Iterator<Runnable> it = getQueue().iterator();
891 while (it.hasNext()) {
892 Runnable r = it.next();
893 if (r instanceof Cancellable) {
894 Cancellable c = (Cancellable)r;
895 if (c.isCancelled())
896 it.remove();
897 }
898 }
899 }
900 catch(ConcurrentModificationException ex) {
901 return;
902 }
903 }
904
905 /**
906 * Sets the core number of threads. This overrides any value set
907 * in the constructor. If the new value is smaller than the
908 * current value, excess existing threads will be terminated when
909 * they next become idle.
910 *
911 * @param corePoolSize the new core size
912 * @throws IllegalArgumentException if <tt>corePoolSize</tt>
913 * less than zero
914 * @see #getCorePoolSize
915 */
916 public void setCorePoolSize(int corePoolSize) {
917 if (corePoolSize < 0)
918 throw new IllegalArgumentException();
919 mainLock.lock();
920 try {
921 int extra = this.corePoolSize - corePoolSize;
922 this.corePoolSize = corePoolSize;
923 if (extra > 0 && poolSize > corePoolSize) {
924 Iterator<Worker> it = workers.iterator();
925 while (it.hasNext() &&
926 extra > 0 &&
927 poolSize > corePoolSize &&
928 workQueue.remainingCapacity() == 0) {
929 it.next().interruptIfIdle();
930 --extra;
931 }
932 }
933
934 } finally {
935 mainLock.unlock();
936 }
937 }
938
939 /**
940 * Returns the core number of threads.
941 *
942 * @return the core number of threads
943 * @see #setCorePoolSize
944 */
945 public int getCorePoolSize() {
946 return corePoolSize;
947 }
948
949 /**
950 * Start a core thread, causing it to idly wait for work. This
951 * overrides the default policy of starting core threads only when
952 * new tasks are executed. This method will return <tt>false</tt>
953 * if all core threads have already been started.
954 * @return true if a thread was started
955 */
956 public boolean prestartCoreThread() {
957 return addIfUnderCorePoolSize(null);
958 }
959
960 /**
961 * Start all core threads, causing them to idly wait for work. This
962 * overrides the default policy of starting core threads only when
963 * new tasks are executed.
964 * @return the number of threads started.
965 */
966 public int prestartAllCoreThreads() {
967 int n = 0;
968 while (addIfUnderCorePoolSize(null))
969 ++n;
970 return n;
971 }
972
973 /**
974 * Sets the maximum allowed number of threads. This overrides any
975 * value set in the constructor. If the new value is smaller than
976 * the current value, excess existing threads will be
977 * terminated when they next become idle.
978 *
979 * @param maximumPoolSize the new maximum
980 * @throws IllegalArgumentException if maximumPoolSize less than zero or
981 * the {@link #getCorePoolSize core pool size}
982 * @see #getMaximumPoolSize
983 */
984 public void setMaximumPoolSize(int maximumPoolSize) {
985 if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
986 throw new IllegalArgumentException();
987 mainLock.lock();
988 try {
989 int extra = this.maximumPoolSize - maximumPoolSize;
990 this.maximumPoolSize = maximumPoolSize;
991 if (extra > 0 && poolSize > maximumPoolSize) {
992 Iterator<Worker> it = workers.iterator();
993 while (it.hasNext() &&
994 extra > 0 &&
995 poolSize > maximumPoolSize) {
996 it.next().interruptIfIdle();
997 --extra;
998 }
999 }
1000 } finally {
1001 mainLock.unlock();
1002 }
1003 }
1004
1005 /**
1006 * Returns the maximum allowed number of threads.
1007 *
1008 * @return the maximum allowed number of threads
1009 * @see #setMaximumPoolSize
1010 */
1011 public int getMaximumPoolSize() {
1012 return maximumPoolSize;
1013 }
1014
1015 /**
1016 * Sets the time limit for which threads may remain idle before
1017 * being terminated. If there are more than the core number of
1018 * threads currently in the pool, after waiting this amount of
1019 * time without processing a task, excess threads will be
1020 * terminated. This overrides any value set in the constructor.
1021 * @param time the time to wait. A time value of zero will cause
1022 * excess threads to terminate immediately after executing tasks.
1023 * @param unit the time unit of the time argument
1024 * @throws IllegalArgumentException if time less than zero
1025 * @see #getKeepAliveTime
1026 */
1027 public void setKeepAliveTime(long time, TimeUnit unit) {
1028 if (time < 0)
1029 throw new IllegalArgumentException();
1030 this.keepAliveTime = unit.toNanos(time);
1031 }
1032
1033 /**
1034 * Returns the thread keep-alive time, which is the amount of time
1035 * which threads in excess of the core pool size may remain
1036 * idle before being terminated.
1037 *
1038 * @param unit the desired time unit of the result
1039 * @return the time limit
1040 * @see #setKeepAliveTime
1041 */
1042 public long getKeepAliveTime(TimeUnit unit) {
1043 return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
1044 }
1045
1046 /* Statistics */
1047
1048 /**
1049 * Returns the current number of threads in the pool.
1050 *
1051 * @return the number of threads
1052 */
1053 public int getPoolSize() {
1054 return poolSize;
1055 }
1056
1057 /**
1058 * Returns the approximate number of threads that are actively
1059 * executing tasks.
1060 *
1061 * @return the number of threads
1062 */
1063 public int getActiveCount() {
1064 mainLock.lock();
1065 try {
1066 int n = 0;
1067 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); ) {
1068 if (it.next().isActive())
1069 ++n;
1070 }
1071 return n;
1072 } finally {
1073 mainLock.unlock();
1074 }
1075 }
1076
1077 /**
1078 * Returns the largest number of threads that have ever
1079 * simultaneously been in the pool.
1080 *
1081 * @return the number of threads
1082 */
1083 public int getLargestPoolSize() {
1084 mainLock.lock();
1085 try {
1086 return largestPoolSize;
1087 } finally {
1088 mainLock.unlock();
1089 }
1090 }
1091
1092 /**
1093 * Returns the approximate total number of tasks that have been
1094 * scheduled for execution. Because the states of tasks and
1095 * threads may change dynamically during computation, the returned
1096 * value is only an approximation, but one that does not ever
1097 * decrease across successive calls.
1098 *
1099 * @return the number of tasks
1100 */
1101 public long getTaskCount() {
1102 mainLock.lock();
1103 try {
1104 long n = completedTaskCount;
1105 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); ) {
1106 Worker w = it.next();
1107 n += w.completedTasks;
1108 if (w.isActive())
1109 ++n;
1110 }
1111 return n + workQueue.size();
1112 } finally {
1113 mainLock.unlock();
1114 }
1115 }
1116
1117 /**
1118 * Returns the approximate total number of tasks that have
1119 * completed execution. Because the states of tasks and threads
1120 * may change dynamically during computation, the returned value
1121 * is only an approximation, but one that does not ever decrease
1122 * across successive calls.
1123 *
1124 * @return the number of tasks
1125 */
1126 public long getCompletedTaskCount() {
1127 mainLock.lock();
1128 try {
1129 long n = completedTaskCount;
1130 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
1131 n += it.next().completedTasks;
1132 return n;
1133 } finally {
1134 mainLock.unlock();
1135 }
1136 }
1137
1138 /**
1139 * Method invoked prior to executing the given Runnable in the
1140 * given thread. This method may be used to re-initialize
1141 * ThreadLocals, or to perform logging. Note: To properly nest
1142 * multiple overridings, subclasses should generally invoke
1143 * <tt>super.beforeExecute</tt> at the end of this method.
1144 *
1145 * @param t the thread that will run task r.
1146 * @param r the task that will be executed.
1147 */
1148 protected void beforeExecute(Thread t, Runnable r) { }
1149
1150 /**
1151 * Method invoked upon completion of execution of the given
1152 * Runnable. If non-null, the Throwable is the uncaught exception
1153 * that caused execution to terminate abruptly. Note: To properly
1154 * nest multiple overridings, subclasses should generally invoke
1155 * <tt>super.afterExecute</tt> at the beginning of this method.
1156 *
1157 * @param r the runnable that has completed.
1158 * @param t the exception that cause termination, or null if
1159 * execution completed normally.
1160 */
1161 protected void afterExecute(Runnable r, Throwable t) { }
1162
1163 /**
1164 * Method invoked when the Executor has terminated. Default
1165 * implementation does nothing. Note: To properly nest multiple
1166 * overridings, subclasses should generally invoke
1167 * <tt>super.terminated</tt> within this method.
1168 */
1169 protected void terminated() { }
1170
1171 /**
1172 * A handler for unexecutable tasks that runs these tasks directly
1173 * in the calling thread of the <tt>execute</tt> method. This is
1174 * the default <tt>RejectedExecutionHandler</tt>.
1175 */
1176 public static class CallerRunsPolicy implements RejectedExecutionHandler {
1177
1178 /**
1179 * Constructs a <tt>CallerRunsPolicy</tt>.
1180 */
1181 public CallerRunsPolicy() { }
1182
1183 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1184 if (!e.isShutdown()) {
1185 r.run();
1186 }
1187 }
1188 }
1189
1190 /**
1191 * A handler for unexecutable tasks that throws a
1192 * <tt>RejectedExecutionException</tt>.
1193 */
1194 public static class AbortPolicy implements RejectedExecutionHandler {
1195
1196 /**
1197 * Constructs a <tt>AbortPolicy</tt>.
1198 */
1199 public AbortPolicy() { }
1200
1201 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1202 throw new RejectedExecutionException();
1203 }
1204 }
1205
1206 /**
1207 * A handler for unexecutable tasks that waits until the task can be
1208 * submitted for execution.
1209 */
1210 public static class WaitPolicy implements RejectedExecutionHandler {
1211 /**
1212 * Constructs a <tt>WaitPolicy</tt>.
1213 */
1214 public WaitPolicy() { }
1215
1216 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1217 if (!e.isShutdown()) {
1218 try {
1219 e.getQueue().put(r);
1220 } catch (InterruptedException ie) {
1221 Thread.currentThread().interrupt();
1222 throw new RejectedExecutionException(ie);
1223 }
1224 }
1225 }
1226 }
1227
1228 /**
1229 * A handler for unexecutable tasks that silently discards these tasks.
1230 */
1231 public static class DiscardPolicy implements RejectedExecutionHandler {
1232
1233 /**
1234 * Constructs <tt>DiscardPolicy</tt>.
1235 */
1236 public DiscardPolicy() { }
1237
1238 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1239 }
1240 }
1241
1242 /**
1243 * A handler for unexecutable tasks that discards the oldest
1244 * unhandled request.
1245 */
1246 public static class DiscardOldestPolicy implements RejectedExecutionHandler {
1247 /**
1248 * Constructs a <tt>DiscardOldestPolicy</tt> for the given executor.
1249 */
1250 public DiscardOldestPolicy() { }
1251
1252 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1253 if (!e.isShutdown()) {
1254 e.getQueue().poll();
1255 e.execute(r);
1256 }
1257 }
1258 }
1259 }