ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ThreadPoolExecutor.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ThreadPoolExecutor.java (file contents):
Revision 1.120 by jsr166, Wed Aug 8 16:46:16 2007 UTC vs.
Revision 1.121 by jsr166, Sun May 18 23:47:56 2008 UTC

# Line 363 | Line 363 | public class ThreadPoolExecutor extends
363       */
364  
365      private static boolean runStateLessThan(int c, int s) {
366 <        return c < s;
366 >        return c < s;
367      }
368  
369      private static boolean runStateAtLeast(int c, int s) {
370 <        return c >= s;
370 >        return c >= s;
371      }
372  
373      private static boolean isRunning(int c) {
374 <        return c < SHUTDOWN;
374 >        return c < SHUTDOWN;
375      }
376  
377      /**
378       * Attempt to CAS-increment the workerCount field of ctl.
379       */
380      private boolean compareAndIncrementWorkerCount(int expect) {
381 <        return ctl.compareAndSet(expect, expect + 1);
381 >        return ctl.compareAndSet(expect, expect + 1);
382      }
383  
384      /**
385       * Attempt to CAS-decrement the workerCount field of ctl.
386       */
387      private boolean compareAndDecrementWorkerCount(int expect) {
388 <        return ctl.compareAndSet(expect, expect - 1);
388 >        return ctl.compareAndSet(expect, expect - 1);
389      }
390  
391      /**
# Line 394 | Line 394 | public class ThreadPoolExecutor extends
394       * decrements are performed within getTask.
395       */
396      private void decrementWorkerCount() {
397 <        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
397 >        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
398      }
399  
400      /**
# Line 544 | Line 544 | public class ThreadPoolExecutor extends
544       * lock when they invoke pool control methods like setCorePoolSize.
545       */
546      private final class Worker
547 <        extends AbstractQueuedSynchronizer
548 <        implements Runnable
547 >        extends AbstractQueuedSynchronizer
548 >        implements Runnable
549      {
550 <        /**
551 <         * This class will never be serialized, but we provide a
552 <         * serialVersionUID to suppress a javac warning.
553 <         */
554 <        private static final long serialVersionUID = 6138294804551838833L;
550 >        /**
551 >         * This class will never be serialized, but we provide a
552 >         * serialVersionUID to suppress a javac warning.
553 >         */
554 >        private static final long serialVersionUID = 6138294804551838833L;
555  
556          /** Thread this worker is running in.  Null if factory fails. */
557          final Thread thread;
# Line 566 | Line 566 | public class ThreadPoolExecutor extends
566           */
567          Worker(Runnable firstTask) {
568              this.firstTask = firstTask;
569 <            this.thread = getThreadFactory().newThread(this);
569 >            this.thread = getThreadFactory().newThread(this);
570          }
571  
572          /** Delegates main run loop to outer runWorker  */
# Line 574 | Line 574 | public class ThreadPoolExecutor extends
574              runWorker(this);
575          }
576  
577 <        // Lock methods
578 <        //
579 <        // The value 0 represents the unlocked state.
580 <        // The value 1 represents the locked state.
581 <
582 <        protected boolean isHeldExclusively() {
583 <            return getState() == 1;
584 <        }
585 <
586 <        protected boolean tryAcquire(int unused) {
587 <            if (compareAndSetState(0, 1)) {
588 <                setExclusiveOwnerThread(Thread.currentThread());
589 <                return true;
590 <            }
591 <            return false;
592 <        }
593 <
594 <        protected boolean tryRelease(int unused) {
595 <            setExclusiveOwnerThread(null);
596 <            setState(0);
597 <            return true;
598 <        }
599 <
600 <        public void lock()        { acquire(1); }
601 <        public boolean tryLock()  { return tryAcquire(1); }
602 <        public void unlock()      { release(1); }
603 <        public boolean isLocked() { return isHeldExclusively(); }
577 >        // Lock methods
578 >        //
579 >        // The value 0 represents the unlocked state.
580 >        // The value 1 represents the locked state.
581 >
582 >        protected boolean isHeldExclusively() {
583 >            return getState() == 1;
584 >        }
585 >
586 >        protected boolean tryAcquire(int unused) {
587 >            if (compareAndSetState(0, 1)) {
588 >                setExclusiveOwnerThread(Thread.currentThread());
589 >                return true;
590 >            }
591 >            return false;
592 >        }
593 >
594 >        protected boolean tryRelease(int unused) {
595 >            setExclusiveOwnerThread(null);
596 >            setState(0);
597 >            return true;
598 >        }
599 >
600 >        public void lock()        { acquire(1); }
601 >        public boolean tryLock()  { return tryAcquire(1); }
602 >        public void unlock()      { release(1); }
603 >        public boolean isLocked() { return isHeldExclusively(); }
604      }
605  
606      /*
# Line 636 | Line 636 | public class ThreadPoolExecutor extends
636      final void tryTerminate() {
637          for (;;) {
638              int c = ctl.get();
639 <            if (isRunning(c) ||
640 <                runStateAtLeast(c, TIDYING) ||
641 <                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
642 <                return;
639 >            if (isRunning(c) ||
640 >                runStateAtLeast(c, TIDYING) ||
641 >                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
642 >                return;
643              if (workerCountOf(c) != 0) { // Eligible to terminate
644                  interruptIdleWorkers(ONLY_ONE);
645                  return;
646              }
647  
648 <            final ReentrantLock mainLock = this.mainLock;
649 <            mainLock.lock();
650 <            try {
651 <                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
652 <                    try {
653 <                        terminated();
654 <                    } finally {
655 <                        ctl.set(ctlOf(TERMINATED, 0));
656 <                        termination.signalAll();
657 <                    }
658 <                    return;
659 <                }
660 <            } finally {
661 <                mainLock.unlock();
662 <            }
648 >            final ReentrantLock mainLock = this.mainLock;
649 >            mainLock.lock();
650 >            try {
651 >                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
652 >                    try {
653 >                        terminated();
654 >                    } finally {
655 >                        ctl.set(ctlOf(TERMINATED, 0));
656 >                        termination.signalAll();
657 >                    }
658 >                    return;
659 >                }
660 >            } finally {
661 >                mainLock.unlock();
662 >            }
663              // else retry on failed CAS
664          }
665      }
# Line 730 | Line 730 | public class ThreadPoolExecutor extends
730       * waiting for a straggler task to finish.
731       */
732      private void interruptIdleWorkers(boolean onlyOne) {
733 <        final ReentrantLock mainLock = this.mainLock;
733 >        final ReentrantLock mainLock = this.mainLock;
734          mainLock.lock();
735          try {
736 <            for (Worker w : workers) {
736 >            for (Worker w : workers) {
737                  Thread t = w.thread;
738 <                if (!t.isInterrupted() && w.tryLock()) {
738 >                if (!t.isInterrupted() && w.tryLock()) {
739                      try {
740                          t.interrupt();
741                      } catch (SecurityException ignore) {
# Line 855 | Line 855 | public class ThreadPoolExecutor extends
855       * @return true if successful
856       */
857      private boolean addWorker(Runnable firstTask, boolean core) {
858 <        retry:
858 >        retry:
859          for (;;) {
860 <            int c = ctl.get();
861 <            int rs = runStateOf(c);
860 >            int c = ctl.get();
861 >            int rs = runStateOf(c);
862  
863 <            // Check if queue empty only if necessary.
863 >            // Check if queue empty only if necessary.
864              if (rs >= SHUTDOWN &&
865 <                ! (rs == SHUTDOWN &&
866 <                   firstTask == null &&
867 <                   ! workQueue.isEmpty()))
868 <                return false;
869 <
870 <            for (;;) {
871 <                int wc = workerCountOf(c);
872 <                if (wc >= CAPACITY ||
873 <                    wc >= (core ? corePoolSize : maximumPoolSize))
874 <                    return false;
875 <                if (compareAndIncrementWorkerCount(c))
876 <                    break retry;
877 <                c = ctl.get();  // Re-read ctl
878 <                if (runStateOf(c) != rs)
879 <                    continue retry;
880 <                // else CAS failed due to workerCount change; retry inner loop
881 <            }
865 >                ! (rs == SHUTDOWN &&
866 >                   firstTask == null &&
867 >                   ! workQueue.isEmpty()))
868 >                return false;
869 >
870 >            for (;;) {
871 >                int wc = workerCountOf(c);
872 >                if (wc >= CAPACITY ||
873 >                    wc >= (core ? corePoolSize : maximumPoolSize))
874 >                    return false;
875 >                if (compareAndIncrementWorkerCount(c))
876 >                    break retry;
877 >                c = ctl.get();  // Re-read ctl
878 >                if (runStateOf(c) != rs)
879 >                    continue retry;
880 >                // else CAS failed due to workerCount change; retry inner loop
881 >            }
882          }
883  
884          Worker w = new Worker(firstTask);
# Line 887 | Line 887 | public class ThreadPoolExecutor extends
887          final ReentrantLock mainLock = this.mainLock;
888          mainLock.lock();
889          try {
890 <            // Recheck while holding lock.
891 <            // Back out on ThreadFactory failure or if
892 <            // shut down before lock acquired.
890 >            // Recheck while holding lock.
891 >            // Back out on ThreadFactory failure or if
892 >            // shut down before lock acquired.
893              int c = ctl.get();
894 <            int rs = runStateOf(c);
894 >            int rs = runStateOf(c);
895  
896 <            if (t == null ||
897 <                (rs >= SHUTDOWN &&
898 <                 ! (rs == SHUTDOWN &&
899 <                    firstTask == null))) {
900 <                decrementWorkerCount();
901 <                tryTerminate();
902 <                return false;
903 <            }
896 >            if (t == null ||
897 >                (rs >= SHUTDOWN &&
898 >                 ! (rs == SHUTDOWN &&
899 >                    firstTask == null))) {
900 >                decrementWorkerCount();
901 >                tryTerminate();
902 >                return false;
903 >            }
904  
905 <            workers.add(w);
905 >            workers.add(w);
906  
907 <            int s = workers.size();
907 >            int s = workers.size();
908              if (s > largestPoolSize)
909                  largestPoolSize = s;
910          } finally {
# Line 912 | Line 912 | public class ThreadPoolExecutor extends
912          }
913  
914          t.start();
915 <        // It is possible (but unlikely) for a thread to have been
916 <        // added to workers, but not yet started, during transition to
917 <        // STOP, which could result in a rare missed interrupt,
918 <        // because Thread.interrupt is not guaranteed to have any effect
919 <        // on a non-yet-started Thread (see Thread#interrupt).
920 <        if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
921 <            t.interrupt();
915 >        // It is possible (but unlikely) for a thread to have been
916 >        // added to workers, but not yet started, during transition to
917 >        // STOP, which could result in a rare missed interrupt,
918 >        // because Thread.interrupt is not guaranteed to have any effect
919 >        // on a non-yet-started Thread (see Thread#interrupt).
920 >        if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
921 >            t.interrupt();
922  
923 <        return true;
923 >        return true;
924      }
925  
926      /**
# Line 951 | Line 951 | public class ThreadPoolExecutor extends
951  
952          tryTerminate();
953  
954 <        int c = ctl.get();
955 <        if (runStateLessThan(c, STOP)) {
956 <            if (!completedAbruptly) {
957 <                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
958 <                if (min == 0 && ! workQueue.isEmpty())
959 <                    min = 1;
960 <                if (workerCountOf(c) >= min)
961 <                    return; // replacement not needed
962 <            }
963 <            addWorker(null, false);
964 <        }
954 >        int c = ctl.get();
955 >        if (runStateLessThan(c, STOP)) {
956 >            if (!completedAbruptly) {
957 >                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
958 >                if (min == 0 && ! workQueue.isEmpty())
959 >                    min = 1;
960 >                if (workerCountOf(c) >= min)
961 >                    return; // replacement not needed
962 >            }
963 >            addWorker(null, false);
964 >        }
965      }
966  
967      /**
# Line 981 | Line 981 | public class ThreadPoolExecutor extends
981       *         workerCount is decremented
982       */
983      private Runnable getTask() {
984 <        boolean timedOut = false; // Did the last poll() time out?
984 >        boolean timedOut = false; // Did the last poll() time out?
985  
986 <        retry:
987 <        for (;;) {
986 >        retry:
987 >        for (;;) {
988              int c = ctl.get();
989 <            int rs = runStateOf(c);
989 >            int rs = runStateOf(c);
990 >
991 >            // Check if queue empty only if necessary.
992 >            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
993 >                decrementWorkerCount();
994 >                return null;
995 >            }
996  
997 <            // Check if queue empty only if necessary.
998 <            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
999 <                decrementWorkerCount();
1000 <                return null;
1001 <            }
1002 <
1003 <            boolean timed;      // Are workers subject to culling?
1004 <
1005 <            for (;;) {
1006 <                int wc = workerCountOf(c);
1007 <                timed = allowCoreThreadTimeOut || wc > corePoolSize;
1008 <
1009 <                if (wc <= maximumPoolSize && ! (timedOut && timed))
1010 <                    break;
1005 <                if (compareAndDecrementWorkerCount(c))
1006 <                    return null;
1007 <                c = ctl.get();  // Re-read ctl
1008 <                if (runStateOf(c) != rs)
1009 <                    continue retry;
1010 <                // else CAS failed due to workerCount change; retry inner loop
997 >            boolean timed;      // Are workers subject to culling?
998 >
999 >            for (;;) {
1000 >                int wc = workerCountOf(c);
1001 >                timed = allowCoreThreadTimeOut || wc > corePoolSize;
1002 >
1003 >                if (wc <= maximumPoolSize && ! (timedOut && timed))
1004 >                    break;
1005 >                if (compareAndDecrementWorkerCount(c))
1006 >                    return null;
1007 >                c = ctl.get();  // Re-read ctl
1008 >                if (runStateOf(c) != rs)
1009 >                    continue retry;
1010 >                // else CAS failed due to workerCount change; retry inner loop
1011              }
1012  
1013              try {
# Line 1016 | Line 1016 | public class ThreadPoolExecutor extends
1016                      workQueue.take();
1017                  if (r != null)
1018                      return r;
1019 <                timedOut = true;
1019 >                timedOut = true;
1020              } catch (InterruptedException retry) {
1021 <                timedOut = false;
1021 >                timedOut = false;
1022              }
1023          }
1024      }
# Line 1688 | Line 1688 | public class ThreadPoolExecutor extends
1688              while (it.hasNext()) {
1689                  Runnable r = it.next();
1690                  if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
1691 <                    it.remove();
1691 >                    it.remove();
1692              }
1693          } catch (ConcurrentModificationException fallThrough) {
1694 <            // Take slow path if we encounter interference during traversal.
1694 >            // Take slow path if we encounter interference during traversal.
1695              // Make copy for traversal and call remove for cancelled entries.
1696 <            // The slow path is more likely to be O(N*N).
1696 >            // The slow path is more likely to be O(N*N).
1697              for (Object r : q.toArray())
1698                  if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
1699 <                    q.remove(r);
1699 >                    q.remove(r);
1700          }
1701  
1702          tryTerminate(); // In case SHUTDOWN and now empty
# Line 1713 | Line 1713 | public class ThreadPoolExecutor extends
1713          final ReentrantLock mainLock = this.mainLock;
1714          mainLock.lock();
1715          try {
1716 <            // Remove rare and surprising possibility of
1717 <            // isTerminated() && getPoolSize() > 0
1716 >            // Remove rare and surprising possibility of
1717 >            // isTerminated() && getPoolSize() > 0
1718              return runStateAtLeast(ctl.get(), TIDYING) ? 0
1719 <                : workers.size();
1719 >                : workers.size();
1720          } finally {
1721              mainLock.unlock();
1722          }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines