ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ThreadPoolExecutor.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ThreadPoolExecutor.java (file contents):
Revision 1.85 by dl, Fri Jun 16 18:49:43 2006 UTC vs.
Revision 1.86 by dl, Sun Jun 18 22:11:48 2006 UTC

# Line 291 | Line 291 | import java.util.*;
291   */
292   public class ThreadPoolExecutor extends AbstractExecutorService {
293  
294 +    /**
295 +     * Permission for checking shutdown
296 +     */
297 +    private static final RuntimePermission shutdownPerm =
298 +        new RuntimePermission("modifyThread");
299 +
300      /*
301 <     * A TPE manages a largish set of control fields, mainly runState,
302 <     * poolSize, corePoolSize, maximumPoolSize.  In general, state
303 <     * changes only occur within mainLock regions, but nearly all
304 <     * fields are volatile, so can be read outside of locked
305 <     * regions. This enables the most performance-critical actions,
306 <     * such as enqueuing and dequeing tasks in workQueue, to normally
307 <     * proceed without holding this lock when they see that the state
308 <     * allows actions. This sometimes requires a form of double-check.
309 <     * For example when it appears that poolSize is less than
310 <     * corePoolSize, addIfUnderCorePoolSize is called, which checks
311 <     * sizes and runState under the lock before actually creating a
312 <     * new thread.
313 <     *
314 <     * The main lifecyle control is via runState, taking on values:
301 >     * A ThreadPoolExecutor manages a largish set of control fields.
302 >     * State changes in fields that affect execution control
303 >     * guarantees only occur within mainLock regions. These include
304 >     * fields runState, poolSize, corePoolSize, and maximumPoolSize
305 >     * However, these fields are also declared volatile, so can be
306 >     * read outside of locked regions. (Also, the workers Set is
307 >     * accessed only under lock).
308 >     *
309 >     * The other fields representng user control parameters do not
310 >     * affect execution invariants, so are declared volatile and
311 >     * allowed to change (via user methods) asynchronously with
312 >     * execution. These fields include: allowCoreThreadTimeOut,
313 >     * keepAliveTime, the rejected execution handler, and
314 >     * threadfactory are not updated within locks
315 >     *
316 >     * The extensive use of volatiles here enables the most
317 >     * performance-critical actions, such as enqueuing and dequeing
318 >     * tasks in the workQueue, to normally proceed without holding the
319 >     * mainLock when they see that the state allows actions, although,
320 >     * as described below, sometimes at the expense of re-checks
321 >     * following these actions.
322 >     */
323 >
324 >    /**
325 >     * runState provides the main lifecyle control, taking on values:
326 >     *
327       *   RUNNING:  Accept new tasks and process queued tasks
328       *   SHUTDOWN: Don't accept new tasks, but process queued tasks
329       *   STOP:     Don't accept new tasks,  don't process queued tasks,
330       *             and interrupt in-progress tasks
331       *   TERMINATED: Same as stop, plus all threads have terminated
332 <     * with transitions:
332 >     *
333 >     * The numerical order among these values matters, to allow
334 >     * ordered comparisons. The runState monotonically increases over
335 >     * time, but need not hit each state. The transitions are:
336       *
337       * RUNNING -> SHUTDOWN
338 <     *    On invocation of shutdown() when pool or queue nonempty
339 <     * {RUNNING or SHUTDOWN}  -> STOP  
340 <     *    On invocation of shutdownNow() when pool or queue nonempty
341 <     * {SHUTDOWN or STOP} -> TERMINATED
342 <     *    When both queue and pool become empty
343 <     * RUNNING -> TERMINATED
344 <     *    On invocation of shutdown when both queue and pool empty
324 <     *    (This bypasses creating a new thread just to cause termination)
325 <     *
326 <     */
327 <
328 <    /**
329 <     * Permission for checking shutdown
338 >     *    On invocation of shutdown(), perhaps implicity in finalize()
339 >     * (RUNNING or SHUTDOWN) -> STOP  
340 >     *    On invocation of shutdownNow()
341 >     * SHUTDOWN -> TERMINATED
342 >     *    When both queue and pool are empty
343 >     * STOP -> TERMINATED
344 >     *    When pool is empty
345       */
346 <    private static final RuntimePermission shutdownPerm =
347 <        new RuntimePermission("modifyThread");
346 >    volatile int runState;
347 >    static final int RUNNING    = 0;
348 >    static final int SHUTDOWN   = 1;
349 >    static final int STOP       = 2;
350 >    static final int TERMINATED = 3;
351  
352      /**
353 <     * Queue used for holding tasks and handing off to worker threads.
353 >     * The queue used for holding tasks and handing off to worker
354 >     * threads.  Note that when using this queue, we do not require
355 >     * that workQueue.poll() returning null necessarily means that
356 >     * workQueue.isEmpty(), so must sometimes check both. This
357 >     * accommodates special-purpose queues such as DelayQueues for
358 >     * which poll() is allowed to return null even if it may later
359 >     * return non-null when delays expire.
360       */
361      private final BlockingQueue<Runnable> workQueue;
362  
# Line 348 | Line 372 | public class ThreadPoolExecutor extends
372      private final Condition termination = mainLock.newCondition();
373  
374      /**
375 <     * Set containing all worker threads in pool.
375 >     * Set containing all worker threads in pool. Accessed onl when
376 >     * holding mainLock.
377       */
378      private final HashSet<Worker> workers = new HashSet<Worker>();
379  
380      /**
381       * Timeout in nanoseconds for idle threads waiting for work.
382 <     * Threads use this timeout only when there are more than
383 <     * corePoolSize present. Otherwise they wait forever for new work.
382 >     * Threads use this timeout when there are more than corePoolSize
383 >     * present or if allowCoreThreadTimeOut. Otherwise they wait
384 >     * forever for new work.
385       */
386      private volatile long  keepAliveTime;
387  
388      /**
389 <     * If false (default) core threads stay alive even when idle.
390 <     * If true, core threads use keepAliveTime to time out waiting for work.
389 >     * If false (default) core threads stay alive even when idle.  If
390 >     * true, core threads use keepAliveTime to time out waiting for
391 >     * work.
392       */
393      private volatile boolean allowCoreThreadTimeOut;
394  
395      /**
396 <     * Core pool size, updated only while holding mainLock,
397 <     * but volatile to allow concurrent readability even
371 <     * during updates.
396 >     * Core pool size, updated only while holding mainLock, but
397 >     * volatile to allow concurrent readability even during updates.
398       */
399      private volatile int   corePoolSize;
400  
401      /**
402 <     * Maximum pool size, updated only while holding mainLock
403 <     * but volatile to allow concurrent readability even
378 <     * during updates.
402 >     * Maximum pool size, updated only while holding mainLock but
403 >     * volatile to allow concurrent readability even during updates.
404       */
405      private volatile int   maximumPoolSize;
406  
407      /**
408 <     * Current pool size, updated only while holding mainLock
409 <     * but volatile to allow concurrent readability even
385 <     * during updates.
408 >     * Current pool size, updated only while holding mainLock but
409 >     * volatile to allow concurrent readability even during updates.
410       */
411      private volatile int   poolSize;
412  
413      /**
390     * Lifecycle state
391     */
392    volatile int runState;
393
394    /*
395     * Special values for runState. The numerical order among values
396     * matters. The runState monotonically increases over time, but
397     * need not hit each state.
398     */
399    /** Normal, not-shutdown mode */
400    static final int RUNNING    = 0;
401    /** Controlled shutdown mode */
402    static final int SHUTDOWN   = 1;
403    /** Immediate shutdown mode */
404    static final int STOP       = 2;
405    /** Final state */
406    static final int TERMINATED = 3;
407
408    /**
414       * Handler called when saturated or shutdown in execute.
415       */
416      private volatile RejectedExecutionHandler handler;
417  
418      /**
419 <     * Factory for new threads.
419 >     * Factory for new threads. All threads are created using this
420 >     * factory (via method addThread).  All callers must be prepared
421 >     * for addThread to fail by returning null, which may reflect a
422 >     * system or user's policy limiting the number of threads.  Even
423 >     * though it is not treated as an error, failure to create threads
424 >     * may result in new tasks being rejected or existing ones
425 >     * remaining stuck in the queue. On the other hand, no special
426 >     * precautions exist to handle OutOfMemoryErrors that might be
427 >     * thrown while trying to create threads, since there is generally
428 >     * no recourse from within this class.
429       */
430      private volatile ThreadFactory threadFactory;
431  
# Line 432 | Line 446 | public class ThreadPoolExecutor extends
446      private static final RejectedExecutionHandler defaultHandler =
447          new AbortPolicy();
448  
449 +    // Constructors
450 +
451      /**
452 <     * Invokes the rejected execution handler for the given command.
452 >     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
453 >     * parameters and default thread factory and rejected execution handler.
454 >     * It may be more convenient to use one of the {@link Executors} factory
455 >     * methods instead of this general purpose constructor.
456 >     *
457 >     * @param corePoolSize the number of threads to keep in the
458 >     * pool, even if they are idle.
459 >     * @param maximumPoolSize the maximum number of threads to allow in the
460 >     * pool.
461 >     * @param keepAliveTime when the number of threads is greater than
462 >     * the core, this is the maximum time that excess idle threads
463 >     * will wait for new tasks before terminating.
464 >     * @param unit the time unit for the keepAliveTime
465 >     * argument.
466 >     * @param workQueue the queue to use for holding tasks before they
467 >     * are executed. This queue will hold only the <tt>Runnable</tt>
468 >     * tasks submitted by the <tt>execute</tt> method.
469 >     * @throws IllegalArgumentException if corePoolSize, or
470 >     * keepAliveTime less than zero, or if maximumPoolSize less than or
471 >     * equal to zero, or if corePoolSize greater than maximumPoolSize.
472 >     * @throws NullPointerException if <tt>workQueue</tt> is null
473       */
474 <    void reject(Runnable command) {
475 <        handler.rejectedExecution(command, this);
474 >    public ThreadPoolExecutor(int corePoolSize,
475 >                              int maximumPoolSize,
476 >                              long keepAliveTime,
477 >                              TimeUnit unit,
478 >                              BlockingQueue<Runnable> workQueue) {
479 >        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
480 >             Executors.defaultThreadFactory(), defaultHandler);
481 >    }
482 >
483 >    /**
484 >     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
485 >     * parameters and default rejected execution handler.
486 >     *
487 >     * @param corePoolSize the number of threads to keep in the
488 >     * pool, even if they are idle.
489 >     * @param maximumPoolSize the maximum number of threads to allow in the
490 >     * pool.
491 >     * @param keepAliveTime when the number of threads is greater than
492 >     * the core, this is the maximum time that excess idle threads
493 >     * will wait for new tasks before terminating.
494 >     * @param unit the time unit for the keepAliveTime
495 >     * argument.
496 >     * @param workQueue the queue to use for holding tasks before they
497 >     * are executed. This queue will hold only the <tt>Runnable</tt>
498 >     * tasks submitted by the <tt>execute</tt> method.
499 >     * @param threadFactory the factory to use when the executor
500 >     * creates a new thread.
501 >     * @throws IllegalArgumentException if corePoolSize, or
502 >     * keepAliveTime less than zero, or if maximumPoolSize less than or
503 >     * equal to zero, or if corePoolSize greater than maximumPoolSize.
504 >     * @throws NullPointerException if <tt>workQueue</tt>
505 >     * or <tt>threadFactory</tt> are null.
506 >     */
507 >    public ThreadPoolExecutor(int corePoolSize,
508 >                              int maximumPoolSize,
509 >                              long keepAliveTime,
510 >                              TimeUnit unit,
511 >                              BlockingQueue<Runnable> workQueue,
512 >                              ThreadFactory threadFactory) {
513 >        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
514 >             threadFactory, defaultHandler);
515 >    }
516 >
517 >    /**
518 >     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
519 >     * parameters and default thread factory.
520 >     *
521 >     * @param corePoolSize the number of threads to keep in the
522 >     * pool, even if they are idle.
523 >     * @param maximumPoolSize the maximum number of threads to allow in the
524 >     * pool.
525 >     * @param keepAliveTime when the number of threads is greater than
526 >     * the core, this is the maximum time that excess idle threads
527 >     * will wait for new tasks before terminating.
528 >     * @param unit the time unit for the keepAliveTime
529 >     * argument.
530 >     * @param workQueue the queue to use for holding tasks before they
531 >     * are executed. This queue will hold only the <tt>Runnable</tt>
532 >     * tasks submitted by the <tt>execute</tt> method.
533 >     * @param handler the handler to use when execution is blocked
534 >     * because the thread bounds and queue capacities are reached.
535 >     * @throws IllegalArgumentException if corePoolSize, or
536 >     * keepAliveTime less than zero, or if maximumPoolSize less than or
537 >     * equal to zero, or if corePoolSize greater than maximumPoolSize.
538 >     * @throws NullPointerException if <tt>workQueue</tt>
539 >     * or <tt>handler</tt> are null.
540 >     */
541 >    public ThreadPoolExecutor(int corePoolSize,
542 >                              int maximumPoolSize,
543 >                              long keepAliveTime,
544 >                              TimeUnit unit,
545 >                              BlockingQueue<Runnable> workQueue,
546 >                              RejectedExecutionHandler handler) {
547 >        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
548 >             Executors.defaultThreadFactory(), handler);
549 >    }
550 >
551 >    /**
552 >     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
553 >     * parameters.
554 >     *
555 >     * @param corePoolSize the number of threads to keep in the
556 >     * pool, even if they are idle.
557 >     * @param maximumPoolSize the maximum number of threads to allow in the
558 >     * pool.
559 >     * @param keepAliveTime when the number of threads is greater than
560 >     * the core, this is the maximum time that excess idle threads
561 >     * will wait for new tasks before terminating.
562 >     * @param unit the time unit for the keepAliveTime
563 >     * argument.
564 >     * @param workQueue the queue to use for holding tasks before they
565 >     * are executed. This queue will hold only the <tt>Runnable</tt>
566 >     * tasks submitted by the <tt>execute</tt> method.
567 >     * @param threadFactory the factory to use when the executor
568 >     * creates a new thread.
569 >     * @param handler the handler to use when execution is blocked
570 >     * because the thread bounds and queue capacities are reached.
571 >     * @throws IllegalArgumentException if corePoolSize, or
572 >     * keepAliveTime less than zero, or if maximumPoolSize less than or
573 >     * equal to zero, or if corePoolSize greater than maximumPoolSize.
574 >     * @throws NullPointerException if <tt>workQueue</tt>
575 >     * or <tt>threadFactory</tt> or <tt>handler</tt> are null.
576 >     */
577 >    public ThreadPoolExecutor(int corePoolSize,
578 >                              int maximumPoolSize,
579 >                              long keepAliveTime,
580 >                              TimeUnit unit,
581 >                              BlockingQueue<Runnable> workQueue,
582 >                              ThreadFactory threadFactory,
583 >                              RejectedExecutionHandler handler) {
584 >        if (corePoolSize < 0 ||
585 >            maximumPoolSize <= 0 ||
586 >            maximumPoolSize < corePoolSize ||
587 >            keepAliveTime < 0)
588 >            throw new IllegalArgumentException();
589 >        if (workQueue == null || threadFactory == null || handler == null)
590 >            throw new NullPointerException();
591 >        this.corePoolSize = corePoolSize;
592 >        this.maximumPoolSize = maximumPoolSize;
593 >        this.workQueue = workQueue;
594 >        this.keepAliveTime = unit.toNanos(keepAliveTime);
595 >        this.threadFactory = threadFactory;
596 >        this.handler = handler;
597 >    }
598 >
599 >    /*
600 >     * Support for execute().
601 >     *
602 >     * Method execute() and its helper methods handle the various
603 >     * cases encountered when new tasks are submitted.  The main
604 >     * execute() method proceeds in 3 steps:
605 >     *
606 >     * 1. If it appears that fewer than corePoolSize threads are
607 >     * running, try to start a new thread with the given command as
608 >     * its first task.  The check here errs on the side of caution.
609 >     * The call to addIfUnderCorePoolSize rechecks runState and pool
610 >     * size under lock (they change only under lock) so prevents false
611 >     * alarms that would add threads when it shouldn't, but may also
612 >     * fail to add them when they should. This is compensated within
613 >     * the following steps.
614 >     *
615 >     * 2. If a task can be successfully queued, then we are done, but
616 >     * still need to compensate for missing the fact that we should
617 >     * have added a thread (because existing ones died) or that
618 >     * shutdown occured since entry into this method. So we recheck
619 >     * state to and if necessary (in ensureQueuedTaskHandled) roll
620 >     * back the enqueuing if shut down, or start a new thread if there
621 >     * are none.
622 >     *
623 >     * 3. If we cannot queue task, then we try to add a new
624 >     * thread. There's no guesswork here (addIfUnderMaximumPoolSize)
625 >     * since it is performed under lock.  If it fails, we know we are
626 >     * shut down or saturated.
627 >     *
628 >     * The reason for taking this overall approach is to normally
629 >     * avoid holding mainLock during this method, which would be a
630 >     * serious scalability bottleneck.  After warmup, almost all calls
631 >     * take step 2 in a way that entails no locking.
632 >     */
633 >
634 >    /**
635 >     * Executes the given task sometime in the future.  The task
636 >     * may execute in a new thread or in an existing pooled thread.
637 >     *
638 >     * If the task cannot be submitted for execution, either because this
639 >     * executor has been shutdown or because its capacity has been reached,
640 >     * the task is handled by the current <tt>RejectedExecutionHandler</tt>.
641 >     *
642 >     * @param command the task to execute
643 >     * @throws RejectedExecutionException at discretion of
644 >     * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
645 >     * for execution
646 >     * @throws NullPointerException if command is null
647 >     */
648 >    public void execute(Runnable command) {
649 >        if (command == null)
650 >            throw new NullPointerException();
651 >        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
652 >            if (runState == RUNNING && workQueue.offer(command)) {
653 >                if (runState != RUNNING || poolSize == 0)
654 >                    ensureQueuedTaskHandled(command);
655 >            }
656 >            else if (!addIfUnderMaximumPoolSize(command))
657 >                reject(command); // is shutdown or saturated
658 >        }
659      }
660  
661      /**
662       * Creates and returns a new thread running firstTask as its first
663 <     * task. Call only while holding mainLock.
663 >     * task. Call only while holding mainLock.
664 >     *
665       * @param firstTask the task the new thread should run first (or
666       * null if none)
667       * @return the new thread, or null if threadFactory fails to create thread
# Line 461 | Line 681 | public class ThreadPoolExecutor extends
681  
682      /**
683       * Creates and starts a new thread running firstTask as its first
684 <     * task, only if fewer than corePoolSize threads are running.
684 >     * task, only if fewer than corePoolSize threads are running
685 >     * and the pool is not shut down.
686       * @param firstTask the task the new thread should run first (or
687       * null if none)
688       * @return true if successful.
# Line 483 | Line 704 | public class ThreadPoolExecutor extends
704      }
705  
706      /**
707 <     * Creates and starts a new thread only if fewer than maximumPoolSize
708 <     * threads are running.  The new thread runs as its first task the
709 <     * next task in queue, or if there is none, the given task.
707 >     * Creates and starts a new thread running firstTask as its first
708 >     * task, only if fewer than maximumPoolSize threads are running
709 >     * and pool is not shut down.
710       * @param firstTask the task the new thread should run first (or
711       * null if none)
712 <     * @return 0 if a new thread cannot be created, a positive number
492 <     * if firstTask will be run in a new thread, or a negative number
493 <     * if a new thread was created but is running some other task, in
494 <     * which case the caller must try some other way to run firstTask
495 <     * (perhaps by calling this method again).
712 >     * @return true if successful.
713       */
714 <    private int addIfUnderMaximumPoolSize(Runnable firstTask) {
714 >    private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
715          Thread t = null;
499        int status = 0;
716          final ReentrantLock mainLock = this.mainLock;
717          mainLock.lock();
718          try {
719 <            if (poolSize < maximumPoolSize && runState == RUNNING) {
720 <                Runnable next = workQueue.poll();
505 <                if (next == null) {
506 <                    next = firstTask;
507 <                    status = 1;
508 <                } else
509 <                    status = -1;
510 <                t = addThread(next);
511 <            }
719 >            if (poolSize < maximumPoolSize && runState == RUNNING)
720 >                t = addThread(firstTask);
721          } finally {
722              mainLock.unlock();
723          }
724          if (t == null)
725 <            return 0;
725 >            return false;
726          t.start();
727 <        return status;
519 <    }
520 <
521 <    /**
522 <     * Gets the next task for a worker thread to run.
523 <     * @return the task
524 <     */
525 <    Runnable getTask() {
526 <        for (;;) {
527 <            try {
528 <                switch (runState) {
529 <                case RUNNING: {
530 <                    // untimed wait if core and not allowing core timeout
531 <                    if (poolSize <= corePoolSize && !allowCoreThreadTimeOut)
532 <                        return workQueue.take();
533 <
534 <                    long timeout = keepAliveTime;
535 <                    if (timeout <= 0) // die immediately for 0 timeout
536 <                        return null;
537 <                    Runnable r = workQueue.poll(timeout, TimeUnit.NANOSECONDS);
538 <                    if (r != null)
539 <                        return r;
540 <                    if (poolSize > corePoolSize || allowCoreThreadTimeOut)
541 <                        return null; // timed out
542 <                    // Else, after timeout, the pool shrank. Retry
543 <                    break;
544 <                }
545 <
546 <                case SHUTDOWN: {
547 <                    // Help drain queue
548 <                    Runnable r = workQueue.poll();
549 <                    if (r != null)
550 <                        return r;
551 <
552 <                    // Check if can terminate
553 <                    if (workQueue.isEmpty()) {
554 <                        interruptIdleWorkers();
555 <                        return null;
556 <                    }
557 <
558 <                    // Else there could still be delayed tasks in queue.
559 <                    return workQueue.take();
560 <                }
561 <
562 <                default: // stopping/stopped
563 <                    return null;
564 <                }
565 <            } catch (InterruptedException ie) {
566 <                // On interruption, re-check runstate
567 <            }
568 <        }
727 >        return true;
728      }
729  
571
730      /**
731 <     * Rejects a task that was queued concurrently with a call to
732 <     * shutdownNow. If still present in the queue, this task must be
733 <     * removed and rejected to preserve shutdownNow guarantees.
731 >     * Rechecks state after queuing a task. Called from execute when
732 >     * pool state has been observed to change after queuing a task. If
733 >     * the task was queued concurrently with a call to shutdownNow,
734 >     * and is still present in the queue, this task must be removed
735 >     * and rejected to preserve shutdownNow guarantees.  Otherwise,
736 >     * this method ensures (unless addThread fails) that there is at
737 >     * least one live thread to handle this task
738       * @param command the task
739       */
740 <    private void rejectIfQueued(Runnable command) {
740 >    private void ensureQueuedTaskHandled(Runnable command) {
741          final ReentrantLock mainLock = this.mainLock;
742          mainLock.lock();
743 <        boolean present;
743 >        boolean reject = false;
744 >        Thread t = null;
745          try {
746 <            present = workQueue.remove(command);
746 >            int state = runState;
747 >            if (state != RUNNING && workQueue.remove(command))
748 >                reject = true;
749 >            else if (state < STOP &&
750 >                     poolSize < Math.max(corePoolSize, 1) &&
751 >                     !workQueue.isEmpty())
752 >                t = addThread(null);
753          } finally {
754              mainLock.unlock();
755          }
756 <        if (present)
756 >        if (reject)
757              reject(command);
758 +        else if (t != null)
759 +            t.start();
760      }
761  
762      /**
763 <     * Wakes up all threads that might be waiting for tasks.
763 >     * Invokes the rejected execution handler for the given command.
764       */
765 <    void interruptIdleWorkers() {
766 <        final ReentrantLock mainLock = this.mainLock;
596 <        mainLock.lock();
597 <        try {
598 <            for (Worker w : workers)
599 <                w.interruptIfIdle();
600 <        } finally {
601 <            mainLock.unlock();
602 <        }
765 >    void reject(Runnable command) {
766 >        handler.rejectedExecution(command, this);
767      }
768  
605    /**
606     * Performs bookkeeping for a terminated worker thread.
607     * @param w the worker
608     */
609    void workerDone(Worker w) {
610        final ReentrantLock mainLock = this.mainLock;
611        mainLock.lock();
612        try {
613            completedTaskCount += w.completedTasks;
614            workers.remove(w);
615            if (--poolSize == 0) { // Deal with potential shutdown.
616                int state = runState;
617                // If not stopping and there are queued tasks but no
618                // threads, create replacement thread. We must create
619                // it initially idle to avoid orphaned tasks in case
620                // addThread fails.  This also handles case of delayed
621                // tasks that will sometime later become runnable.
622                if (state < STOP && !workQueue.isEmpty()) {
623                    Thread t = addThread(null);
624                    if (t != null)
625                        t.start();
626                    state = RUNNING; // to cause termination check to fail
627                }
628                if (state == STOP || state == SHUTDOWN) { // can terminate
629                    runState = TERMINATED;
630                    termination.signalAll();
631                    terminated();
632                }
633            }
634        } finally {
635            mainLock.unlock();
636        }
637    }
769  
770      /**
771 <     *  Worker threads
771 >     * Worker threads.
772 >     *
773 >     * Worker threads can start out life either with an initial first
774 >     * task, oo without one. Normally, they are started with a first
775 >     * task. This enables execute(), etc to bypass queuing when there
776 >     * are fewer than corePoolSize threads (in which case we always
777 >     * start one), or when the queue is full.(in which case we must
778 >     * bypass queue.) Initially idle threads are created either by
779 >     * users (prestartCoreThread and setCorePoolSize) or when methods
780 >     * ensureQueuedTaskHandled and tryTerminate notice that the queue
781 >     * is not empty but there are no active threads to handle them.
782 >     *
783 >     * After completing a task, workers try to get another one,
784 >     * via method getTask.
785 >     *
786 >     * When starting to run a task, unless the pool is stopped, each
787 >     * worker thread ensures that it is not interrupted, and uses
788 >     * runLock to prevent the pool from interrupting it in the midst
789 >     * of execution. This shields user tasks from any interrupts that
790 >     * may otherwise be needed during shutdown (see method
791 >     * interruptIdleWorkers), unless the pool is stopping (via
792 >     * shutdownNow) in which case interrupts are let through to affect
793 >     * both tasks and workers. However, this shielding does not
794 >     * necessarily protect the workers from lagging interrupts from
795 >     * other user threads directed towards tasks that have already
796 >     * been completed. Thus, a worker thread may be interrupted
797 >     * needlessly (for example in getTask), in which case it rechecks
798 >     * pool state to see it it should exit.
799 >     *
800       */
801      private final class Worker implements Runnable {
643
802          /**
803           * The runLock is acquired and released surrounding each task
804           * execution. It mainly protects against interrupts that are
# Line 650 | Line 808 | public class ThreadPoolExecutor extends
808          private final ReentrantLock runLock = new ReentrantLock();
809  
810          /**
811 <         * Initial task to run before entering run loop
811 >         * Initial task to run before entering run loop. Possibly null.
812           */
813          private Runnable firstTask;
814  
# Line 702 | Line 860 | public class ThreadPoolExecutor extends
860              final ReentrantLock runLock = this.runLock;
861              runLock.lock();
862              try {
863 <                // If not shutting down then clear an outstanding interrupt.
863 >                /*
864 >                 * Ensure that unless pool is stopping, this thread
865 >                 * does not have its interrupt set. This requires a
866 >                 * double-check of state in case the interrupt was
867 >                 * cleared concurrently with a shutdownNow -- if so,
868 >                 * the interrupt is re-enabled.
869 >                 */
870                  if (runState < STOP &&
871                      Thread.interrupted() &&
872 <                    runState >= STOP) // Re-interrupt if stopped after clearing
872 >                    runState >= STOP)
873                      thread.interrupt();
874 +                /*
875 +                 * Track execution state to ensure that afterExecute
876 +                 * is called only if task completed or threw
877 +                 * exception. Otherwise, the caught runtime exception
878 +                 * will have been thrown by afterExecute itself, in
879 +                 * which case we don't want to call it again.
880 +                 */
881                  boolean ran = false;
882                  beforeExecute(thread, task);
883                  try {
# Line 717 | Line 888 | public class ThreadPoolExecutor extends
888                  } catch (RuntimeException ex) {
889                      if (!ran)
890                          afterExecute(task, ex);
720                    // Else the exception occurred within
721                    // afterExecute itself in which case we don't
722                    // want to call it again.
891                      throw ex;
892                  }
893              } finally {
# Line 744 | Line 912 | public class ThreadPoolExecutor extends
912          }
913      }
914  
915 <    // Public methods
915 >    /* Utilities for worker thread control */
916  
917      /**
918 <     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
919 <     * parameters and default thread factory and rejected execution handler.
920 <     * It may be more convenient to use one of the {@link Executors} factory
921 <     * methods instead of this general purpose constructor.
918 >     * Gets the next task for a worker thread to run.  The general
919 >     * approach is similar to execute() in that worker threads trying
920 >     * to get a task to run do so on the basis of prevailing state
921 >     * accessed outside of locks.  This may cause them to choose the
922 >     * "wrong" action, such as or trying to exit because no tasks
923 >     * appear to a available, or entering a take when the pool is in
924 >     * the process of being shut down. These potential problems are
925 >     * countered by (1) rechecking pool state (in workerCanExit)
926 >     * before giving up, and (2) interrupting other workers upon
927 >     * shutdown, so they can recheck state. All other user-based state
928 >     * changes (to allowCoreThreadTimeOut etc) are OK even when
929 >     * perfromed asynchronously wrt getTask.
930       *
931 <     * @param corePoolSize the number of threads to keep in the
756 <     * pool, even if they are idle.
757 <     * @param maximumPoolSize the maximum number of threads to allow in the
758 <     * pool.
759 <     * @param keepAliveTime when the number of threads is greater than
760 <     * the core, this is the maximum time that excess idle threads
761 <     * will wait for new tasks before terminating.
762 <     * @param unit the time unit for the keepAliveTime
763 <     * argument.
764 <     * @param workQueue the queue to use for holding tasks before they
765 <     * are executed. This queue will hold only the <tt>Runnable</tt>
766 <     * tasks submitted by the <tt>execute</tt> method.
767 <     * @throws IllegalArgumentException if corePoolSize, or
768 <     * keepAliveTime less than zero, or if maximumPoolSize less than or
769 <     * equal to zero, or if corePoolSize greater than maximumPoolSize.
770 <     * @throws NullPointerException if <tt>workQueue</tt> is null
931 >     * @return the task
932       */
933 <    public ThreadPoolExecutor(int corePoolSize,
934 <                              int maximumPoolSize,
935 <                              long keepAliveTime,
936 <                              TimeUnit unit,
937 <                              BlockingQueue<Runnable> workQueue) {
938 <        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
939 <             Executors.defaultThreadFactory(), defaultHandler);
933 >    Runnable getTask() {
934 >        for (;;) {
935 >            try {
936 >                int state = runState;
937 >                if (state > SHUTDOWN)
938 >                    return null;
939 >                Runnable r;
940 >                if (state == SHUTDOWN)  // Help drain queue
941 >                    r = workQueue.poll();
942 >                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
943 >                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
944 >                else
945 >                    r = workQueue.take();
946 >                if (r != null)
947 >                    return r;
948 >                if (workerCanExit()) {
949 >                    if (runState >= SHUTDOWN) // Wake up others
950 >                        interruptIdleWorkers();
951 >                    return null;
952 >                }
953 >                // Else retry
954 >            } catch (InterruptedException ie) {
955 >                // On interruption, re-check runstate
956 >            }
957 >        }
958      }
959  
960      /**
961 <     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
962 <     * parameters and default rejected execution handler.
963 <     *
964 <     * @param corePoolSize the number of threads to keep in the
965 <     * pool, even if they are idle.
787 <     * @param maximumPoolSize the maximum number of threads to allow in the
788 <     * pool.
789 <     * @param keepAliveTime when the number of threads is greater than
790 <     * the core, this is the maximum time that excess idle threads
791 <     * will wait for new tasks before terminating.
792 <     * @param unit the time unit for the keepAliveTime
793 <     * argument.
794 <     * @param workQueue the queue to use for holding tasks before they
795 <     * are executed. This queue will hold only the <tt>Runnable</tt>
796 <     * tasks submitted by the <tt>execute</tt> method.
797 <     * @param threadFactory the factory to use when the executor
798 <     * creates a new thread.
799 <     * @throws IllegalArgumentException if corePoolSize, or
800 <     * keepAliveTime less than zero, or if maximumPoolSize less than or
801 <     * equal to zero, or if corePoolSize greater than maximumPoolSize.
802 <     * @throws NullPointerException if <tt>workQueue</tt>
803 <     * or <tt>threadFactory</tt> are null.
961 >     * Check whether a worker thread that fails to get a task can
962 >     * exit.  We allow a worker thread to die if the pool is stopping,
963 >     * or the queue is empty, or there is at least one thread to
964 >     * handle possibly non-empty queue, even if core timeouts are
965 >     * allowed.
966       */
967 <    public ThreadPoolExecutor(int corePoolSize,
968 <                              int maximumPoolSize,
969 <                              long keepAliveTime,
970 <                              TimeUnit unit,
971 <                              BlockingQueue<Runnable> workQueue,
972 <                              ThreadFactory threadFactory) {
973 <        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
974 <             threadFactory, defaultHandler);
967 >    private boolean workerCanExit() {
968 >        final ReentrantLock mainLock = this.mainLock;
969 >        mainLock.lock();
970 >        boolean canExit;
971 >        try {
972 >            canExit = runState >= STOP ||
973 >                workQueue.isEmpty() ||
974 >                (allowCoreThreadTimeOut &&
975 >                 poolSize > Math.max(1, corePoolSize));
976 >        } finally {
977 >            mainLock.unlock();
978 >        }
979 >        return canExit;
980      }
981  
982      /**
983 <     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
984 <     * parameters and default thread factory.
985 <     *
819 <     * @param corePoolSize the number of threads to keep in the
820 <     * pool, even if they are idle.
821 <     * @param maximumPoolSize the maximum number of threads to allow in the
822 <     * pool.
823 <     * @param keepAliveTime when the number of threads is greater than
824 <     * the core, this is the maximum time that excess idle threads
825 <     * will wait for new tasks before terminating.
826 <     * @param unit the time unit for the keepAliveTime
827 <     * argument.
828 <     * @param workQueue the queue to use for holding tasks before they
829 <     * are executed. This queue will hold only the <tt>Runnable</tt>
830 <     * tasks submitted by the <tt>execute</tt> method.
831 <     * @param handler the handler to use when execution is blocked
832 <     * because the thread bounds and queue capacities are reached.
833 <     * @throws IllegalArgumentException if corePoolSize, or
834 <     * keepAliveTime less than zero, or if maximumPoolSize less than or
835 <     * equal to zero, or if corePoolSize greater than maximumPoolSize.
836 <     * @throws NullPointerException if <tt>workQueue</tt>
837 <     * or <tt>handler</tt> are null.
983 >     * Wakes up all threads that might be waiting for tasks so they
984 >     * can check for termination. Note: this method is also called by
985 >     * ScheduledThreadPoolExecutor.
986       */
987 <    public ThreadPoolExecutor(int corePoolSize,
988 <                              int maximumPoolSize,
989 <                              long keepAliveTime,
990 <                              TimeUnit unit,
991 <                              BlockingQueue<Runnable> workQueue,
992 <                              RejectedExecutionHandler handler) {
993 <        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
994 <             Executors.defaultThreadFactory(), handler);
987 >    void interruptIdleWorkers() {
988 >        final ReentrantLock mainLock = this.mainLock;
989 >        mainLock.lock();
990 >        try {
991 >            for (Worker w : workers)
992 >                w.interruptIfIdle();
993 >        } finally {
994 >            mainLock.unlock();
995 >        }
996      }
997  
998      /**
999 <     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
1000 <     * parameters.
852 <     *
853 <     * @param corePoolSize the number of threads to keep in the
854 <     * pool, even if they are idle.
855 <     * @param maximumPoolSize the maximum number of threads to allow in the
856 <     * pool.
857 <     * @param keepAliveTime when the number of threads is greater than
858 <     * the core, this is the maximum time that excess idle threads
859 <     * will wait for new tasks before terminating.
860 <     * @param unit the time unit for the keepAliveTime
861 <     * argument.
862 <     * @param workQueue the queue to use for holding tasks before they
863 <     * are executed. This queue will hold only the <tt>Runnable</tt>
864 <     * tasks submitted by the <tt>execute</tt> method.
865 <     * @param threadFactory the factory to use when the executor
866 <     * creates a new thread.
867 <     * @param handler the handler to use when execution is blocked
868 <     * because the thread bounds and queue capacities are reached.
869 <     * @throws IllegalArgumentException if corePoolSize, or
870 <     * keepAliveTime less than zero, or if maximumPoolSize less than or
871 <     * equal to zero, or if corePoolSize greater than maximumPoolSize.
872 <     * @throws NullPointerException if <tt>workQueue</tt>
873 <     * or <tt>threadFactory</tt> or <tt>handler</tt> are null.
999 >     * Performs bookkeeping for an exiting worker thread.
1000 >     * @param w the worker
1001       */
1002 <    public ThreadPoolExecutor(int corePoolSize,
1003 <                              int maximumPoolSize,
1004 <                              long keepAliveTime,
1005 <                              TimeUnit unit,
1006 <                              BlockingQueue<Runnable> workQueue,
1007 <                              ThreadFactory threadFactory,
1008 <                              RejectedExecutionHandler handler) {
1009 <        if (corePoolSize < 0 ||
1010 <            maximumPoolSize <= 0 ||
1011 <            maximumPoolSize < corePoolSize ||
1012 <            keepAliveTime < 0)
886 <            throw new IllegalArgumentException();
887 <        if (workQueue == null || threadFactory == null || handler == null)
888 <            throw new NullPointerException();
889 <        this.corePoolSize = corePoolSize;
890 <        this.maximumPoolSize = maximumPoolSize;
891 <        this.workQueue = workQueue;
892 <        this.keepAliveTime = unit.toNanos(keepAliveTime);
893 <        this.threadFactory = threadFactory;
894 <        this.handler = handler;
1002 >    void workerDone(Worker w) {
1003 >        final ReentrantLock mainLock = this.mainLock;
1004 >        mainLock.lock();
1005 >        try {
1006 >            completedTaskCount += w.completedTasks;
1007 >            workers.remove(w);
1008 >            if (--poolSize == 0)
1009 >                tryTerminate();
1010 >        } finally {
1011 >            mainLock.unlock();
1012 >        }
1013      }
1014  
1015 +    /* Termination support. */
1016 +
1017      /**
1018 <     * Executes the given task sometime in the future.  The task
1019 <     * may execute in a new thread or in an existing pooled thread.
1020 <     *
1021 <     * If the task cannot be submitted for execution, either because this
902 <     * executor has been shutdown or because its capacity has been reached,
903 <     * the task is handled by the current <tt>RejectedExecutionHandler</tt>.
1018 >     * Transitions to TERMINATED state if either (SHUTDOWN and pool
1019 >     * and queue empty) or (STOP and pool empty), otherwisem unless
1020 >     * stopped, ensuring that there is at least one live thread to
1021 >     * handle queued tasks.
1022       *
1023 <     * @param command the task to execute
1024 <     * @throws RejectedExecutionException at discretion of
1025 <     * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
1026 <     * for execution
909 <     * @throws NullPointerException if command is null
1023 >     * This method is called from the three places in which
1024 >     * termination can occur: in workerDone on exit of the last thread
1025 >     * after pool has been shut down, or directly within calls to
1026 >     * shutdown or shutdownNow, if there are no live threads,
1027       */
1028 <    public void execute(Runnable command) {
1029 <        if (command == null)
1030 <            throw new NullPointerException();
1031 <        while (runState == RUNNING) {
1032 <            if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
1033 <                return;
1034 <            if (workQueue.offer(command)) {// recheck state after queuing
1035 <                if (runState != RUNNING)
1036 <                    rejectIfQueued(command);
1037 <                else if (poolSize < corePoolSize)
1038 <                    addIfUnderCorePoolSize(null);
1039 <                return;
1028 >    private void tryTerminate() {
1029 >        if (poolSize == 0) {
1030 >            int state = runState;
1031 >            if (state < STOP && !workQueue.isEmpty()) {
1032 >                state = RUNNING; // disable termination check below
1033 >                Thread t = addThread(null);
1034 >                if (t != null)
1035 >                    t.start();
1036 >            }
1037 >            if (state == STOP || state == SHUTDOWN) {
1038 >                runState = TERMINATED;
1039 >                termination.signalAll();
1040 >                terminated();
1041              }
924            int status = addIfUnderMaximumPoolSize(command);
925            if (status > 0)   // Created new thread to handle task
926                return;
927            if (status == 0)  // Cannot create thread
928                break;
929            // Retry if created thread but it is busy with another task
1042          }
931
932        reject(command); // is shutdown or can't create thread or queue task
933        return;
1043      }
1044  
1045      /**
# Line 945 | Line 1054 | public class ThreadPoolExecutor extends
1054       * or the security manager's <tt>checkAccess</tt> method denies access.
1055       */
1056      public void shutdown() {
1057 <        // Fail if caller doesn't have modifyThread permission.
1057 >        /*
1058 >         * Conceptually, shutdown is just a matter of changing the
1059 >         * runsState to SHUTDOWN, and then interrupting any worker
1060 >         * threads that might be blocked in getTask() to wake them up
1061 >         * so they can exit. Then, if there happen not to be any
1062 >         * threads or tasks, we can directly terminate pool via
1063 >         * tryTerminate.
1064 >         *
1065 >         * But this is made more delicate because we must cooperate
1066 >         * with the security manager (if present), which may implement
1067 >         * policies that make more sense for operations on Threads
1068 >         * than they do for ThreadPools. This requires 3 steps:
1069 >         *
1070 >         * 1. Making sure caller has permission to shut down threads
1071 >         * in general (see shutdownPerm).
1072 >         *
1073 >         * 2. If (1) passes, making sure the caller is allowed to
1074 >         * modify each of our threads. This might not be true even if
1075 >         * first check passed, if the SecurityManager treats some
1076 >         * threads specially. If this check passes, then we can try
1077 >         * to set runState.
1078 >         *
1079 >         * 3. If both (1) and (2) pass, dealing with inconsistent
1080 >         * security managers that allow checkAccess but then throw a
1081 >         * SecurityException when interrupt() is invoked.  In this
1082 >         * third case, because we have already set runState, we can
1083 >         * only try to back out from the shutdown.as cleanly as
1084 >         * possible. Some threads may have been killed but we remain
1085 >         * in non-shutdown state (which may entail tryTerminate
1086 >         * starting a thread to maintain liveness.)
1087 >         */
1088 >
1089          SecurityManager security = System.getSecurityManager();
1090          if (security != null)
1091              security.checkPermission(shutdownPerm);
# Line 953 | Line 1093 | public class ThreadPoolExecutor extends
1093          final ReentrantLock mainLock = this.mainLock;
1094          mainLock.lock();
1095          try {
1096 <            if (security != null) {
957 <                // Check if caller can modify worker threads.  This
958 <                // might not be true even if passed above check, if
959 <                // the SecurityManager treats some threads specially.
1096 >            if (security != null) { // Check if caller can modify our threads
1097                  for (Worker w: workers)
1098                      security.checkAccess(w.thread);
1099              }
1100  
1101              int state = runState;
1102 <            if (state == RUNNING) // don't override shutdownNow
1102 >            if (state < SHUTDOWN)
1103                  runState = SHUTDOWN;
967            int nworkers = 0;
1104  
1105              try {
1106                  for (Worker w: workers) {
1107                      w.interruptIfIdle();
972                    ++nworkers;
1108                  }
1109 <            } catch (SecurityException se) {
975 <                // If SecurityManager allows above checks, but
976 <                // then unexpectedly throws exception when
977 <                // interrupting threads (which it ought not do),
978 <                // back out as cleanly as we can. Some threads may
979 <                // have been killed but we remain in non-shutdown
980 <                // state.
1109 >            } catch (SecurityException se) { // Try to back out
1110                  runState = state;
1111 +                tryTerminate();
1112                  throw se;
1113              }
1114  
1115 <            // If no live workers, act on one's behalf to terminate
986 <            if (nworkers == 0 && state != TERMINATED) {
987 <                runState = TERMINATED;
988 <                termination.signalAll();
989 <                terminated();
990 <            }
1115 >            tryTerminate(); // Terminate now if pool and queue empty
1116          } finally {
1117              mainLock.unlock();
1118          }
1119      }
1120  
996
1121      /**
1122       * Attempts to stop all actively executing tasks, halts the
1123       * processing of waiting tasks, and returns a list of the tasks
# Line 1013 | Line 1137 | public class ThreadPoolExecutor extends
1137       * or the security manager's <tt>checkAccess</tt> method denies access.
1138       */
1139      public List<Runnable> shutdownNow() {
1140 <        // Almost the same code as shutdown()
1140 >        /*
1141 >         * shutdownNow differs from shutdown only in that
1142 >         * (1) runState is set to STOP, (2) All worker threads
1143 >         * are interrupted, not just the idle ones, and (3)
1144 >         * the queue is drained and returned.
1145 >         */
1146          SecurityManager security = System.getSecurityManager();
1147          if (security != null)
1148              security.checkPermission(shutdownPerm);
# Line 1021 | Line 1150 | public class ThreadPoolExecutor extends
1150          final ReentrantLock mainLock = this.mainLock;
1151          mainLock.lock();
1152          try {
1153 <            if (security != null) {
1153 >            if (security != null) { // Check if caller can modify our threads
1154                  for (Worker w: workers)
1155                      security.checkAccess(w.thread);
1156              }
1157  
1158              int state = runState;
1159 <            if (state != TERMINATED)
1159 >            if (state < STOP)
1160                  runState = STOP;
1161 <            int nworkers = 0;
1161 >
1162              try {
1163                  for (Worker w : workers) {
1164                      w.interruptNow();
1036                    ++nworkers;
1165                  }
1166 <            } catch (SecurityException se) {
1167 <                runState = state; // back out;
1166 >            } catch (SecurityException se) { // Try to back out
1167 >                runState = state;
1168 >                tryTerminate();
1169                  throw se;
1170              }
1171  
1172 <            if (nworkers == 0 && state != TERMINATED) {
1173 <                runState = TERMINATED;
1174 <                termination.signalAll();
1046 <                terminated();
1047 <            }
1048 <
1049 <            List<Runnable> taskList = new ArrayList<Runnable>();
1050 <            workQueue.drainTo(taskList);
1051 <            return taskList;
1172 >            List<Runnable> tasks = drainQueue();
1173 >            tryTerminate(); // Terminate now if pool and queue empty
1174 >            return tasks;
1175          } finally {
1176              mainLock.unlock();
1177          }
1178      }
1179  
1180 +    /**
1181 +     * Drains the task queue into a new list. Used by shutdownNow.
1182 +     * Call only while holding main lock.
1183 +     */
1184 +    private  List<Runnable> drainQueue() {
1185 +        List<Runnable> taskList = new ArrayList<Runnable>();
1186 +        workQueue.drainTo(taskList);
1187 +        /*
1188 +         * If the queue is a DelayQueue or any other kind of queue
1189 +         * for which poll or drainTo may fail to remove some elements,
1190 +         * we need to manually traverse and remove remaining tasks.
1191 +         * To guarantee atomicity wrt other threads using this queue,
1192 +         * we need to create a new iterator for each element removed.
1193 +         */
1194 +        while (!workQueue.isEmpty()) {
1195 +            Iterator<Runnable> it = workQueue.iterator();
1196 +            try {
1197 +                if (it.hasNext()) {
1198 +                    Runnable r = it.next();
1199 +                    if (workQueue.remove(r))
1200 +                        taskList.add(r);
1201 +                }
1202 +            } catch(ConcurrentModificationException ignore) {
1203 +            }
1204 +        }
1205 +        return taskList;
1206 +    }
1207 +
1208      public boolean isShutdown() {
1209          return runState != RUNNING;
1210      }
# Line 1102 | Line 1253 | public class ThreadPoolExecutor extends
1253          shutdown();
1254      }
1255  
1256 +    /* Getting and setting tunable parameters */
1257 +
1258      /**
1259       * Sets the thread factory used to create new threads.
1260       *
# Line 1149 | Line 1302 | public class ThreadPoolExecutor extends
1302      }
1303  
1304      /**
1152     * Returns the task queue used by this executor. Access to the
1153     * task queue is intended primarily for debugging and monitoring.
1154     * This queue may be in active use.  Retrieving the task queue
1155     * does not prevent queued tasks from executing.
1156     *
1157     * @return the task queue
1158     */
1159    public BlockingQueue<Runnable> getQueue() {
1160        return workQueue;
1161    }
1162
1163    /**
1164     * Removes this task from the executor's internal queue if it is
1165     * present, thus causing it not to be run if it has not already
1166     * started.
1167     *
1168     * <p> This method may be useful as one part of a cancellation
1169     * scheme.  It may fail to remove tasks that have been converted
1170     * into other forms before being placed on the internal queue. For
1171     * example, a task entered using <tt>submit</tt> might be
1172     * converted into a form that maintains <tt>Future</tt> status.
1173     * However, in such cases, method {@link ThreadPoolExecutor#purge}
1174     * may be used to remove those Futures that have been cancelled.
1175     *
1176     * @param task the task to remove
1177     * @return true if the task was removed
1178     */
1179    public boolean remove(Runnable task) {
1180        return getQueue().remove(task);
1181    }
1182
1183
1184    /**
1185     * Tries to remove from the work queue all {@link Future}
1186     * tasks that have been cancelled. This method can be useful as a
1187     * storage reclamation operation, that has no other impact on
1188     * functionality. Cancelled tasks are never executed, but may
1189     * accumulate in work queues until worker threads can actively
1190     * remove them. Invoking this method instead tries to remove them now.
1191     * However, this method may fail to remove tasks in
1192     * the presence of interference by other threads.
1193     */
1194    public void purge() {
1195        // Fail if we encounter interference during traversal
1196        try {
1197            Iterator<Runnable> it = getQueue().iterator();
1198            while (it.hasNext()) {
1199                Runnable r = it.next();
1200                if (r instanceof Future<?>) {
1201                    Future<?> c = (Future<?>)r;
1202                    if (c.isCancelled())
1203                        it.remove();
1204                }
1205            }
1206        }
1207        catch (ConcurrentModificationException ex) {
1208            return;
1209        }
1210    }
1211
1212    /**
1305       * Sets the core number of threads.  This overrides any value set
1306       * in the constructor.  If the new value is smaller than the
1307       * current value, excess existing threads will be terminated when
# Line 1230 | Line 1322 | public class ThreadPoolExecutor extends
1322              int extra = this.corePoolSize - corePoolSize;
1323              this.corePoolSize = corePoolSize;
1324              if (extra < 0) {
1325 <                int n = workQueue.size();
1326 <                // We have to create initially-idle threads here
1235 <                // because we otherwise have no recourse about
1236 <                // what to do with a dequeued task if addThread fails.
1237 <                while (extra++ < 0 && n-- > 0 && poolSize < corePoolSize &&
1238 <                       runState < STOP) {
1325 >                int n = workQueue.size(); // don't add more threads than tasks
1326 >                while (extra++ < 0 && n-- > 0 && poolSize < corePoolSize) {
1327                      Thread t = addThread(null);
1328                      if (t != null)
1329                          t.start();
# Line 1244 | Line 1332 | public class ThreadPoolExecutor extends
1332                  }
1333              }
1334              else if (extra > 0 && poolSize > corePoolSize) {
1335 <                Iterator<Worker> it = workers.iterator();
1336 <                while (it.hasNext() &&
1337 <                       extra-- > 0 &&
1338 <                       poolSize > corePoolSize &&
1339 <                       workQueue.remainingCapacity() == 0)
1340 <                    it.next().interruptIfIdle();
1335 >                try {
1336 >                    Iterator<Worker> it = workers.iterator();
1337 >                    while (it.hasNext() &&
1338 >                           extra-- > 0 &&
1339 >                           poolSize > corePoolSize &&
1340 >                           workQueue.remainingCapacity() == 0)
1341 >                        it.next().interruptIfIdle();
1342 >                } catch(SecurityException ignore) {
1343 >                    // Not an error; it is OK if the threads can stay live
1344 >                }
1345              }
1346          } finally {
1347              mainLock.unlock();
# Line 1350 | Line 1442 | public class ThreadPoolExecutor extends
1442              int extra = this.maximumPoolSize - maximumPoolSize;
1443              this.maximumPoolSize = maximumPoolSize;
1444              if (extra > 0 && poolSize > maximumPoolSize) {
1445 <                Iterator<Worker> it = workers.iterator();
1446 <                while (it.hasNext() &&
1447 <                       extra > 0 &&
1448 <                       poolSize > maximumPoolSize) {
1449 <                    it.next().interruptIfIdle();
1450 <                    --extra;
1445 >                try {
1446 >                    Iterator<Worker> it = workers.iterator();
1447 >                    while (it.hasNext() &&
1448 >                           extra > 0 &&
1449 >                           poolSize > maximumPoolSize) {
1450 >                        it.next().interruptIfIdle();
1451 >                        --extra;
1452 >                    }
1453 >                } catch(SecurityException ignore) {
1454 >                    // Not an error; it is OK if the threads can stay live
1455                  }
1456              }
1457          } finally {
# Line 1407 | Line 1503 | public class ThreadPoolExecutor extends
1503          return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
1504      }
1505  
1506 +    /* User-level queue utilities */
1507 +
1508 +    /**
1509 +     * Returns the task queue used by this executor. Access to the
1510 +     * task queue is intended primarily for debugging and monitoring.
1511 +     * This queue may be in active use.  Retrieving the task queue
1512 +     * does not prevent queued tasks from executing.
1513 +     *
1514 +     * @return the task queue
1515 +     */
1516 +    public BlockingQueue<Runnable> getQueue() {
1517 +        return workQueue;
1518 +    }
1519 +
1520 +    /**
1521 +     * Removes this task from the executor's internal queue if it is
1522 +     * present, thus causing it not to be run if it has not already
1523 +     * started.
1524 +     *
1525 +     * <p> This method may be useful as one part of a cancellation
1526 +     * scheme.  It may fail to remove tasks that have been converted
1527 +     * into other forms before being placed on the internal queue. For
1528 +     * example, a task entered using <tt>submit</tt> might be
1529 +     * converted into a form that maintains <tt>Future</tt> status.
1530 +     * However, in such cases, method {@link ThreadPoolExecutor#purge}
1531 +     * may be used to remove those Futures that have been cancelled.
1532 +     *
1533 +     * @param task the task to remove
1534 +     * @return true if the task was removed
1535 +     */
1536 +    public boolean remove(Runnable task) {
1537 +        return getQueue().remove(task);
1538 +    }
1539 +
1540 +    /**
1541 +     * Tries to remove from the work queue all {@link Future}
1542 +     * tasks that have been cancelled. This method can be useful as a
1543 +     * storage reclamation operation, that has no other impact on
1544 +     * functionality. Cancelled tasks are never executed, but may
1545 +     * accumulate in work queues until worker threads can actively
1546 +     * remove them. Invoking this method instead tries to remove them now.
1547 +     * However, this method may fail to remove tasks in
1548 +     * the presence of interference by other threads.
1549 +     */
1550 +    public void purge() {
1551 +        // Fail if we encounter interference during traversal
1552 +        try {
1553 +            Iterator<Runnable> it = getQueue().iterator();
1554 +            while (it.hasNext()) {
1555 +                Runnable r = it.next();
1556 +                if (r instanceof Future<?>) {
1557 +                    Future<?> c = (Future<?>)r;
1558 +                    if (c.isCancelled())
1559 +                        it.remove();
1560 +                }
1561 +            }
1562 +        }
1563 +        catch (ConcurrentModificationException ex) {
1564 +            return;
1565 +        }
1566 +    }
1567 +
1568      /* Statistics */
1569  
1570      /**
# Line 1502 | Line 1660 | public class ThreadPoolExecutor extends
1660          }
1661      }
1662  
1663 +    /* Extension hooks */
1664 +
1665      /**
1666       * Method invoked prior to executing the given Runnable in the
1667       * given thread.  This method is invoked by thread <tt>t</tt> that
# Line 1550 | Line 1710 | public class ThreadPoolExecutor extends
1710       */
1711      protected void terminated() { }
1712  
1713 +    /* Predefined RejectedExecutionHandlers */
1714 +
1715      /**
1716       * A handler for rejected tasks that runs the rejected task
1717       * directly in the calling thread of the <tt>execute</tt> method,

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines