ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ThreadPoolExecutor.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ThreadPoolExecutor.java (file contents):
Revision 1.84 by jsr166, Tue Jun 13 00:26:59 2006 UTC vs.
Revision 1.85 by dl, Fri Jun 16 18:49:43 2006 UTC

# Line 290 | Line 290 | import java.util.*;
290   * @author Doug Lea
291   */
292   public class ThreadPoolExecutor extends AbstractExecutorService {
293 <    /**
294 <     * Only used to force toArray() to produce a Runnable[].
293 >
294 >    /*
295 >     * A TPE manages a largish set of control fields, mainly runState,
296 >     * poolSize, corePoolSize, maximumPoolSize.  In general, state
297 >     * changes only occur within mainLock regions, but nearly all
298 >     * fields are volatile, so can be read outside of locked
299 >     * regions. This enables the most performance-critical actions,
300 >     * such as enqueuing and dequeing tasks in workQueue, to normally
301 >     * proceed without holding this lock when they see that the state
302 >     * allows actions. This sometimes requires a form of double-check.
303 >     * For example when it appears that poolSize is less than
304 >     * corePoolSize, addIfUnderCorePoolSize is called, which checks
305 >     * sizes and runState under the lock before actually creating a
306 >     * new thread.
307 >     *
308 >     * The main lifecyle control is via runState, taking on values:
309 >     *   RUNNING:  Accept new tasks and process queued tasks
310 >     *   SHUTDOWN: Don't accept new tasks, but process queued tasks
311 >     *   STOP:     Don't accept new tasks,  don't process queued tasks,
312 >     *             and interrupt in-progress tasks
313 >     *   TERMINATED: Same as stop, plus all threads have terminated
314 >     * with transitions:
315 >     *
316 >     * RUNNING -> SHUTDOWN
317 >     *    On invocation of shutdown() when pool or queue nonempty
318 >     * {RUNNING or SHUTDOWN}  -> STOP  
319 >     *    On invocation of shutdownNow() when pool or queue nonempty
320 >     * {SHUTDOWN or STOP} -> TERMINATED
321 >     *    When both queue and pool become empty
322 >     * RUNNING -> TERMINATED
323 >     *    On invocation of shutdown when both queue and pool empty
324 >     *    (This bypasses creating a new thread just to cause termination)
325 >     *
326       */
296    private static final Runnable[] EMPTY_RUNNABLE_ARRAY = new Runnable[0];
327  
328      /**
329       * Permission for checking shutdown
# Line 307 | Line 337 | public class ThreadPoolExecutor extends
337      private final BlockingQueue<Runnable> workQueue;
338  
339      /**
340 <     * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and
341 <     * workers set.
340 >     * Lock held on updates to poolSize, corePoolSize,
341 >     * maximumPoolSize, runState, and workers set.
342       */
343      private final ReentrantLock mainLock = new ReentrantLock();
344  
# Line 361 | Line 391 | public class ThreadPoolExecutor extends
391       */
392      volatile int runState;
393  
394 <    // Special values for runState
394 >    /*
395 >     * Special values for runState. The numerical order among values
396 >     * matters. The runState monotonically increases over time, but
397 >     * need not hit each state.
398 >     */
399      /** Normal, not-shutdown mode */
400      static final int RUNNING    = 0;
401      /** Controlled shutdown mode */
# Line 413 | Line 447 | public class ThreadPoolExecutor extends
447       * @return the new thread, or null if threadFactory fails to create thread
448       */
449      private Thread addThread(Runnable firstTask) {
416        if (runState == TERMINATED) // Don't create thread if terminated
417            return null;
450          Worker w = new Worker(firstTask);
451          Thread t = threadFactory.newThread(w);
452          if (t != null) {
# Line 439 | Line 471 | public class ThreadPoolExecutor extends
471          final ReentrantLock mainLock = this.mainLock;
472          mainLock.lock();
473          try {
474 <            if (poolSize < corePoolSize)
474 >            if (poolSize < corePoolSize && runState == RUNNING)
475                  t = addThread(firstTask);
476          } finally {
477              mainLock.unlock();
# Line 468 | Line 500 | public class ThreadPoolExecutor extends
500          final ReentrantLock mainLock = this.mainLock;
501          mainLock.lock();
502          try {
503 <            if (poolSize < maximumPoolSize) {
503 >            if (poolSize < maximumPoolSize && runState == RUNNING) {
504                  Runnable next = workQueue.poll();
505                  if (next == null) {
506                      next = firstTask;
# Line 486 | Line 518 | public class ThreadPoolExecutor extends
518          return status;
519      }
520  
489
521      /**
522       * Gets the next task for a worker thread to run.
523       * @return the task
# Line 528 | Line 559 | public class ThreadPoolExecutor extends
559                      return workQueue.take();
560                  }
561  
562 <                case STOP:
562 >                default: // stopping/stopped
563                      return null;
533                default:
534                    assert false;
564                  }
565              } catch (InterruptedException ie) {
566                  // On interruption, re-check runstate
# Line 539 | Line 568 | public class ThreadPoolExecutor extends
568          }
569      }
570  
571 +
572 +    /**
573 +     * Rejects a task that was queued concurrently with a call to
574 +     * shutdownNow. If still present in the queue, this task must be
575 +     * removed and rejected to preserve shutdownNow guarantees.
576 +     * @param command the task
577 +     */
578 +    private void rejectIfQueued(Runnable command) {
579 +        final ReentrantLock mainLock = this.mainLock;
580 +        mainLock.lock();
581 +        boolean present;
582 +        try {
583 +            present = workQueue.remove(command);
584 +        } finally {
585 +            mainLock.unlock();
586 +        }
587 +        if (present)
588 +            reject(command);
589 +    }
590 +
591      /**
592       * Wakes up all threads that might be waiting for tasks.
593       */
# Line 563 | Line 612 | public class ThreadPoolExecutor extends
612          try {
613              completedTaskCount += w.completedTasks;
614              workers.remove(w);
615 <            if (--poolSize > 0)
616 <                return;
617 <
618 <            // Else, this is the last thread. Deal with potential shutdown.
619 <
620 <            int state = runState;
621 <            assert state != TERMINATED;
622 <
574 <            if (state != STOP) {
575 <                // If there are queued tasks but no threads, create
576 <                // replacement thread. We must create it initially
577 <                // idle to avoid orphaned tasks in case addThread
578 <                // fails.  This also handles case of delayed tasks
579 <                // that will sometime later become runnable.
580 <                if (!workQueue.isEmpty()) {
615 >            if (--poolSize == 0) { // Deal with potential shutdown.
616 >                int state = runState;
617 >                // If not stopping and there are queued tasks but no
618 >                // threads, create replacement thread. We must create
619 >                // it initially idle to avoid orphaned tasks in case
620 >                // addThread fails.  This also handles case of delayed
621 >                // tasks that will sometime later become runnable.
622 >                if (state < STOP && !workQueue.isEmpty()) {
623                      Thread t = addThread(null);
624                      if (t != null)
625                          t.start();
626 <                    return;
626 >                    state = RUNNING; // to cause termination check to fail
627 >                }
628 >                if (state == STOP || state == SHUTDOWN) { // can terminate
629 >                    runState = TERMINATED;
630 >                    termination.signalAll();
631 >                    terminated();
632                  }
586
587                // Otherwise, we can exit without replacement
588                if (state == RUNNING)
589                    return;
633              }
591
592            // Either state is STOP, or state is SHUTDOWN and there is
593            // no work to do. So we can terminate.
594            termination.signalAll();
595            runState = TERMINATED;
596            // fall through to call terminate() outside of lock.
634          } finally {
635              mainLock.unlock();
636          }
600
601        assert runState == TERMINATED;
602        terminated();
637      }
638  
639      /**
640       *  Worker threads
641       */
642 <    private class Worker implements Runnable {
642 >    private final class Worker implements Runnable {
643  
644          /**
645           * The runLock is acquired and released surrounding each task
# Line 669 | Line 703 | public class ThreadPoolExecutor extends
703              runLock.lock();
704              try {
705                  // If not shutting down then clear an outstanding interrupt.
706 <                if (runState != STOP &&
706 >                if (runState < STOP &&
707                      Thread.interrupted() &&
708 <                    runState == STOP) // Re-interrupt if stopped after clearing
708 >                    runState >= STOP) // Re-interrupt if stopped after clearing
709                      thread.interrupt();
710                  boolean ran = false;
711                  beforeExecute(thread, task);
# Line 702 | Line 736 | public class ThreadPoolExecutor extends
736                  firstTask = null;
737                  while (task != null || (task = getTask()) != null) {
738                      runTask(task);
739 <                    task = null; // unnecessary but can help GC
739 >                    task = null;
740                  }
741              } finally {
742                  workerDone(this);
# Line 860 | Line 894 | public class ThreadPoolExecutor extends
894          this.handler = handler;
895      }
896  
863
897      /**
898       * Executes the given task sometime in the future.  The task
899       * may execute in a new thread or in an existing pooled thread.
# Line 878 | Line 911 | public class ThreadPoolExecutor extends
911      public void execute(Runnable command) {
912          if (command == null)
913              throw new NullPointerException();
914 <        for (;;) {
882 <            if (runState != RUNNING) {
883 <                reject(command);
884 <                return;
885 <            }
914 >        while (runState == RUNNING) {
915              if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
916                  return;
917 <            if (workQueue.offer(command))
917 >            if (workQueue.offer(command)) {// recheck state after queuing
918 >                if (runState != RUNNING)
919 >                    rejectIfQueued(command);
920 >                else if (poolSize < corePoolSize)
921 >                    addIfUnderCorePoolSize(null);
922                  return;
923 +            }
924              int status = addIfUnderMaximumPoolSize(command);
925 <            if (status > 0)      // created new thread
925 >            if (status > 0)   // Created new thread to handle task
926                  return;
927 <            if (status == 0) {   // failed to create thread
928 <                reject(command);
929 <                return;
896 <            }
897 <            // Retry if created a new thread but it is busy with another task
927 >            if (status == 0)  // Cannot create thread
928 >                break;
929 >            // Retry if created thread but it is busy with another task
930          }
931 +
932 +        reject(command); // is shutdown or can't create thread or queue task
933 +        return;
934      }
935  
936      /**
# Line 915 | Line 950 | public class ThreadPoolExecutor extends
950          if (security != null)
951              security.checkPermission(shutdownPerm);
952  
918        boolean fullyTerminated = false;
953          final ReentrantLock mainLock = this.mainLock;
954          mainLock.lock();
955          try {
956 <            if (workers.size() > 0) {
956 >            if (security != null) {
957                  // Check if caller can modify worker threads.  This
958                  // might not be true even if passed above check, if
959                  // the SecurityManager treats some threads specially.
960 <                if (security != null) {
961 <                    for (Worker w: workers)
962 <                        security.checkAccess(w.thread);
929 <                }
960 >                for (Worker w: workers)
961 >                    security.checkAccess(w.thread);
962 >            }
963  
964 <                int state = runState;
965 <                if (state == RUNNING) // don't override shutdownNow
966 <                    runState = SHUTDOWN;
964 >            int state = runState;
965 >            if (state == RUNNING) // don't override shutdownNow
966 >                runState = SHUTDOWN;
967 >            int nworkers = 0;
968  
969 <                try {
970 <                    for (Worker w: workers)
971 <                        w.interruptIfIdle();
972 <                } catch (SecurityException se) {
939 <                    // If SecurityManager allows above checks, but
940 <                    // then unexpectedly throws exception when
941 <                    // interrupting threads (which it ought not do),
942 <                    // back out as cleanly as we can. Some threads may
943 <                    // have been killed but we remain in non-shutdown
944 <                    // state.
945 <                    runState = state;
946 <                    throw se;
969 >            try {
970 >                for (Worker w: workers) {
971 >                    w.interruptIfIdle();
972 >                    ++nworkers;
973                  }
974 +            } catch (SecurityException se) {
975 +                // If SecurityManager allows above checks, but
976 +                // then unexpectedly throws exception when
977 +                // interrupting threads (which it ought not do),
978 +                // back out as cleanly as we can. Some threads may
979 +                // have been killed but we remain in non-shutdown
980 +                // state.
981 +                runState = state;
982 +                throw se;
983              }
984 <            else { // If no workers, trigger full termination now
985 <                fullyTerminated = true;
984 >
985 >            // If no live workers, act on one's behalf to terminate
986 >            if (nworkers == 0 && state != TERMINATED) {
987                  runState = TERMINATED;
988                  termination.signalAll();
989 +                terminated();
990              }
991          } finally {
992              mainLock.unlock();
993          }
957        if (fullyTerminated)
958            terminated();
994      }
995  
996  
997      /**
998       * Attempts to stop all actively executing tasks, halts the
999       * processing of waiting tasks, and returns a list of the tasks
1000 <     * that were awaiting execution.
1000 >     * that were awaiting execution. These tasks are drained (removed)
1001 >     * from the task queue upon return from this method.
1002       *
1003       * <p>There are no guarantees beyond best-effort attempts to stop
1004       * processing actively executing tasks.  This implementation
# Line 982 | Line 1018 | public class ThreadPoolExecutor extends
1018          if (security != null)
1019              security.checkPermission(shutdownPerm);
1020  
985        boolean fullyTerminated = false;
1021          final ReentrantLock mainLock = this.mainLock;
1022          mainLock.lock();
1023          try {
1024 <            if (workers.size() > 0) {
1025 <                if (security != null) {
1026 <                    for (Worker w: workers)
1027 <                        security.checkAccess(w.thread);
993 <                }
1024 >            if (security != null) {
1025 >                for (Worker w: workers)
1026 >                    security.checkAccess(w.thread);
1027 >            }
1028  
1029 <                int state = runState;
1030 <                if (state != TERMINATED)
1031 <                    runState = STOP;
1032 <                try {
1033 <                    for (Worker w : workers)
1034 <                        w.interruptNow();
1035 <                } catch (SecurityException se) {
1036 <                    runState = state; // back out;
1003 <                    throw se;
1029 >            int state = runState;
1030 >            if (state != TERMINATED)
1031 >                runState = STOP;
1032 >            int nworkers = 0;
1033 >            try {
1034 >                for (Worker w : workers) {
1035 >                    w.interruptNow();
1036 >                    ++nworkers;
1037                  }
1038 +            } catch (SecurityException se) {
1039 +                runState = state; // back out;
1040 +                throw se;
1041              }
1042 <            else { // If no workers, trigger full termination now
1043 <                fullyTerminated = true;
1042 >
1043 >            if (nworkers == 0 && state != TERMINATED) {
1044                  runState = TERMINATED;
1045                  termination.signalAll();
1046 +                terminated();
1047              }
1048 +
1049 +            List<Runnable> taskList = new ArrayList<Runnable>();
1050 +            workQueue.drainTo(taskList);
1051 +            return taskList;
1052          } finally {
1053              mainLock.unlock();
1054          }
1014        if (fullyTerminated)
1015            terminated();
1016        return Arrays.asList(workQueue.toArray(EMPTY_RUNNABLE_ARRAY));
1055      }
1056  
1057      public boolean isShutdown() {
# Line 1196 | Line 1234 | public class ThreadPoolExecutor extends
1234                  // We have to create initially-idle threads here
1235                  // because we otherwise have no recourse about
1236                  // what to do with a dequeued task if addThread fails.
1237 <                while (extra++ < 0 && n-- > 0 && poolSize < corePoolSize ) {
1237 >                while (extra++ < 0 && n-- > 0 && poolSize < corePoolSize &&
1238 >                       runState < STOP) {
1239                      Thread t = addThread(null);
1240                      if (t != null)
1241                          t.start();
# Line 1417 | Line 1456 | public class ThreadPoolExecutor extends
1456      }
1457  
1458      /**
1459 <     * Returns the approximate total number of tasks that have been
1459 >     * Returns the approximate total number of tasks that have ever been
1460       * scheduled for execution. Because the states of tasks and
1461       * threads may change dynamically during computation, the returned
1462       * value is only an approximation, but one that does not ever

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines