ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ThreadPoolExecutor.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ThreadPoolExecutor.java (file contents):
Revision 1.1 by tim, Wed May 14 21:30:48 2003 UTC vs.
Revision 1.2 by dl, Tue May 27 18:14:40 2003 UTC

# Line 1 | Line 1
1   /*
2 < * @(#)ThreadPoolExecutor.java
2 > * Written by Doug Lea with assistance from members of JCP JSR-166
3 > * Expert Group and released to the public domain. Use, modify, and
4 > * redistribute this code in any way without acknowledgement.
5   */
6  
7   package java.util.concurrent;
8  
9 < import java.util.List;
9 > import java.util.*;
10  
11   /**
12 < * A {@link ThreadedExecutor} that executes each submitted task on one
12 > * An {@link ExecutorService} that executes each submitted task on one
13   * of several pooled threads.
14   *
15   * <p>Thread pools address two different problems at the same time:
# Line 27 | Line 29 | import java.util.List;
29   * However, programmers are urged to use the more convenient factory
30   * methods <tt>newCachedThreadPool</tt> (unbounded thread pool, with
31   * automatic thread reclamation), <tt>newFixedThreadPool</tt> (fixed
32 < * size thread pool), <tt>newSingleThreadExecutor</tt> (single
32 > * size thread pool), <tt>newSingleThreadPoolExecutor</tt> (single
33   * background thread for execution of tasks), and
34   * <tt>newThreadPerTaskExeceutor</tt> (execute each task in a new
35   * thread), that preconfigure settings for the most common usage
36   * scenarios.
37   *
36 *
38   * <p>This class also maintain some basic statistics, such as the
39 < * maximum number of active threads, or the maximum queue length, that
40 < * may be useful for monitoring and tuning executors.
39 > * number of completed tasks, that may be useful for monitoring and
40 > * tuning executors.
41   *
42   * <h3>Tuning guide</h3>
43   * <dl>
44 < * <dt>Minimum and maximum pool size</dt>
45 < * <dd>ThreadExecutor will
46 < * automatically adjust the pool size within the bounds set by
47 < * minimumPoolSize and maximumPoolSize.  When a new task is submitted,
48 < * and fewer than the minimum number of threads are running, a new
49 < * thread is created to handle the request, even if other worker
50 < * threads are idle.  If there are more than the minimum but less than
51 < * the maximum number of threads running, a new thread will be created
52 < * only if all other threads are busy.  By setting minimumPoolSize and
53 < * maximumPoolSize to N, you create a fixed-size thread pool.</dd>
54 < *
55 < * <dt>Keep-alive</dt>
56 < * <dd>The keepAliveTime determines what happens to idle
57 < * threads.  If the pool currently has more than the minimum number of
58 < * threads, excess threads will be terminated if they have been idle
59 < * for more than the keepAliveTime.</dd>
60 < *
61 < * <dt>Queueing</dt>
62 < * <dd>You are free to specify the queuing mechanism used
63 < * to handle submitted tasks.  The newCachedThreadPool factory method
64 < * uses queueless synchronous channels to to hand off work to threads.
44 > *
45 > * <dt>Core and maximum pool size</dt>
46 > *
47 > * <dd>A ThreadPoolExecutor will automatically adjust the pool size
48 > * according to the bounds set by corePoolSize and maximumPoolSize.
49 > * When a new task is submitted, and fewer than corePoolSize threads
50 > * are running, a new thread is created to handle the request, even if
51 > * other worker threads are idle.  If there are more than the
52 > * corePoolSize but less than maximumPoolSize threads running, a new
53 > * thread will be created only if the queue is full.  By setting
54 > * corePoolSize and maximumPoolSize the same, you create a fixed-size
55 > * thread pool.</dd>
56 > *
57 > * <dt>Keep-alive</dt>
58 > *
59 > * <dd>The keepAliveTime determines what happens to idle threads.  If
60 > * the pool currently has more than the core number of threads, excess
61 > * threads will be terminated if they have been idle for more than the
62 > * keepAliveTime.</dd>
63 > *
64 > * <dt>Queueing</dt>
65 > *
66 > * <dd>You are free to specify the queuing mechanism used to handle
67 > * submitted tasks.  The newCachedThreadPool factory method uses
68 > * queueless synchronous channels to to hand off work to threads.
69   * This is a safe, conservative policy that avoids lockups when
70   * handling sets of requests that might have internal dependencies.
71   * The newFixedThreadPool factory method uses a LinkedBlockingQueue,
# Line 70 | Line 75 | import java.util.List;
75   * affect each others execution. For example, in an http server.  When
76   * given a choice, this pool always prefers adding a new thread rather
77   * than queueing if there are currently fewer than the current
78 < * getMinimumPoolSize threads running, but otherwise always prefers
78 > * getCorePoolSize threads running, but otherwise always prefers
79   * queuing a request rather than adding a new thread.
80   *
81   * <p>While queuing can be useful in smoothing out transient bursts of
82   * requests, especially in socket-based services, it is not very well
83   * behaved when commands continue to arrive on average faster than
84 < * they can be processed.  Using a bounded queue implements an overflow
80 < * policy which drops requests which cannot be handled due to insufficient
81 < * capacity.
84 > * they can be processed.  
85   *
86   * Queue sizes and maximum pool sizes can often be traded off for each
87   * other. Using large queues and small pools minimizes CPU usage, OS
# Line 90 | Line 93 | import java.util.List;
93   * sizes, which keeps CPUs busier but may encounter unacceptable
94   * scheduling overhead, which also decreases throughput.
95   * </dd>
96 + *
97   * <dt>Creating new threads</dt>
98 < * <dd>New threads are created through the
99 < * Callbacks.  By default, threads are created simply with
100 < * the new Thread(Runnable) constructor, but by overriding
101 < * Callbacks.newThread, you can alter the thread's name,
102 < * thread group, priority, daemon status, etc.
103 < * </dd>
98 > *
99 > * <dd>New threads are created using a ThreadFactory.  By default,
100 > * threads are created simply with the new Thread(Runnable)
101 > * constructor, but by supplying a different ThreadFactory, you can
102 > * alter the thread's name, thread group, priority, daemon status,
103 > * etc.  </dd>
104 > *
105   * <dt>Before and after intercepts</dt>
106 < * <dd>The Callbacks class has
107 < * methods which are called before and after execution of a task.
108 < * These can be used to manipulate the execution environment (for
109 < * example, reinitializing ThreadLocals), gather statistics, or
110 < * perform logging.
111 < * </dd>
106 > *
107 > * <dd>This class has overridable methods that which are called before
108 > * and after execution of each task.  These can be used to manipulate
109 > * the execution environment (for example, reinitializing
110 > * ThreadLocals), gather statistics, or perform logging.  </dd>
111 > *
112   * <dt>Blocked execution</dt>
113 < * <dd>There are a number of factors which can
114 < * bound the number of tasks which can execute at once, including the
115 < * maximum pool size and the queuing mechanism used.  If you are using
116 < * a synchronous queue, the execute() method will block until threads
117 < * are available to execute.  If you are using a bounded queue, then
118 < * tasks will be discarded if the bound is reached.  If the executor
119 < * determines that a task cannot be executed because it has been
120 < * refused by the queue and no threads are available, the
121 < * Callbacks.cannotExecute method will be called.
117 < * </dd>
113 > *
114 > * <dd>There are a number of factors which can bound the number of
115 > * tasks which can execute at once, including the maximum pool size
116 > * and the queuing mechanism used.  If the executor determines that a
117 > * task cannot be executed because it has been refused by the queue
118 > * and no threads are available, or because the executor has been shut
119 > * down, the RejectedExecutionHandler's rejectedExecution method is
120 > * invoked.  </dd>
121 > *
122   * <dt>Termination</dt>
123 < * <dd>ThreadExecutor supports two shutdown options,
124 < * immediate and graceful.  In an immediate shutdown, any threads
125 < * currently executing are interrupted, and any tasks not yet begun
126 < * are returned from the shutdownNow call.  In a graceful shutdown,
127 < * all queued tasks are allowed to run, but new tasks may not be
128 < * submitted.
123 > *
124 > * <dd>ThreadPoolExecutor supports two shutdown options, immediate and
125 > * graceful.  In an immediate shutdown, any threads currently
126 > * executing are interrupted, and any tasks not yet begun are returned
127 > * from the shutdownNow call.  In a graceful shutdown, all queued
128 > * tasks are allowed to run, but new tasks may not be submitted.
129   * </dd>
130 + *
131   * </dl>
132   *
133   * @since 1.5
134 < * @see CannotExecuteHandler
134 > * @see RejectedExecutionHandler
135   * @see Executors
136   * @see ThreadFactory
137   *
# Line 134 | Line 139 | import java.util.List;
139   * @revised $Date$
140   * @editor $Author$
141   *
137 * @fixme If greater control is needed, you can use the
138 *        constructor with custom parameters, selectively override
139 *        <tt>Callbacks</tt>, and/or dynamically change tuning
140 *        parameters
141 *
142 * @fixme <br/> Brian copied some stuff from dl.u.c for the tuning guide; please
143 *        review to make sure that it is still correct
144 *
145 * @fixme <br/> Please check if Brian's statements about queuing and blocking
146 *        in the tuning guide are correct.
142   */
143 < public class ThreadPoolExecutor implements ThreadedExecutor {
143 > public class ThreadPoolExecutor implements ExecutorService {
144 >    /**
145 >     * Queue used for holding tasks and handing off to worker threads.
146 >     */
147 >    private final BlockingQueue<Runnable> workQueue;
148 >
149 >    /**
150 >     * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and
151 >     * workers set.
152 >     */
153 >    private final ReentrantLock mainLock = new ReentrantLock();
154 >
155 >    /**
156 >     * Wait condition to support awaitTermination
157 >     */
158 >    private final Condition termination = mainLock.newCondition();
159 >
160 >    /**
161 >     * Set containing all worker threads in pool.
162 >     */
163 >    private final Set<Worker> workers = new HashSet<Worker>();
164 >
165 >    /**
166 >     * Timeout in nanosecods for idle threads waiting for work.
167 >     * Threads use this timeout only when there are more than
168 >     * corePoolSize present. Otherwise they wait forever for new work.
169 >     */
170 >    private volatile long  keepAliveTime;
171 >
172 >    /**
173 >     * Core pool size, updated only while holding mainLock,
174 >     * but volatile to allow concurrent readability even
175 >     * during updates.
176 >     */
177 >    private volatile int   corePoolSize;
178 >
179 >    /**
180 >     * Maximum pool size, updated only while holding mainLock
181 >     * but volatile to allow concurrent readability even
182 >     * during updates.
183 >     */
184 >    private volatile int   maximumPoolSize;
185 >
186 >    /**
187 >     * Current pool size, updated only while holding mainLock
188 >     * but volatile to allow concurrent readability even
189 >     * during updates.
190 >     */
191 >    private volatile int   poolSize;
192 >
193 >    /**
194 >     * Shutdown status, becomes (and remains) nonzero when shutdown called.
195 >     */
196 >    private volatile int shutdownStatus;
197 >
198 >    // Special values for status
199 >    private static final int NOT_SHUTDOWN       = 0;
200 >    private static final int SHUTDOWN_WHEN_IDLE = 1;
201 >    private static final int SHUTDOWN_NOW       = 2;        
202 >
203 >    /**
204 >     * Latch that becomes true when all threads terminate after shutdown.
205 >     */
206 >    private volatile boolean isTerminated;
207 >
208 >    /**
209 >     * Handler called when saturated or shutdown in execute.
210 >     */
211 >    private volatile RejectedExecutionHandler handler = defaultHandler;
212 >
213 >    /**
214 >     * Factory for new threads.
215 >     */
216 >    private volatile ThreadFactory threadFactory = defaultThreadFactory;
217 >
218 >    /**
219 >     * Tracks largest attained pool size.
220 >     */
221 >    private int largestPoolSize;
222 >
223 >    /**
224 >     * Counter for completed tasks. Updated only on termination of
225 >     * worker threads.
226 >     */
227 >    private long completedTaskCount;
228 >
229 >    private static final ThreadFactory defaultThreadFactory =
230 >        new ThreadFactory() {
231 >            public Thread newThread(Runnable r) {
232 >                return new Thread(r);
233 >            }
234 >        };
235 >
236 >    private static final RejectedExecutionHandler defaultHandler =
237 >        new AbortPolicy();
238 >
239 >    /**
240 >     * Create and return a new thread running firstTask as its first
241 >     * task. Call only while holding mainLock
242 >     */
243 >    private Thread addThread(Runnable firstTask) {
244 >        Worker w = new Worker(firstTask);
245 >        Thread t = threadFactory.newThread(w);
246 >        w.thread = t;
247 >        workers.add(w);
248 >        int nt = ++poolSize;
249 >        if (nt > largestPoolSize)
250 >            largestPoolSize = nt;
251 >        return t;
252 >    }
253 >
254 >    /**
255 >     * Create and start a new thread running firstTask as its first
256 >     * task, only if less than corePoolSize threads are running.
257 >     * @return true if successful.
258 >     */
259 >    private boolean addIfUnderCorePoolSize(Runnable task) {
260 >        Thread t = null;
261 >        mainLock.lock();
262 >        try {
263 >            if (poolSize < corePoolSize)
264 >                t = addThread(task);
265 >        }
266 >        finally {
267 >            mainLock.unlock();
268 >        }
269 >        if (t == null)
270 >            return false;
271 >        t.start();
272 >        return true;
273 >    }
274 >
275 >    /**
276 >     * Create and start a new thread only if less than maximumPoolSize
277 >     * threads are running.  The new thread runs as its first task the
278 >     * next task in queue, or if there is none, the given task.
279 >     * @return null on failure, else the first task to be run by new thread.
280 >     */
281 >    private Runnable addIfUnderMaximumPoolSize(Runnable task) {
282 >        Thread t = null;
283 >        Runnable next = null;
284 >        mainLock.lock();
285 >        try {
286 >            if (poolSize < maximumPoolSize) {
287 >                next = workQueue.poll();
288 >                if (next == null)
289 >                    next = task;
290 >                t = addThread(next);
291 >            }
292 >        }
293 >        finally {
294 >            mainLock.unlock();
295 >        }
296 >        if (t == null)
297 >            return null;
298 >        t.start();
299 >        return next;
300 >    }
301 >
302 >
303 >    /**
304 >     * Get the next task for a worker thread to run.
305 >     */
306 >    private Runnable getTask() throws InterruptedException {
307 >        for (;;) {
308 >            int stat = shutdownStatus;
309 >            if (stat == SHUTDOWN_NOW)
310 >                return null;
311 >            long timeout = keepAliveTime;
312 >            if (timeout <= 0) // must die immediately for 0 timeout
313 >                return null;
314 >            if (stat == SHUTDOWN_WHEN_IDLE) // help drain queue before dying
315 >                return workQueue.poll();
316 >            if (poolSize <= corePoolSize)   // untimed wait if core
317 >                return workQueue.take();
318 >            Runnable task =  workQueue.poll(timeout, TimeUnit.NANOSECONDS);
319 >            if (task != null)
320 >                return task;
321 >            if (poolSize > corePoolSize) // timed out
322 >                return null;
323 >            // else, after timeout, pool shrank so shouldn't die, so retry
324 >        }
325 >    }
326 >
327 >    /**
328 >     * Perform bookkeeping for a terminated worker thread.
329 >     */
330 >    private void workerDone(Worker w) {
331 >        boolean allDone = false;
332 >        mainLock.lock();
333 >        try {
334 >            completedTaskCount += w.completedTasks;
335 >            workers.remove(w);
336 >
337 >            if (--poolSize > 0)
338 >                return;
339 >
340 >            // If this was last thread, deal with potential shutdown
341 >            int stat = shutdownStatus;
342 >            
343 >            // If there are queued tasks but no threads, create replacement.
344 >            if (stat != SHUTDOWN_NOW) {
345 >                Runnable r = workQueue.poll();
346 >                if (r != null) {
347 >                    addThread(r).start();
348 >                    return;
349 >                }
350 >            }
351 >
352 >            // if no tasks and not shutdown, can exit without replacement
353 >            if (stat == NOT_SHUTDOWN)
354 >                return;
355 >
356 >            allDone = true;
357 >            isTerminated = true;
358 >            termination.signalAll();
359 >        }
360 >        finally {
361 >            mainLock.unlock();
362 >        }
363 >
364 >        if (allDone) // call outside lock
365 >            terminated();
366 >    }
367 >
368 >    /**
369 >     *  Worker threads
370 >     */
371 >    private class Worker implements Runnable {
372 >
373 >        /**
374 >         * The runLock is acquired and released surrounding each task
375 >         * execution. It mainly protects against interrupts that are
376 >         * intended to cancel the worker thread from instead
377 >         * interrupting the task being run.
378 >         */
379 >        private final ReentrantLock runLock = new ReentrantLock();
380 >
381 >        /**
382 >         * Initial task to run before entering run loop
383 >         */
384 >        private Runnable firstTask;
385 >
386 >        /**
387 >         * Per thread completed task counter; accumulated
388 >         * into completedTaskCount upon termination.
389 >         */
390 >        volatile long completedTasks;
391 >
392 >        /**
393 >         * Thread this worker is running in.  Acts as a final field,
394 >         * but cannot be set until thread is created.
395 >         */
396 >        Thread thread;
397 >
398 >        Worker(Runnable firstTask) {
399 >            this.firstTask = firstTask;
400 >        }
401 >
402 >        boolean isActive() {
403 >            return runLock.isLocked();
404 >        }
405 >
406 >        /**
407 >         * Interrupt thread if not running a task
408 >         */
409 >        void interruptIfIdle() {
410 >            if (runLock.tryLock()) {
411 >                try {
412 >                    thread.interrupt();
413 >                }
414 >                finally {
415 >                    runLock.unlock();
416 >                }
417 >            }
418 >        }
419 >
420 >        /**
421 >         * Cause thread to die even if running a task.
422 >         */
423 >        void interruptNow() {
424 >            thread.interrupt();
425 >        }
426 >
427 >        /**
428 >         * Run a single task between before/after methods.
429 >         */
430 >        private void runTask(Runnable task) {
431 >            runLock.lock();
432 >            try {
433 >                // Abort now if immediate cancel.  Otherwise, we have
434 >                // committed to run this task.
435 >                if (shutdownStatus == SHUTDOWN_NOW)
436 >                    return;
437 >
438 >                Thread.interrupted(); // clear interrupt status on entry
439 >                boolean ran = false;
440 >                beforeExecute(thread, task);
441 >                try {
442 >                    task.run();
443 >                    ran = true;
444 >                    afterExecute(task, null);
445 >                    ++completedTasks;
446 >                }
447 >                catch(RuntimeException ex) {
448 >                    if (!ran)
449 >                        afterExecute(task, ex);
450 >                    // else the exception occurred within
451 >                    // afterExecute itself in which case we don't
452 >                    // want to call it again.
453 >                    throw ex;
454 >                }
455 >            }
456 >            finally {
457 >                runLock.unlock();
458 >            }
459 >        }
460 >
461 >        /**
462 >         * Main run loop
463 >         */
464 >        public void run() {
465 >            try {
466 >                for (;;) {
467 >                    Runnable task;
468 >                    if (firstTask != null) {
469 >                        task = firstTask;
470 >                        firstTask = null;
471 >                    }
472 >                    else {
473 >                        task = getTask();
474 >                        if (task == null)
475 >                            break;
476 >                    }
477 >                    runTask(task);
478 >                    task = null; // unnecessary but can help GC
479 >                }
480 >            }
481 >            catch(InterruptedException ie) {
482 >                // fall through
483 >            }
484 >            finally {
485 >                workerDone(this);
486 >            }
487 >        }
488 >    }
489  
490      /**
491       * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
492       * parameters.  It may be more convenient to use one of the factory
493       * methods instead of this general purpose constructor.
494       *
495 <     * @param minThreads the minimum number of threads to keep in the
495 >     * @param corePoolSize the number of threads to keep in the
496       * pool, even if they are idle.
497 <     * @param maxThreads the maximum number of threads to allow in the
497 >     * @param maximumPoolSize the maximum number of threads to allow in the
498       * pool.
499       * @param keepAliveTime when the number of threads is greater than
500 <     * the minimum, this is the maximum time that excess idle threads
500 >     * the core, this is the maximum time that excess idle threads
501       * will wait for new tasks before terminating.
502 <     * @param granularity the time unit for the keepAliveTime
502 >     * @param unit the time unit for the keepAliveTime
503       * argument.
504       * @param workQueue the queue to use for holding tasks before the
505       * are executed. This queue will hold only the <tt>Runnable</tt>
506       * tasks submitted by the <tt>execute</tt> method.
507 <     * @throws IllegalArgumentException if minThreads, maxThreads, or
508 <     * keepAliveTime less than zero, or if minThreads greater than
509 <     * maxThreads.
507 >     * @throws IllegalArgumentException if corePoolSize, or
508 >     * keepAliveTime less than zero, or if maximumPoolSize less than or
509 >     * equal to zero, or if corePoolSize greater than maximumPoolSize.
510       * @throws NullPointerException if <tt>workQueue</tt> is null
511       */
512 <    public ThreadPoolExecutor(int minThreads,
513 <                              int maxThreads,
512 >    public ThreadPoolExecutor(int corePoolSize,
513 >                              int maximumPoolSize,
514                                long keepAliveTime,
515 <                              TimeUnit granularity,
516 <                              BlockingQueue workQueue) {}
517 <
518 <    /* Executor implementation. Inherit javadoc from ThreadedExecutor. */
519 <
180 <    public void execute(Runnable command) {}
515 >                              TimeUnit unit,
516 >                              BlockingQueue<Runnable> workQueue) {
517 >        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
518 >             defaultThreadFactory, defaultHandler);
519 >    }
520  
521 <    /* ThreadedExecutor implementation.  Inherit javadoc. */
521 >    /**
522 >     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
523 >     * parameters.
524 >     *
525 >     * @param corePoolSize the number of threads to keep in the
526 >     * pool, even if they are idle.
527 >     * @param maximumPoolSize the maximum number of threads to allow in the
528 >     * pool.
529 >     * @param keepAliveTime when the number of threads is greater than
530 >     * the core, this is the maximum time that excess idle threads
531 >     * will wait for new tasks before terminating.
532 >     * @param unit the time unit for the keepAliveTime
533 >     * argument.
534 >     * @param workQueue the queue to use for holding tasks before the
535 >     * are executed. This queue will hold only the <tt>Runnable</tt>
536 >     * tasks submitted by the <tt>execute</tt> method.
537 >     * @param threadFactory the factory to use when the executor
538 >     * creates a new thread.
539 >     * @throws IllegalArgumentException if corePoolSize, or
540 >     * keepAliveTime less than zero, or if maximumPoolSize less than or
541 >     * equal to zero, or if corePoolSize greater than maximumPoolSize.
542 >     * @throws NullPointerException if <tt>workQueue</tt>
543 >     * or <tt>threadFactory</tt> are null.
544 >     */
545 >    public ThreadPoolExecutor(int corePoolSize,
546 >                              int maximumPoolSize,
547 >                              long keepAliveTime,
548 >                              TimeUnit unit,
549 >                              BlockingQueue<Runnable> workQueue,
550 >                              ThreadFactory threadFactory) {
551  
552 <    public void setThreadFactory(ThreadFactory threadFactory) {
552 >        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
553 >             threadFactory, defaultHandler);
554      }
555  
556 <    public ThreadFactory getThreadFactory() {
557 <        return null;
556 >    /**
557 >     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
558 >     * parameters.
559 >     *
560 >     * @param corePoolSize the number of threads to keep in the
561 >     * pool, even if they are idle.
562 >     * @param maximumPoolSize the maximum number of threads to allow in the
563 >     * pool.
564 >     * @param keepAliveTime when the number of threads is greater than
565 >     * the core, this is the maximum time that excess idle threads
566 >     * will wait for new tasks before terminating.
567 >     * @param unit the time unit for the keepAliveTime
568 >     * argument.
569 >     * @param workQueue the queue to use for holding tasks before the
570 >     * are executed. This queue will hold only the <tt>Runnable</tt>
571 >     * tasks submitted by the <tt>execute</tt> method.
572 >     * @param handler the handler to use when execution is blocked
573 >     * because the thread bounds and queue capacities are reached.
574 >     * @throws IllegalArgumentException if corePoolSize, or
575 >     * keepAliveTime less than zero, or if maximumPoolSize less than or
576 >     * equal to zero, or if corePoolSize greater than maximumPoolSize.
577 >     * @throws NullPointerException if <tt>workQueue</tt>
578 >     * or  <tt>handler</tt> are null.
579 >     */
580 >    public ThreadPoolExecutor(int corePoolSize,
581 >                              int maximumPoolSize,
582 >                              long keepAliveTime,
583 >                              TimeUnit unit,
584 >                              BlockingQueue<Runnable> workQueue,
585 >                              RejectedExecutionHandler handler) {
586 >        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
587 >             defaultThreadFactory, handler);
588      }
589  
590 <    public void setCannotExecuteHandler(CannotExecuteHandler handler) {
590 >    /**
591 >     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
592 >     * parameters.
593 >     *
594 >     * @param corePoolSize the number of threads to keep in the
595 >     * pool, even if they are idle.
596 >     * @param maximumPoolSize the maximum number of threads to allow in the
597 >     * pool.
598 >     * @param keepAliveTime when the number of threads is greater than
599 >     * the core, this is the maximum time that excess idle threads
600 >     * will wait for new tasks before terminating.
601 >     * @param unit the time unit for the keepAliveTime
602 >     * argument.
603 >     * @param workQueue the queue to use for holding tasks before the
604 >     * are executed. This queue will hold only the <tt>Runnable</tt>
605 >     * tasks submitted by the <tt>execute</tt> method.
606 >     * @param threadFactory the factory to use when the executor
607 >     * creates a new thread.
608 >     * @param handler the handler to use when execution is blocked
609 >     * because the thread bounds and queue capacities are reached.
610 >     * @throws IllegalArgumentException if corePoolSize, or
611 >     * keepAliveTime less than zero, or if maximumPoolSize less than or
612 >     * equal to zero, or if corePoolSize greater than maximumPoolSize.
613 >     * @throws NullPointerException if <tt>workQueue</tt>
614 >     * or <tt>threadFactory</tt> or <tt>handler</tt> are null.
615 >     */
616 >    public ThreadPoolExecutor(int corePoolSize,
617 >                              int maximumPoolSize,
618 >                              long keepAliveTime,
619 >                              TimeUnit unit,
620 >                              BlockingQueue<Runnable> workQueue,
621 >                              ThreadFactory threadFactory,
622 >                              RejectedExecutionHandler handler) {
623 >        if (corePoolSize < 0 ||
624 >            maximumPoolSize <= 0 ||
625 >            maximumPoolSize < corePoolSize ||
626 >            keepAliveTime < 0)
627 >            throw new IllegalArgumentException();
628 >        if (workQueue == null || threadFactory == null || handler == null)
629 >            throw new NullPointerException();
630 >        this.corePoolSize = corePoolSize;
631 >        this.maximumPoolSize = maximumPoolSize;
632 >        this.workQueue = workQueue;
633 >        this.keepAliveTime = unit.toNanos(keepAliveTime);
634 >        this.threadFactory = threadFactory;
635 >        this.handler = handler;
636      }
637  
194    public CannotExecuteHandler getCannotExecuteHandler() {
195        return null;
196    }
638  
639 <    public BlockingQueue getQueue() {
640 <        return null;
639 >    /**
640 >     * Executes the given task sometime in the future.  The task
641 >     * may execute in a new thread or in an existing pooled thread.
642 >     *
643 >     * If the task cannot be submitted for execution, either because this
644 >     * executor has been shutdown or because its capacity has been reached,
645 >     * the task is handled by the current <tt>RejectedExecutionHandler</tt>.  
646 >     *
647 >     * @param command the task to execute
648 >     * @throws RejectedExecutionException at discretion of
649 >     * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted for execution
650 >     */
651 >    public void execute(Runnable command) {
652 >        for (;;) {
653 >            if (shutdownStatus != NOT_SHUTDOWN) {
654 >                handler.rejectedExecution(command, this);
655 >                return;
656 >            }
657 >            if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
658 >                return;
659 >            if (workQueue.offer(command))
660 >                return;
661 >            Runnable r = addIfUnderMaximumPoolSize(command);
662 >            if (r == command)
663 >                return;
664 >            if (r == null) {
665 >                handler.rejectedExecution(command, this);
666 >                return;
667 >            }
668 >            // else retry
669 >        }
670      }
671 +        
672 +    public void shutdown() {
673 +        mainLock.lock();
674 +        try {
675 +            if (shutdownStatus == NOT_SHUTDOWN) // don't override shutdownNow
676 +                shutdownStatus = SHUTDOWN_WHEN_IDLE;
677  
678 <    public void shutdown() {}
678 >            for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
679 >                it.next().interruptIfIdle();
680 >        }
681 >        finally {
682 >            mainLock.unlock();
683 >        }
684 >    }
685  
686      public List shutdownNow() {
687 <        return null;
687 >        mainLock.lock();
688 >        try {
689 >            shutdownStatus = SHUTDOWN_NOW;
690 >            for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
691 >                it.next().interruptNow();
692 >        }
693 >        finally {
694 >            mainLock.unlock();
695 >        }
696 >        return Arrays.asList(workQueue.toArray());
697      }
698  
699      public boolean isShutdown() {
700 <        return false;
700 >        return shutdownStatus != NOT_SHUTDOWN;
701      }
702  
212    public void interrupt() {}
213
703      public boolean isTerminated() {
704 <        return false;
704 >        return isTerminated;
705      }
706  
707 <    public boolean awaitTermination(long timeout, TimeUnit granularity)
708 <            throws InterruptedException {
709 <        return false;
707 >    public boolean awaitTermination(long timeout, TimeUnit unit)
708 >        throws InterruptedException {
709 >        mainLock.lock();
710 >        try {
711 >            return termination.await(timeout, unit);
712 >        }
713 >        finally {
714 >            mainLock.unlock();
715 >        }
716 >    }
717 >    
718 >    /**
719 >     * Sets the thread factory used to create new threads.
720 >     *
721 >     * @param threadFactory the new thread factory
722 >     */
723 >    public void setThreadFactory(ThreadFactory threadFactory) {
724 >        this.threadFactory = threadFactory;
725      }
726  
727 <    /* @fixme Should any of these be included in ThreadedExecutor interface? */
727 >    /**
728 >     * Returns the thread factory used to create new threads.
729 >     *
730 >     * @return the current thread factory
731 >     */
732 >    public ThreadFactory getThreadFactory() {
733 >        return threadFactory;
734 >    }
735  
736      /**
737 <     * Sets the minimum allowed number of threads.  This overrides any
227 <     * value set in the constructor.
737 >     * Sets a new handler for unexecutable tasks.
738       *
739 <     * @param minThreads the new minimum
230 <     * @throws IllegalArgumentException if <tt>minThreads</tt> less than zero
739 >     * @param handler the new handler
740       */
741 <    public void setMinimumPoolSize(int minThreads) {}
741 >    public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
742 >        this.handler = handler;
743 >    }
744  
745      /**
746 <     * Returns the minimum allowed number of threads.
746 >     * Returns the current handler for unexecutable tasks.
747       *
748 <     * @return the minimum number of threads
748 >     * @return the current handler
749       */
750 <    public int getMinimumPoolSize() { return 0; }
750 >    public RejectedExecutionHandler getRejectedExecutionHandler() {
751 >        return handler;
752 >    }
753  
754      /**
755 <     * Sets the maximum allowed number of threads. This overrides any
756 <     * value set in the constructor.
755 >     * Returns the task queue used by this executor.  Note that
756 >     * this queue may be in active use.  Retrieveing the task queue
757 >     * does not prevent queued tasks from executing.
758       *
759 <     * @param maxThreads the new maximum
246 <     * @throws IllegalArgumentException if maxThreads less than zero or
247 <     * the {@link #getMinimumPoolSize minimum pool size}
759 >     * @return the task queue
760       */
761 <    public void setMaximumPoolSize(int maxThreads) {}
761 >    public BlockingQueue<Runnable> getQueue() {
762 >        return workQueue;
763 >    }
764 >
765 >    /**
766 >     * Sets the core number of threads.  This overrides any value set
767 >     * in the constructor.  If the new value is smaller than the
768 >     * current value, excess existing threads will be terminated when
769 >     * they next become idle.
770 >     *
771 >     * @param corePoolSize the new core size
772 >     * @throws IllegalArgumentException if <tt>corePoolSize</tt> less than zero
773 >     */
774 >    public void setCorePoolSize(int corePoolSize) {
775 >        if (corePoolSize < 0)
776 >            throw new IllegalArgumentException();
777 >        mainLock.lock();
778 >        try {
779 >            int extra = this.corePoolSize - corePoolSize;
780 >            this.corePoolSize = corePoolSize;
781 >            if (extra > 0 && poolSize > corePoolSize) {
782 >                Iterator<Worker> it = workers.iterator();
783 >                while (it.hasNext() &&
784 >                       extra > 0 &&
785 >                       poolSize > corePoolSize &&
786 >                       workQueue.remainingCapacity() == 0) {
787 >                    it.next().interruptIfIdle();
788 >                    --extra;
789 >                }
790 >            }
791 >                
792 >        }
793 >        finally {
794 >            mainLock.unlock();
795 >        }
796 >    }
797 >
798 >    /**
799 >     * Returns the core number of threads.
800 >     *
801 >     * @return the core number of threads
802 >     */
803 >    public int getCorePoolSize() {
804 >        return corePoolSize;
805 >    }
806 >
807 >    /**
808 >     * Sets the maximum allowed number of threads. This overrides any
809 >     * value set in the constructor. If the new value is smaller than
810 >     * the current value, excess existing threads will be
811 >     * terminated when they next become idle.
812 >     *
813 >     * @param maximumPoolSize the new maximum
814 >     * @throws IllegalArgumentException if maximumPoolSize less than zero or
815 >     * the {@link #getCorePoolSize core pool size}
816 >     */
817 >    public void setMaximumPoolSize(int maximumPoolSize) {
818 >        if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
819 >            throw new IllegalArgumentException();
820 >        mainLock.lock();
821 >        try {
822 >            int extra = this.maximumPoolSize - maximumPoolSize;
823 >            this.maximumPoolSize = maximumPoolSize;
824 >            if (extra > 0 && poolSize > maximumPoolSize) {
825 >                Iterator<Worker> it = workers.iterator();
826 >                while (it.hasNext() &&
827 >                       extra > 0 &&
828 >                       poolSize > maximumPoolSize) {
829 >                    it.next().interruptIfIdle();
830 >                    --extra;
831 >                }
832 >            }
833 >        }
834 >        finally {
835 >            mainLock.unlock();
836 >        }
837 >    }
838  
839      /**
840       * Returns the maximum allowed number of threads.
841       *
842 <     * @return the maximum number of threads
842 >     * @return the maximum allowed number of threads
843       */
844 <    public int getMaximumPoolSize() { return 0; }
844 >    public int getMaximumPoolSize() {
845 >        return maximumPoolSize;
846 >    }
847  
848      /**
849       * Sets the time limit for which threads may remain idle before
850 <     * being terminated.  If there are more than the minimum number of
850 >     * being terminated.  If there are more than the core number of
851       * threads currently in the pool, after waiting this amount of
852       * time without processing a task, excess threads will be
853       * terminated.  This overrides any value set in the constructor.
854       * @param time the time to wait.  A time value of zero will cause
855       * excess threads to terminate immediately after executing tasks.
856 <     * @param granularity  the time unit of the time argument
856 >     * @param unit  the time unit of the time argument
857       * @throws IllegalArgumentException if msecs less than zero
858       */
859 <    public void setKeepAliveTime(long time, TimeUnit granularity) {}
859 >    public void setKeepAliveTime(long time, TimeUnit unit) {
860 >        if (time < 0)
861 >            throw new IllegalArgumentException();
862 >        this.keepAliveTime = unit.toNanos(time);
863 >    }
864  
865      /**
866       * Returns the thread keep-alive time, which is the amount of time
867 <     * which threads in excess of the minimum pool size may remain
868 <     * idle before being terminated.
867 >     * which threads in excess of the core pool size may remain
868 >     * idle before being terminated.
869       *
870 <     * @param granularity the desired time unit of the result
870 >     * @param unit the desired time unit of the result
871       * @return the time limit
872       */
873 <    public long getKeepAliveTime(TimeUnit granularity) { return 0; }
873 >    public long getKeepAliveTime(TimeUnit unit) {
874 >        return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
875 >    }
876  
877      /* Statistics */
878  
# Line 285 | Line 881 | public class ThreadPoolExecutor implemen
881       *
882       * @return the number of threads
883       */
884 <    public int getPoolSize() { return 0; }
884 >    public int getPoolSize() {
885 >        return poolSize;
886 >    }
887  
888      /**
889 <     * Returns the current number of threads that are actively
889 >     * Returns the approximate number of threads that are actively
890       * executing tasks.
891       *
892       * @return the number of threads
893       */
894 <    public int getActiveCount() { return 0; }
894 >    public int getActiveCount() {
895 >        mainLock.lock();
896 >        try {
897 >            int n = 0;
898 >            for (Iterator<Worker> it = workers.iterator(); it.hasNext(); ) {
899 >                if (it.next().isActive())
900 >                    ++n;
901 >            }
902 >            return n;
903 >        }
904 >        finally {
905 >            mainLock.unlock();
906 >        }
907 >    }
908  
909      /**
910 <     * Returns the maximum number of threads that have ever simultaneously
911 <     * executed tasks.
910 >     * Returns the largest number of threads that have ever
911 >     * simultaneously been in the pool.
912       *
913       * @return the number of threads
914       */
915 <    public int getMaximumActiveCount() { return 0; }
915 >    public int getLargestPoolSize() {
916 >        mainLock.lock();
917 >        try {
918 >            return largestPoolSize;
919 >        }
920 >        finally {
921 >            mainLock.unlock();
922 >        }
923 >    }
924  
925      /**
926 <     * Returns the number of tasks that have been queued but not yet executed.
926 >     * Returns the approximate total number of tasks that have been
927 >     * scheduled for execution. Because the states of tasks and
928 >     * threads may change dynamically during computation, the returned
929 >     * value is only an approximation.
930       *
931       * @return the number of tasks
932       */
933 <    public int getQueueCount() { return 0; }
933 >    public long getTaskCount() {
934 >        mainLock.lock();
935 >        try {
936 >            long n = completedTaskCount;
937 >            for (Iterator<Worker> it = workers.iterator(); it.hasNext(); ) {
938 >                Worker w = it.next();
939 >                n += w.completedTasks;
940 >                if (w.isActive())
941 >                    ++n;
942 >            }
943 >            return n + workQueue.size();
944 >        }
945 >        finally {
946 >            mainLock.unlock();
947 >        }
948 >    }
949  
950      /**
951 <     * Returns the maximum number of tasks that have ever been queued
952 <     * waiting for execution.
951 >     * Returns the approximate total number of tasks that have
952 >     * completed execution. Because the states of tasks and threads
953 >     * may change dynamically during computation, the returned value
954 >     * is only an approximation.
955       *
956       * @return the number of tasks
957       */
958 <    public int getMaximumQueueCount() { return 0; }
958 >    public long getCompletedTaskCount() {
959 >        mainLock.lock();
960 >        try {
961 >            long n = completedTaskCount;
962 >            for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
963 >                n += it.next().completedTasks;
964 >            return n;
965 >        }
966 >        finally {
967 >            mainLock.unlock();
968 >        }
969 >    }
970  
971      /**
972 <     * Returns the total number of tasks that have been scheduled for execution.
972 >     * Method invoked prior to executing the given Runnable in given
973 >     * thread.  This method may be used to re-initialize ThreadLocals,
974 >     * or to perform logging.
975       *
976 <     * @return the number of tasks
976 >     * @param t the thread that will run task r.
977 >     * @param r the task that will be executed.
978       */
979 <    public int getCumulativeTaskCount() { return 0; }
979 >    protected void beforeExecute(Thread t, Runnable r) { }
980  
981      /**
982 <     * Returns the total number of tasks that have completed execution.
982 >     * Method invoked upon completion of execution of the given
983 >     * Runnable.  If non-null, the Throwable is the uncaught exception
984 >     * that caused execution to terminate abruptly.
985       *
986 <     * @return the number of tasks
986 >     * @param r the runnable that has completed.
987 >     * @param t the exception that cause termination, or null if
988 >     * execution completed normally.
989       */
990 <    public int getCumulativeCompletedTaskCount() { return 0; }
990 >    protected void afterExecute(Runnable r, Throwable t) { }
991  
992 <    /* @fixme Various CannotExecuteHandler implementations. */
992 >    /**
993 >     * Method invoked when the Executor has terminated.  Default
994 >     * implementation does nothing.
995 >     */
996 >    protected void terminated() { }
997  
998      /**
999       * A handler for unexecutable tasks that runs these tasks directly in the
1000       * calling thread of the <tt>execute</tt> method.  This is the default
1001 <     * <tt>CannotExecuteHandler</tt>.
1001 >     * <tt>RejectedExecutionHandler</tt>.
1002       */
1003 <   public class CallerRunsPolicy implements CannotExecuteHandler {
1003 >   public static class CallerRunsPolicy implements RejectedExecutionHandler {
1004  
1005          /**
1006           * Constructs a <tt>CallerRunsPolicy</tt>.
1007           */
1008          public CallerRunsPolicy() { }
1009  
1010 <        public boolean cannotExecute(Runnable r, boolean isShutdown) {
1011 <            if (!isShutdown) {
1010 >        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1011 >            if (!e.isShutdown()) {
1012                  r.run();
1013              }
353            return true;
1014          }
1015      }
1016  
1017      /**
1018 <     * A handler for unexecutable tasks that throws a <tt>CannotExecuteException</tt>.
1018 >     * A handler for unexecutable tasks that throws a <tt>RejectedExecutionException</tt>.
1019       */
1020 <    public class AbortPolicy implements CannotExecuteHandler {
1020 >    public static class AbortPolicy implements RejectedExecutionHandler {
1021  
1022          /**
1023           * Constructs a <tt>AbortPolicy</tt>.
1024           */
1025          public AbortPolicy() { }
1026  
1027 <        public boolean cannotExecute(Runnable r, boolean isShutdown) {
1028 <            if (!isShutdown) {
369 <                throw new CannotExecuteException();
370 <            }
371 <            return true;
1027 >        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1028 >            throw new RejectedExecutionException();
1029          }
1030      }
1031  
# Line 376 | Line 1033 | public class ThreadPoolExecutor implemen
1033       * A handler for unexecutable tasks that waits until the task can be
1034       * submitted for execution.
1035       */
1036 <    public class WaitPolicy implements CannotExecuteHandler {
380 <
1036 >    public static class WaitPolicy implements RejectedExecutionHandler {
1037          /**
1038           * Constructs a <tt>WaitPolicy</tt>.
1039           */
1040          public WaitPolicy() { }
1041  
1042 <        public boolean cannotExecute(Runnable r, boolean isShutdown) {
1043 <            if (!isShutdown) {
1044 <                // FIXME: wait here
1045 <                // FIXME: throw CannotExecuteException if interrupted
1046 <                return false;
1042 >        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1043 >            if (!e.isShutdown()) {
1044 >                try {
1045 >                    e.getQueue().put(r);
1046 >                }
1047 >                catch (InterruptedException ie) {
1048 >                    Thread.currentThread().interrupt();
1049 >                    throw new RejectedExecutionException(ie);
1050 >                }
1051              }
392            return true;
1052          }
1053      }
1054  
1055      /**
1056       * A handler for unexecutable tasks that silently discards these tasks.
1057       */
1058 <    public class DiscardPolicy implements CannotExecuteHandler {
1058 >    public static class DiscardPolicy implements RejectedExecutionHandler {
1059  
1060          /**
1061           * Constructs <tt>DiscardPolicy</tt>.
1062           */
1063          public DiscardPolicy() { }
1064  
1065 <        public boolean cannotExecute(Runnable r, boolean isShutdown) {
407 <            return true;
1065 >        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1066          }
1067      }
1068  
1069      /**
1070       * A handler for unexecutable tasks that discards the oldest unhandled request.
1071       */
1072 <    public class DiscardOldestPolicy implements CannotExecuteHandler {
415 <
1072 >    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
1073          /**
1074 <         * Constructs a <tt>DiscardOldestPolicy</tt>.
1074 >         * Constructs a <tt>DiscardOldestPolicy</tt> for the given executor.
1075           */
1076          public DiscardOldestPolicy() { }
1077  
1078 <        public boolean cannotExecute(Runnable r, boolean isShutdown) {
1079 <            if (!isShutdown) {
1080 <                // FIXME: discard oldest here
1081 <                return false;
1078 >        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
1079 >            if (!e.isShutdown()) {
1080 >                e.getQueue().poll();
1081 >                e.execute(r);
1082              }
426            return true;
1083          }
1084      }
429
430    /*
431     * Methods invoked during various points of execution, allowing fine-grained
432     * control and monitoring.
433     */
434
435    /**
436     * Method invoked prior to executing the given Runnable in given
437     * thread.  This method may be used to re-initialize ThreadLocals,
438     * or to perform logging.
439     *
440     * @param t the thread that will run task r.
441     * @param r the task that will be executed.
442     */
443    protected void beforeExecute(Thread t, Runnable r) { }
444
445    /**
446     * Method invoked upon completion of execution of the given
447     * Runnable.  If non-null, the Throwable is the uncaught exception
448     * that caused execution to terminate abruptly.
449     *
450     * @param r the runnable that has completed.
451     * @param t the exception that cause termination, or null if
452     * execution completed normally.
453     */
454    protected void afterExecute(Runnable r, Throwable t) { }
455
456    /**
457     * Method invoked when the Executor has terminated.  Default
458     * implementation does nothing.
459     */
460    protected void terminated() { }
1085   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines