ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ThreadPoolExecutor.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ThreadPoolExecutor.java (file contents):
Revision 1.118 by dl, Sun Feb 18 23:14:58 2007 UTC vs.
Revision 1.119 by jsr166, Mon Feb 19 00:59:54 2007 UTC

# Line 604 | Line 604 | public class ThreadPoolExecutor extends
604              int c = ctl.get();
605              if (isRunning(c) ||
606                  runStateAtLeast(c, TIDYING) ||
607 <                (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
608 <                return;
607 >                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
608 >                return;
609              if (workerCountOf(c) != 0) { // Eligible to terminate
610                  interruptIdleWorkers(ONLY_ONE);
611                  return;
612              }
613 <            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
614 <                mainLock.lock();
615 <                try {
613 >
614 >            final ReentrantLock mainLock = this.mainLock;
615 >            mainLock.lock();
616 >            try {
617 >                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
618                      try {
619                          terminated();
620                      } finally {
621                          ctl.set(ctlOf(TERMINATED, 0));
622                          termination.signalAll();
623                      }
624 <                } finally {
625 <                    mainLock.unlock();
626 <                }
627 <                return;
628 <            }
624 >                    return;
625 >                }
626 >            } finally {
627 >                mainLock.unlock();
628 >            }
629              // else retry on failed CAS
630          }
631      }
# Line 719 | Line 721 | public class ThreadPoolExecutor extends
721       * Common form of interruptIdleWorkers, to avoid having to
722       * remember what the boolean argument means.
723       */
724 <    private void interruptIdleWorkers() {
725 <        interruptIdleWorkers(false);
724 >    private void interruptIdleWorkers() {
725 >        interruptIdleWorkers(false);
726      }
727  
728      private static final boolean ONLY_ONE = true;
# Line 819 | Line 821 | public class ThreadPoolExecutor extends
821       * @return true if successful
822       */
823      private boolean addWorker(Runnable firstTask, boolean core) {
824 +        retry:
825          for (;;) {
826 <            int c = ctl.get();
827 <            if (runStateAtLeast(c, SHUTDOWN)) {
828 <                int rs = runStateOf(c);
829 <                if (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())
830 <                    return false;
831 <                // Avoid unnecessary CAS failure if isEmpty is slow.
832 <                int reread = ctl.get();
833 <                if (runStateOf(reread) != rs)
834 <                    continue; // must recheck
835 <                c = reread;
836 <            }
837 <            int wc = workerCountOf(c);
838 <            if (wc >= CAPACITY ||
839 <                wc >= (core ? corePoolSize : maximumPoolSize))
840 <                return false;
841 <            if (compareAndIncrementWorkerCount(c))
842 <                break;
826 >            int c = ctl.get();
827 >            int rs = runStateOf(c);
828 >
829 >            // Check if queue empty only if necessary.
830 >            if (rs >= SHUTDOWN &&
831 >                ! (rs == SHUTDOWN &&
832 >                   firstTask == null &&
833 >                   ! workQueue.isEmpty()))
834 >                return false;
835 >
836 >            for (;;) {
837 >                int wc = workerCountOf(c);
838 >                if (wc >= CAPACITY ||
839 >                    wc >= (core ? corePoolSize : maximumPoolSize))
840 >                    return false;
841 >                if (compareAndIncrementWorkerCount(c))
842 >                    break retry;
843 >                c = ctl.get();  // Re-read ctl
844 >                if (runStateOf(c) != rs)
845 >                    continue retry;
846 >                // else CAS failed due to workerCount change; retry inner loop
847 >            }
848          }
849  
850          Worker w = new Worker(firstTask);
# Line 845 | Line 853 | public class ThreadPoolExecutor extends
853          final ReentrantLock mainLock = this.mainLock;
854          mainLock.lock();
855          try {
856 +            // Recheck while holding lock.
857              // Back out on ThreadFactory failure or if
858              // shut down before lock acquired.
859              int c = ctl.get();
860 +            int rs = runStateOf(c);
861 +
862              if (t == null ||
863 <                (runStateAtLeast(c, SHUTDOWN) &&
864 <                 ((runStateOf(c) != SHUTDOWN || firstTask != null)))) {
863 >                (rs >= SHUTDOWN &&
864 >                 ! (rs == SHUTDOWN &&
865 >                    firstTask == null))) {
866                  decrementWorkerCount();
867                  tryTerminate();
868                  return false;
869              }
870 <            workers.add(w);
871 <            int s = workers.size();
870 >
871 >            workers.add(w);
872 >
873 >            int s = workers.size();
874              if (s > largestPoolSize)
875                  largestPoolSize = s;
876          } finally {
# Line 864 | Line 878 | public class ThreadPoolExecutor extends
878          }
879  
880          t.start();
881 <        return true;
881 >        // It is possible (but unlikely) for a thread to have been
882 >        // added to workers, but not yet started, during transition to
883 >        // STOP, which could result in a rare missed interrupt,
884 >        // because Thread.interrupt is not guaranteed to have any effect
885 >        // on a non-yet-started Thread (see Thread#interrupt).
886 >        if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
887 >            t.interrupt();
888 >
889 >        return true;
890      }
891  
892      /**
# Line 915 | Line 937 | public class ThreadPoolExecutor extends
937       * 1. There are more than maximumPoolSize workers (due to
938       *    a call to setMaximumPoolSize).
939       * 2. The pool is stopped.
940 <     * 3. The queue is empty, and either the pool is shutdown,
941 <     *    or the thread has already timed out at least once
942 <     *    waiting for a task, and would otherwise enter another
943 <     *    timed wait.
940 >     * 3. The pool is shutdown and the queue is empty.
941 >     * 4. This worker timed out waiting for a task, and timed-out
942 >     *    workers are subject to termination (that is,
943 >     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
944 >     *    both before and after the timed wait.
945       *
946       * @return task, or null if the worker must exit, in which case
947       *         workerCount is decremented
948       */
949      private Runnable getTask() {
950 <        /*
951 <         * Variable "empty" tracks whether the queue appears to be
952 <         * empty in case we need to know to check exit. This is set
953 <         * true on time-out from timed poll as an indicator of likely
931 <         * emptiness, in which case it is rechecked explicitly via
932 <         * isEmpty when deciding whether to exit.  Emptiness must also
933 <         * be checked in state SHUTDOWN.  The variable is initialized
934 <         * false to indicate lack of prior timeout, and left false
935 <         * until otherwise required to check.
936 <         */
937 <        boolean empty = false;
938 <        for (;;) {
950 >        boolean timedOut = false; // Did the last poll() time out?
951 >
952 >        retry:
953 >        for (;;) {
954              int c = ctl.get();
955 <            int rs = runStateOf(c);
956 <            if (rs == SHUTDOWN || empty) {
957 <                empty = workQueue.isEmpty();
958 <                if (runStateOf(c = ctl.get()) != rs)
959 <                    continue; // retry if state changed
960 <            }
955 >            int rs = runStateOf(c);
956 >
957 >            // Check if queue empty only if necessary.
958 >            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
959 >                decrementWorkerCount();
960 >                return null;
961 >            }
962  
963 <            int wc = workerCountOf(c);
948 <            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
963 >            boolean timed;      // Are workers subject to culling?
964  
965 <            // Try to exit if too many threads, shutting down, and/or timed out
966 <            if (wc > maximumPoolSize || rs > SHUTDOWN ||
967 <                (empty && (timed || rs == SHUTDOWN))) {
968 <                if (compareAndDecrementWorkerCount(c))
969 <                    return null;
970 <                else
971 <                    continue; // retry on CAS failure
965 >            for (;;) {
966 >                int wc = workerCountOf(c);
967 >                timed = allowCoreThreadTimeOut || wc > corePoolSize;
968 >
969 >                if (wc <= maximumPoolSize && ! (timedOut && timed))
970 >                    break;
971 >                if (compareAndDecrementWorkerCount(c))
972 >                    return null;
973 >                c = ctl.get();  // Re-read ctl
974 >                if (runStateOf(c) != rs)
975 >                    continue retry;
976 >                // else CAS failed due to workerCount change; retry inner loop
977              }
978  
979              try {
# Line 962 | Line 982 | public class ThreadPoolExecutor extends
982                      workQueue.take();
983                  if (r != null)
984                      return r;
985 <                empty = true; // queue probably empty; recheck above
985 >                timedOut = true;
986              } catch (InterruptedException retry) {
987 +                timedOut = false;
988              }
989          }
990      }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines