ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ThreadPoolExecutor.java
Revision: 1.16
Committed: Thu Aug 14 15:34:04 2003 UTC (20 years, 10 months ago) by dl
Branch: MAIN
Changes since 1.15: +162 -69 lines
Log Message:
New shutdown policies for Scheduled executor; refactored to avoid more messiness

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
11 /**
12 * An {@link ExecutorService} that executes each submitted task on one
13 * of several pooled threads.
14 *
15 * <p>Thread pools address two different problems at the same time:
16 * they usually provide improved performance when executing large
17 * numbers of asynchronous tasks, due to reduced per-task invocation
18 * overhead, and they provide a means of bounding and managing the
19 * resources, including threads, consumed in executing a collection of
20 * tasks.
21 *
22 * <p>This class is very configurable and can be configured to create
23 * a new thread for each task, or even to execute tasks sequentially
24 * in a single thread, in addition to its most common configuration,
25 * which reuses a pool of threads.
26 *
27 * <p>To be useful across a wide range of contexts, this class
28 * provides many adjustable parameters and extensibility hooks.
29 * However, programmers are urged to use the more convenient factory
30 * methods <tt>newCachedThreadPool</tt> (unbounded thread pool, with
31 * automatic thread reclamation), <tt>newFixedThreadPool</tt> (fixed
32 * size thread pool), <tt>newSingleThreadPoolExecutor</tt> (single
33 * background thread for execution of tasks), and
34 * <tt>newThreadPerTaskExeceutor</tt> (execute each task in a new
35 * thread), that preconfigure settings for the most common usage
36 * scenarios.
37 *
38 * <p>This class also maintain some basic statistics, such as the
39 * number of completed tasks, that may be useful for monitoring and
40 * tuning executors.
41 *
42 * <h3>Tuning guide</h3>
43 * <dl>
44 *
45 * <dt>Core and maximum pool size</dt>
46 *
47 * <dd>A ThreadPoolExecutor will automatically adjust the pool size
48 * according to the bounds set by corePoolSize and maximumPoolSize.
49 * When a new task is submitted, and fewer than corePoolSize threads
50 * are running, a new thread is created to handle the request, even if
51 * other worker threads are idle. If there are more than the
52 * corePoolSize but less than maximumPoolSize threads running, a new
53 * thread will be created only if the queue is full. By setting
54 * corePoolSize and maximumPoolSize the same, you create a fixed-size
55 * thread pool.</dd>
56 *
57 * <dt>Keep-alive</dt>
58 *
59 * <dd>The keepAliveTime determines what happens to idle threads. If
60 * the pool currently has more than the core number of threads, excess
61 * threads will be terminated if they have been idle for more than the
62 * keepAliveTime.</dd>
63 *
64 * <dt>Queueing</dt>
65 *
66 * <dd>You are free to specify the queuing mechanism used to handle
67 * submitted tasks. A good default is to use queueless synchronous
68 * channels to to hand off work to threads. This is a safe,
69 * conservative policy that avoids lockups when handling sets of
70 * requests that might have internal dependencies. Using an unbounded
71 * queue (for example a LinkedBlockingQueue) which will cause new
72 * tasks to be queued in cases where all corePoolSize threads are
73 * busy, so no more that corePoolSize threads will be craated. This
74 * may be appropriate when each task is completely independent of
75 * others, so tasks cannot affect each others execution. For example,
76 * in an http server. When given a choice, this pool always prefers
77 * adding a new thread rather than queueing if there are currently
78 * fewer than the current getCorePoolSize threads running, but
79 * otherwise always prefers queuing a request rather than adding a new
80 * thread.
81 *
82 * <p>While queuing can be useful in smoothing out transient bursts of
83 * requests, especially in socket-based services, it is not very well
84 * behaved when commands continue to arrive on average faster than
85 * they can be processed.
86 *
87 * Queue sizes and maximum pool sizes can often be traded off for each
88 * other. Using large queues and small pools minimizes CPU usage, OS
89 * resources, and context-switching overhead, but can lead to
90 * artifically low throughput. If tasks frequently block (for example
91 * if they are I/O bound), a JVM and underlying OS may be able to
92 * schedule time for more threads than you otherwise allow. Use of
93 * small queues or queueless handoffs generally requires larger pool
94 * sizes, which keeps CPUs busier but may encounter unacceptable
95 * scheduling overhead, which also decreases throughput.
96 * </dd>
97 *
98 * <dt>Creating new threads</dt>
99 *
100 * <dd>New threads are created using a ThreadFactory. By default,
101 * threads are created simply with the new Thread(Runnable)
102 * constructor, but by supplying a different ThreadFactory, you can
103 * alter the thread's name, thread group, priority, daemon status,
104 * etc. </dd>
105 *
106 * <dt>Before and after intercepts</dt>
107 *
108 * <dd>This class has overridable methods that which are called before
109 * and after execution of each task. These can be used to manipulate
110 * the execution environment (for example, reinitializing
111 * ThreadLocals), gather statistics, or perform logging. </dd>
112 *
113 * <dt>Blocked execution</dt>
114 *
115 * <dd>There are a number of factors which can bound the number of
116 * tasks which can execute at once, including the maximum pool size
117 * and the queuing mechanism used. If the executor determines that a
118 * task cannot be executed because it has been refused by the queue
119 * and no threads are available, or because the executor has been shut
120 * down, the RejectedExecutionHandler's rejectedExecution method is
121 * invoked. </dd>
122 *
123 * <dt>Termination</dt>
124 *
125 * <dd>ThreadPoolExecutor supports two shutdown options, immediate and
126 * graceful. In an immediate shutdown, any threads currently
127 * executing are interrupted, and any tasks not yet begun are returned
128 * from the shutdownNow call. In a graceful shutdown, all queued
129 * tasks are allowed to run, but new tasks may not be submitted.
130 * </dd>
131 *
132 * </dl>
133 *
134 * @since 1.5
135 * @see RejectedExecutionHandler
136 * @see Executors
137 * @see ThreadFactory
138 *
139 * @spec JSR-166
140 * @revised $Date: 2003/08/09 19:55:30 $
141 * @editor $Author: dl $
142 * @author Doug Lea
143 */
144 public class ThreadPoolExecutor implements ExecutorService {
145 /**
146 * Queue used for holding tasks and handing off to worker threads.
147 */
148 private final BlockingQueue<Runnable> workQueue;
149
150 /**
151 * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and
152 * workers set.
153 */
154 private final ReentrantLock mainLock = new ReentrantLock();
155
156 /**
157 * Wait condition to support awaitTermination
158 */
159 private final Condition termination = mainLock.newCondition();
160
161 /**
162 * Set containing all worker threads in pool.
163 */
164 private final Set<Worker> workers = new HashSet<Worker>();
165
166 /**
167 * Timeout in nanosecods for idle threads waiting for work.
168 * Threads use this timeout only when there are more than
169 * corePoolSize present. Otherwise they wait forever for new work.
170 */
171 private volatile long keepAliveTime;
172
173 /**
174 * Core pool size, updated only while holding mainLock,
175 * but volatile to allow concurrent readability even
176 * during updates.
177 */
178 private volatile int corePoolSize;
179
180 /**
181 * Maximum pool size, updated only while holding mainLock
182 * but volatile to allow concurrent readability even
183 * during updates.
184 */
185 private volatile int maximumPoolSize;
186
187 /**
188 * Current pool size, updated only while holding mainLock
189 * but volatile to allow concurrent readability even
190 * during updates.
191 */
192 private volatile int poolSize;
193
194 /**
195 * Lifecycle state
196 */
197 private volatile int runState;
198
199 // Special values for runState
200 /** Normal, not-shutdown mode */
201 private static final int RUNNING = 0;
202 /** Controlled shutdown mode */
203 private static final int SHUTDOWN = 1;
204 /** Immediate shutdown mode */
205 private static final int STOP = 2;
206 /** Final state */
207 private static final int TERMINATED = 3;
208
209 /**
210 * Handler called when saturated or shutdown in execute.
211 */
212 private volatile RejectedExecutionHandler handler = defaultHandler;
213
214 /**
215 * Factory for new threads.
216 */
217 private volatile ThreadFactory threadFactory = defaultThreadFactory;
218
219 /**
220 * Tracks largest attained pool size.
221 */
222 private int largestPoolSize;
223
224 /**
225 * Counter for completed tasks. Updated only on termination of
226 * worker threads.
227 */
228 private long completedTaskCount;
229
230 /**
231 * The default thread factory
232 */
233 private static final ThreadFactory defaultThreadFactory =
234 new ThreadFactory() {
235 public Thread newThread(Runnable r) {
236 return new Thread(r);
237 }
238 };
239
240 /**
241 * The default rejectect execution handler
242 */
243 private static final RejectedExecutionHandler defaultHandler =
244 new AbortPolicy();
245
246 /**
247 * Invoke the rejected execution handler for the give command.
248 */
249 void reject(Runnable command) {
250 handler.rejectedExecution(command, this);
251 }
252
253 /**
254 * Create and return a new thread running firstTask as its first
255 * task. Call only while holding mainLock
256 * @param firstTask the task the new thread should run first (or
257 * null if none)
258 * @return the new thread
259 */
260 private Thread addThread(Runnable firstTask) {
261 Worker w = new Worker(firstTask);
262 Thread t = threadFactory.newThread(w);
263 w.thread = t;
264 workers.add(w);
265 int nt = ++poolSize;
266 if (nt > largestPoolSize)
267 largestPoolSize = nt;
268 return t;
269 }
270
271
272
273 /**
274 * Create and start a new thread running firstTask as its first
275 * task, only if less than corePoolSize threads are running.
276 * @param firstTask the task the new thread should run first (or
277 * null if none)
278 * @return true if successful.
279 */
280 private boolean addIfUnderCorePoolSize(Runnable firstTask) {
281 Thread t = null;
282 mainLock.lock();
283 try {
284 if (poolSize < corePoolSize)
285 t = addThread(firstTask);
286 } finally {
287 mainLock.unlock();
288 }
289 if (t == null)
290 return false;
291 t.start();
292 return true;
293 }
294
295 /**
296 * Create and start a new thread only if less than maximumPoolSize
297 * threads are running. The new thread runs as its first task the
298 * next task in queue, or if there is none, the given task.
299 * @param firstTask the task the new thread should run first (or
300 * null if none)
301 * @return null on failure, else the first task to be run by new thread.
302 */
303 private Runnable addIfUnderMaximumPoolSize(Runnable firstTask) {
304 Thread t = null;
305 Runnable next = null;
306 mainLock.lock();
307 try {
308 if (poolSize < maximumPoolSize) {
309 next = workQueue.poll();
310 if (next == null)
311 next = firstTask;
312 t = addThread(next);
313 }
314 } finally {
315 mainLock.unlock();
316 }
317 if (t == null)
318 return null;
319 t.start();
320 return next;
321 }
322
323
324 /**
325 * Get the next task for a worker thread to run.
326 * @return the task
327 * @throws InterruptedException if interrupted while waiting for task
328 */
329 private Runnable getTask() throws InterruptedException {
330 for (;;) {
331 switch(runState) {
332 case RUNNING: {
333 if (poolSize <= corePoolSize) // untimed wait if core
334 return workQueue.take();
335
336 long timeout = keepAliveTime;
337 if (timeout <= 0) // die immediately for 0 timeout
338 return null;
339 Runnable r = workQueue.poll(timeout, TimeUnit.NANOSECONDS);
340 if (r != null)
341 return r;
342 if (poolSize > corePoolSize) // timed out
343 return null;
344 // else, after timeout, pool shrank so shouldn't die, so retry
345 break;
346 }
347
348 case SHUTDOWN: {
349 // Help drain queue
350 Runnable r = workQueue.poll();
351 if (r != null)
352 return r;
353
354 // Check if can terminate
355 if (workQueue.isEmpty()) {
356 interruptIdleWorkers();
357 return null;
358 }
359
360 // There could still be delayed tasks in queue.
361 // Wait for one, re-checking state upon interruption
362 try {
363 return workQueue.take();
364 }
365 catch(InterruptedException ignore) {
366 }
367 break;
368 }
369
370 case STOP:
371 return null;
372 default:
373 assert false;
374 }
375 }
376 }
377
378 /**
379 * Wake up all threads that might be waiting for tasks.
380 */
381 void interruptIdleWorkers() {
382 mainLock.lock();
383 try {
384 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
385 it.next().interruptIfIdle();
386 } finally {
387 mainLock.unlock();
388 }
389 }
390
391 /**
392 * Perform bookkeeping for a terminated worker thread.
393 * @param w the worker
394 */
395 private void workerDone(Worker w) {
396 mainLock.lock();
397 try {
398 completedTaskCount += w.completedTasks;
399 workers.remove(w);
400 if (--poolSize > 0)
401 return;
402
403 // Else, this is the last thread. Deal with potential shutdown.
404
405 int state = runState;
406 assert state != TERMINATED;
407
408 if (state != STOP) {
409 // If there are queued tasks but no threads, create
410 // replacement.
411 Runnable r = workQueue.poll();
412 if (r != null) {
413 addThread(r).start();
414 return;
415 }
416
417 // If there are some (presumably delayed) tasks but
418 // none pollable, create an idle replacement to wait.
419 if (!workQueue.isEmpty()) {
420 addThread(null).start();
421 return;
422 }
423
424 // Otherwise, we can exit without replacement
425 if (state == RUNNING)
426 return;
427 }
428
429 // Either state is STOP, or state is SHUTDOWN and there is
430 // no work to do. So we can terminate.
431 runState = TERMINATED;
432 termination.signalAll();
433 // fall through to call terminate() outside of lock.
434 } finally {
435 mainLock.unlock();
436 }
437
438 assert runState == TERMINATED;
439 terminated();
440 }
441
442 /**
443 * Worker threads
444 */
445 private class Worker implements Runnable {
446
447 /**
448 * The runLock is acquired and released surrounding each task
449 * execution. It mainly protects against interrupts that are
450 * intended to cancel the worker thread from instead
451 * interrupting the task being run.
452 */
453 private final ReentrantLock runLock = new ReentrantLock();
454
455 /**
456 * Initial task to run before entering run loop
457 */
458 private Runnable firstTask;
459
460 /**
461 * Per thread completed task counter; accumulated
462 * into completedTaskCount upon termination.
463 */
464 volatile long completedTasks;
465
466 /**
467 * Thread this worker is running in. Acts as a final field,
468 * but cannot be set until thread is created.
469 */
470 Thread thread;
471
472 Worker(Runnable firstTask) {
473 this.firstTask = firstTask;
474 }
475
476 boolean isActive() {
477 return runLock.isLocked();
478 }
479
480 /**
481 * Interrupt thread if not running a task
482 */
483 void interruptIfIdle() {
484 if (runLock.tryLock()) {
485 try {
486 thread.interrupt();
487 } finally {
488 runLock.unlock();
489 }
490 }
491 }
492
493 /**
494 * Cause thread to die even if running a task.
495 */
496 void interruptNow() {
497 thread.interrupt();
498 }
499
500 /**
501 * Run a single task between before/after methods.
502 */
503 private void runTask(Runnable task) {
504 runLock.lock();
505 try {
506 // Abort now if immediate cancel. Otherwise, we have
507 // committed to run this task.
508 if (runState == STOP)
509 return;
510
511 Thread.interrupted(); // clear interrupt status on entry
512 boolean ran = false;
513 beforeExecute(thread, task);
514 try {
515 task.run();
516 ran = true;
517 afterExecute(task, null);
518 ++completedTasks;
519 } catch(RuntimeException ex) {
520 if (!ran)
521 afterExecute(task, ex);
522 // else the exception occurred within
523 // afterExecute itself in which case we don't
524 // want to call it again.
525 throw ex;
526 }
527 } finally {
528 runLock.unlock();
529 }
530 }
531
532 /**
533 * Main run loop
534 */
535 public void run() {
536 try {
537 for (;;) {
538 Runnable task;
539 if (firstTask != null) {
540 task = firstTask;
541 firstTask = null;
542 } else {
543 task = getTask();
544 if (task == null)
545 break;
546 }
547 runTask(task);
548 task = null; // unnecessary but can help GC
549 }
550 } catch(InterruptedException ie) {
551 // fall through
552 } finally {
553 workerDone(this);
554 }
555 }
556 }
557
558 /**
559 * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
560 * parameters. It may be more convenient to use one of the factory
561 * methods instead of this general purpose constructor.
562 *
563 * @param corePoolSize the number of threads to keep in the
564 * pool, even if they are idle.
565 * @param maximumPoolSize the maximum number of threads to allow in the
566 * pool.
567 * @param keepAliveTime when the number of threads is greater than
568 * the core, this is the maximum time that excess idle threads
569 * will wait for new tasks before terminating.
570 * @param unit the time unit for the keepAliveTime
571 * argument.
572 * @param workQueue the queue to use for holding tasks before the
573 * are executed. This queue will hold only the <tt>Runnable</tt>
574 * tasks submitted by the <tt>execute</tt> method.
575 * @throws IllegalArgumentException if corePoolSize, or
576 * keepAliveTime less than zero, or if maximumPoolSize less than or
577 * equal to zero, or if corePoolSize greater than maximumPoolSize.
578 * @throws NullPointerException if <tt>workQueue</tt> is null
579 */
580 public ThreadPoolExecutor(int corePoolSize,
581 int maximumPoolSize,
582 long keepAliveTime,
583 TimeUnit unit,
584 BlockingQueue<Runnable> workQueue) {
585 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
586 defaultThreadFactory, defaultHandler);
587 }
588
589 /**
590 * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
591 * parameters.
592 *
593 * @param corePoolSize the number of threads to keep in the
594 * pool, even if they are idle.
595 * @param maximumPoolSize the maximum number of threads to allow in the
596 * pool.
597 * @param keepAliveTime when the number of threads is greater than
598 * the core, this is the maximum time that excess idle threads
599 * will wait for new tasks before terminating.
600 * @param unit the time unit for the keepAliveTime
601 * argument.
602 * @param workQueue the queue to use for holding tasks before the
603 * are executed. This queue will hold only the <tt>Runnable</tt>
604 * tasks submitted by the <tt>execute</tt> method.
605 * @param threadFactory the factory to use when the executor
606 * creates a new thread.
607 * @throws IllegalArgumentException if corePoolSize, or
608 * keepAliveTime less than zero, or if maximumPoolSize less than or
609 * equal to zero, or if corePoolSize greater than maximumPoolSize.
610 * @throws NullPointerException if <tt>workQueue</tt>
611 * or <tt>threadFactory</tt> are null.
612 */
613 public ThreadPoolExecutor(int corePoolSize,
614 int maximumPoolSize,
615 long keepAliveTime,
616 TimeUnit unit,
617 BlockingQueue<Runnable> workQueue,
618 ThreadFactory threadFactory) {
619
620 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
621 threadFactory, defaultHandler);
622 }
623
624 /**
625 * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
626 * parameters.
627 *
628 * @param corePoolSize the number of threads to keep in the
629 * pool, even if they are idle.
630 * @param maximumPoolSize the maximum number of threads to allow in the
631 * pool.
632 * @param keepAliveTime when the number of threads is greater than
633 * the core, this is the maximum time that excess idle threads
634 * will wait for new tasks before terminating.
635 * @param unit the time unit for the keepAliveTime
636 * argument.
637 * @param workQueue the queue to use for holding tasks before the
638 * are executed. This queue will hold only the <tt>Runnable</tt>
639 * tasks submitted by the <tt>execute</tt> method.
640 * @param handler the handler to use when execution is blocked
641 * because the thread bounds and queue capacities are reached.
642 * @throws IllegalArgumentException if corePoolSize, or
643 * keepAliveTime less than zero, or if maximumPoolSize less than or
644 * equal to zero, or if corePoolSize greater than maximumPoolSize.
645 * @throws NullPointerException if <tt>workQueue</tt>
646 * or <tt>handler</tt> are null.
647 */
648 public ThreadPoolExecutor(int corePoolSize,
649 int maximumPoolSize,
650 long keepAliveTime,
651 TimeUnit unit,
652 BlockingQueue<Runnable> workQueue,
653 RejectedExecutionHandler handler) {
654 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
655 defaultThreadFactory, handler);
656 }
657
658 /**
659 * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
660 * parameters.
661 *
662 * @param corePoolSize the number of threads to keep in the
663 * pool, even if they are idle.
664 * @param maximumPoolSize the maximum number of threads to allow in the
665 * pool.
666 * @param keepAliveTime when the number of threads is greater than
667 * the core, this is the maximum time that excess idle threads
668 * will wait for new tasks before terminating.
669 * @param unit the time unit for the keepAliveTime
670 * argument.
671 * @param workQueue the queue to use for holding tasks before the
672 * are executed. This queue will hold only the <tt>Runnable</tt>
673 * tasks submitted by the <tt>execute</tt> method.
674 * @param threadFactory the factory to use when the executor
675 * creates a new thread.
676 * @param handler the handler to use when execution is blocked
677 * because the thread bounds and queue capacities are reached.
678 * @throws IllegalArgumentException if corePoolSize, or
679 * keepAliveTime less than zero, or if maximumPoolSize less than or
680 * equal to zero, or if corePoolSize greater than maximumPoolSize.
681 * @throws NullPointerException if <tt>workQueue</tt>
682 * or <tt>threadFactory</tt> or <tt>handler</tt> are null.
683 */
684 public ThreadPoolExecutor(int corePoolSize,
685 int maximumPoolSize,
686 long keepAliveTime,
687 TimeUnit unit,
688 BlockingQueue<Runnable> workQueue,
689 ThreadFactory threadFactory,
690 RejectedExecutionHandler handler) {
691 if (corePoolSize < 0 ||
692 maximumPoolSize <= 0 ||
693 maximumPoolSize < corePoolSize ||
694 keepAliveTime < 0)
695 throw new IllegalArgumentException();
696 if (workQueue == null || threadFactory == null || handler == null)
697 throw new NullPointerException();
698 this.corePoolSize = corePoolSize;
699 this.maximumPoolSize = maximumPoolSize;
700 this.workQueue = workQueue;
701 this.keepAliveTime = unit.toNanos(keepAliveTime);
702 this.threadFactory = threadFactory;
703 this.handler = handler;
704 }
705
706
707 /**
708 * Executes the given task sometime in the future. The task
709 * may execute in a new thread or in an existing pooled thread.
710 *
711 * If the task cannot be submitted for execution, either because this
712 * executor has been shutdown or because its capacity has been reached,
713 * the task is handled by the current <tt>RejectedExecutionHandler</tt>.
714 *
715 * @param command the task to execute
716 * @throws RejectedExecutionException at discretion of
717 * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
718 * for execution
719 */
720 public void execute(Runnable command) {
721 for (;;) {
722 if (runState != RUNNING) {
723 reject(command);
724 return;
725 }
726 if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
727 return;
728 if (workQueue.offer(command))
729 return;
730 Runnable r = addIfUnderMaximumPoolSize(command);
731 if (r == command)
732 return;
733 if (r == null) {
734 reject(command);
735 return;
736 }
737 // else retry
738 }
739 }
740
741 public void shutdown() {
742 mainLock.lock();
743 try {
744 if (runState == RUNNING) // don't override shutdownNow
745 runState = SHUTDOWN;
746 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
747 it.next().interruptIfIdle();
748 } finally {
749 mainLock.unlock();
750 }
751 }
752
753
754 public List shutdownNow() {
755 mainLock.lock();
756 try {
757 if (runState != TERMINATED)
758 runState = STOP;
759 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
760 it.next().interruptNow();
761 } finally {
762 mainLock.unlock();
763 }
764 return Arrays.asList(workQueue.toArray());
765 }
766
767 public boolean isShutdown() {
768 return runState != RUNNING;
769 }
770
771 /**
772 * Return true if this executor is in the process of terminating
773 * after <tt>shutdown</tt> or <tt>shutdownNow</tt> but has not
774 * completely terminated. This method may be useful for
775 * debugging. A return of <tt>true</tt> reported a sufficient
776 * period after shutdown may indicate that submitted tasks have
777 * ignored or suppressed interruption, causing this executor not
778 * to properly terminate.
779 * @return true if terminating but not yet terminated.
780 */
781 public boolean isTerminating() {
782 return runState == STOP;
783 }
784
785 public boolean isTerminated() {
786 return runState == TERMINATED;
787 }
788
789 public boolean awaitTermination(long timeout, TimeUnit unit)
790 throws InterruptedException {
791 mainLock.lock();
792 try {
793 return termination.await(timeout, unit);
794 } finally {
795 mainLock.unlock();
796 }
797 }
798
799 /**
800 * Invokes <tt>shutdown</tt> when this executor is no longer
801 * referenced.
802 */
803 protected void finalize() {
804 shutdown();
805 }
806
807 /**
808 * Sets the thread factory used to create new threads.
809 *
810 * @param threadFactory the new thread factory
811 * @see #getThreadFactory
812 */
813 public void setThreadFactory(ThreadFactory threadFactory) {
814 this.threadFactory = threadFactory;
815 }
816
817 /**
818 * Returns the thread factory used to create new threads.
819 *
820 * @return the current thread factory
821 * @see #setThreadFactory
822 */
823 public ThreadFactory getThreadFactory() {
824 return threadFactory;
825 }
826
827 /**
828 * Sets a new handler for unexecutable tasks.
829 *
830 * @param handler the new handler
831 * @see #getRejectedExecutionHandler
832 */
833 public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
834 this.handler = handler;
835 }
836
837 /**
838 * Returns the current handler for unexecutable tasks.
839 *
840 * @return the current handler
841 * @see #setRejectedExecutionHandler
842 */
843 public RejectedExecutionHandler getRejectedExecutionHandler() {
844 return handler;
845 }
846
847 /**
848 * Returns the task queue used by this executor. Note that
849 * this queue may be in active use. Retrieveing the task queue
850 * does not prevent queued tasks from executing.
851 *
852 * @return the task queue
853 */
854 public BlockingQueue<Runnable> getQueue() {
855 return workQueue;
856 }
857
858 /**
859 * Removes this task from internal queue if it is present, thus
860 * causing it not to be run if it has not already started. This
861 * method may be useful as one part of a cancellation scheme.
862 *
863 * @param task the task to remove
864 * @return true if the task was removed
865 */
866 public boolean remove(Runnable task) {
867 return getQueue().remove(task);
868 }
869
870
871 /**
872 * Tries to remove from the work queue all {@link Cancellable}
873 * tasks that have been cancelled. This method can be useful as a
874 * storage reclamation operation, that has no other impact on
875 * functionality. Cancelled tasks are never executed, but may
876 * accumulate in work queues until worker threads can actively
877 * remove them. Invoking this method instead tries to remove them now.
878 * However, this method may fail to remove all such tasks in
879 * the presence of interference by other threads.
880 */
881
882 public void purge() {
883 // Fail if we encounter interference during traversal
884 try {
885 Iterator<Runnable> it = getQueue().iterator();
886 while (it.hasNext()) {
887 Runnable r = it.next();
888 if (r instanceof Cancellable) {
889 Cancellable c = (Cancellable)r;
890 if (c.isCancelled())
891 it.remove();
892 }
893 }
894 }
895 catch(ConcurrentModificationException ex) {
896 return;
897 }
898 }
899
900 /**
901 * Sets the core number of threads. This overrides any value set
902 * in the constructor. If the new value is smaller than the
903 * current value, excess existing threads will be terminated when
904 * they next become idle.
905 *
906 * @param corePoolSize the new core size
907 * @throws IllegalArgumentException if <tt>corePoolSize</tt>
908 * less than zero
909 * @see #getCorePoolSize
910 */
911 public void setCorePoolSize(int corePoolSize) {
912 if (corePoolSize < 0)
913 throw new IllegalArgumentException();
914 mainLock.lock();
915 try {
916 int extra = this.corePoolSize - corePoolSize;
917 this.corePoolSize = corePoolSize;
918 if (extra > 0 && poolSize > corePoolSize) {
919 Iterator<Worker> it = workers.iterator();
920 while (it.hasNext() &&
921 extra > 0 &&
922 poolSize > corePoolSize &&
923 workQueue.remainingCapacity() == 0) {
924 it.next().interruptIfIdle();
925 --extra;
926 }
927 }
928
929 } finally {
930 mainLock.unlock();
931 }
932 }
933
934 /**
935 * Returns the core number of threads.
936 *
937 * @return the core number of threads
938 * @see #setCorePoolSize
939 */
940 public int getCorePoolSize() {
941 return corePoolSize;
942 }
943
944 /**
945 * Start a core thread, causing it to idly wait for work. This
946 * overrides the default policy of starting core threads only when
947 * new tasks are executed. This method will return <tt>false</tt>
948 * if all core threads have already been started.
949 * @return true if a thread was started
950 */
951 public boolean prestartCoreThread() {
952 return addIfUnderCorePoolSize(null);
953 }
954
955 /**
956 * Start all core threads, causing them to idly wait for work. This
957 * overrides the default policy of starting core threads only when
958 * new tasks are executed.
959 * @return the number of threads started.
960 */
961 public int prestartAllCoreThreads() {
962 int n = 0;
963 while (addIfUnderCorePoolSize(null))
964 ++n;
965 return n;
966 }
967
968 /**
969 * Sets the maximum allowed number of threads. This overrides any
970 * value set in the constructor. If the new value is smaller than
971 * the current value, excess existing threads will be
972 * terminated when they next become idle.
973 *
974 * @param maximumPoolSize the new maximum
975 * @throws IllegalArgumentException if maximumPoolSize less than zero or
976 * the {@link #getCorePoolSize core pool size}
977 * @see #getMaximumPoolSize
978 */
979 public void setMaximumPoolSize(int maximumPoolSize) {
980 if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
981 throw new IllegalArgumentException();
982 mainLock.lock();
983 try {
984 int extra = this.maximumPoolSize - maximumPoolSize;
985 this.maximumPoolSize = maximumPoolSize;
986 if (extra > 0 && poolSize > maximumPoolSize) {
987 Iterator<Worker> it = workers.iterator();
988 while (it.hasNext() &&
989 extra > 0 &&
990 poolSize > maximumPoolSize) {
991 it.next().interruptIfIdle();
992 --extra;
993 }
994 }
995 } finally {
996 mainLock.unlock();
997 }
998 }
999
1000 /**
1001 * Returns the maximum allowed number of threads.
1002 *
1003 * @return the maximum allowed number of threads
1004 * @see #setMaximumPoolSize
1005 */
1006 public int getMaximumPoolSize() {
1007 return maximumPoolSize;
1008 }
1009
1010 /**
1011 * Sets the time limit for which threads may remain idle before
1012 * being terminated. If there are more than the core number of
1013 * threads currently in the pool, after waiting this amount of
1014 * time without processing a task, excess threads will be
1015 * terminated. This overrides any value set in the constructor.
1016 * @param time the time to wait. A time value of zero will cause
1017 * excess threads to terminate immediately after executing tasks.
1018 * @param unit the time unit of the time argument
1019 * @throws IllegalArgumentException if msecs less than zero
1020 * @see #getKeepAliveTime
1021 */
1022 public void setKeepAliveTime(long time, TimeUnit unit) {
1023 if (time < 0)
1024 throw new IllegalArgumentException();
1025 this.keepAliveTime = unit.toNanos(time);
1026 }
1027
1028 /**
1029 * Returns the thread keep-alive time, which is the amount of time
1030 * which threads in excess of the core pool size may remain
1031 * idle before being terminated.
1032 *
1033 * @param unit the desired time unit of the result
1034 * @return the time limit
1035 * @see #setKeepAliveTime
1036 */
1037 public long getKeepAliveTime(TimeUnit unit) {
1038 return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
1039 }
1040
1041 /* Statistics */
1042
1043 /**
1044 * Returns the current number of threads in the pool.
1045 *
1046 * @return the number of threads
1047 */
1048 public int getPoolSize() {
1049 return poolSize;
1050 }
1051
1052 /**
1053 * Returns the approximate number of threads that are actively
1054 * executing tasks.
1055 *
1056 * @return the number of threads
1057 */
1058 public int getActiveCount() {
1059 mainLock.lock();
1060 try {
1061 int n = 0;
1062 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); ) {
1063 if (it.next().isActive())
1064 ++n;
1065 }
1066 return n;
1067 } finally {
1068 mainLock.unlock();
1069 }
1070 }
1071
1072 /**
1073 * Returns the largest number of threads that have ever
1074 * simultaneously been in the pool.
1075 *
1076 * @return the number of threads
1077 */
1078 public int getLargestPoolSize() {
1079 mainLock.lock();
1080 try {
1081 return largestPoolSize;
1082 } finally {
1083 mainLock.unlock();
1084 }
1085 }
1086
1087 /**
1088 * Returns the approximate total number of tasks that have been
1089 * scheduled for execution. Because the states of tasks and
1090 * threads may change dynamically during computation, the returned
1091 * value is only an approximation.
1092 *
1093 * @return the number of tasks
1094 */
1095 public long getTaskCount() {
1096 mainLock.lock();
1097 try {
1098 long n = completedTaskCount;
1099 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); ) {
1100 Worker w = it.next();
1101 n += w.completedTasks;
1102 if (w.isActive())
1103 ++n;
1104 }
1105 return n + workQueue.size();
1106 } finally {
1107 mainLock.unlock();
1108 }
1109 }
1110
1111 /**
1112 * Returns the approximate total number of tasks that have
1113 * completed execution. Because the states of tasks and threads
1114 * may change dynamically during computation, the returned value
1115 * is only an approximation.
1116 *
1117 * @return the number of tasks
1118 */
1119 public long getCompletedTaskCount() {
1120 mainLock.lock();
1121 try {
1122 long n = completedTaskCount;
1123 for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
1124 n += it.next().completedTasks;
1125 return n;
1126 } finally {
1127 mainLock.unlock();
1128 }
1129 }
1130
1131 /**
1132 * Method invoked prior to executing the given Runnable in given
1133 * thread. This method may be used to re-initialize ThreadLocals,
1134 * or to perform logging. Note: To properly nest multiple
1135 * overridings, subclasses should generally invoke
1136 * <tt>super.beforeExecute</tt> at the end of this method.
1137 *
1138 * @param t the thread that will run task r.
1139 * @param r the task that will be executed.
1140 */
1141 protected void beforeExecute(Thread t, Runnable r) { }
1142
1143 /**
1144 * Method invoked upon completion of execution of the given
1145 * Runnable. If non-null, the Throwable is the uncaught exception
1146 * that caused execution to terminate abruptly. Note: To properly
1147 * nest multiple overridings, subclasses should generally invoke
1148 * <tt>super.afterExecute</tt> at the beginning of this method.
1149 *
1150 * @param r the runnable that has completed.
1151 * @param t the exception that cause termination, or null if
1152 * execution completed normally.
1153 */
1154 protected void afterExecute(Runnable r, Throwable t) { }
1155
1156 /**
1157 * Method invoked when the Executor has terminated. Default
1158 * implementation does nothing.
1159 */
1160 protected void terminated() { }
1161
1162 /**
1163 * A handler for unexecutable tasks that runs these tasks directly in the
1164 * calling thread of the <tt>execute</tt> method. This is the default
1165 * <tt>RejectedExecutionHandler</tt>.
1166 */
1167 public static class CallerRunsPolicy implements RejectedExecutionHandler {
1168
1169 /**
1170 * Constructs a <tt>CallerRunsPolicy</tt>.
1171 */
1172 public CallerRunsPolicy() { }
1173
1174 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1175 if (!e.isShutdown()) {
1176 r.run();
1177 }
1178 }
1179 }
1180
1181 /**
1182 * A handler for unexecutable tasks that throws a
1183 * <tt>RejectedExecutionException</tt>.
1184 */
1185 public static class AbortPolicy implements RejectedExecutionHandler {
1186
1187 /**
1188 * Constructs a <tt>AbortPolicy</tt>.
1189 */
1190 public AbortPolicy() { }
1191
1192 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1193 throw new RejectedExecutionException();
1194 }
1195 }
1196
1197 /**
1198 * A handler for unexecutable tasks that waits until the task can be
1199 * submitted for execution.
1200 */
1201 public static class WaitPolicy implements RejectedExecutionHandler {
1202 /**
1203 * Constructs a <tt>WaitPolicy</tt>.
1204 */
1205 public WaitPolicy() { }
1206
1207 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1208 if (!e.isShutdown()) {
1209 try {
1210 e.getQueue().put(r);
1211 } catch (InterruptedException ie) {
1212 Thread.currentThread().interrupt();
1213 throw new RejectedExecutionException(ie);
1214 }
1215 }
1216 }
1217 }
1218
1219 /**
1220 * A handler for unexecutable tasks that silently discards these tasks.
1221 */
1222 public static class DiscardPolicy implements RejectedExecutionHandler {
1223
1224 /**
1225 * Constructs <tt>DiscardPolicy</tt>.
1226 */
1227 public DiscardPolicy() { }
1228
1229 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1230 }
1231 }
1232
1233 /**
1234 * A handler for unexecutable tasks that discards the oldest
1235 * unhandled request.
1236 */
1237 public static class DiscardOldestPolicy implements RejectedExecutionHandler {
1238 /**
1239 * Constructs a <tt>DiscardOldestPolicy</tt> for the given executor.
1240 */
1241 public DiscardOldestPolicy() { }
1242
1243 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1244 if (!e.isShutdown()) {
1245 e.getQueue().poll();
1246 e.execute(r);
1247 }
1248 }
1249 }
1250 }