ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ThreadPoolExecutor.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ThreadPoolExecutor.java (file contents):
Revision 1.129 by dl, Wed May 9 10:44:40 2012 UTC vs.
Revision 1.130 by dl, Sun Jun 17 20:12:06 2012 UTC

# Line 546 | Line 546 | public class ThreadPoolExecutor extends
546       * task execution.  This protects against interrupts that are
547       * intended to wake up a worker thread waiting for a task from
548       * instead interrupting a task being run.  We implement a simple
549 <     * non-reentrant mutual exclusion lock rather than use ReentrantLock
550 <     * because we do not want worker tasks to be able to reacquire the
551 <     * lock when they invoke pool control methods like setCorePoolSize.
549 >     * non-reentrant mutual exclusion lock rather than use
550 >     * ReentrantLock because we do not want worker tasks to be able to
551 >     * reacquire the lock when they invoke pool control methods like
552 >     * setCorePoolSize.  Additionally, to suppress interrupts until
553 >     * the thread actually starts running tasks, we initialize lock
554 >     * state to a negative value, and clear it upon start (in
555 >     * runWorker).
556       */
557      private final class Worker
558          extends AbstractQueuedSynchronizer
# Line 572 | Line 576 | public class ThreadPoolExecutor extends
576           * @param firstTask the first task (null if none)
577           */
578          Worker(Runnable firstTask) {
579 +            setState(-1); // inhibit interrupts until runWorker
580              this.firstTask = firstTask;
581              this.thread = getThreadFactory().newThread(this);
582          }
# Line 587 | Line 592 | public class ThreadPoolExecutor extends
592          // The value 1 represents the locked state.
593  
594          protected boolean isHeldExclusively() {
595 <            return getState() == 1;
595 >            return getState() != 0;
596          }
597  
598          protected boolean tryAcquire(int unused) {
# Line 608 | Line 613 | public class ThreadPoolExecutor extends
613          public boolean tryLock()  { return tryAcquire(1); }
614          public void unlock()      { release(1); }
615          public boolean isLocked() { return isHeldExclusively(); }
616 +
617 +        void interruptIfStarted() {
618 +            Thread t;
619 +            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
620 +                try {
621 +                    t.interrupt();
622 +                } catch (SecurityException ignore) {
623 +                }
624 +            }
625 +        }
626      }
627  
628      /*
# Line 706 | Line 721 | public class ThreadPoolExecutor extends
721          final ReentrantLock mainLock = this.mainLock;
722          mainLock.lock();
723          try {
724 <            for (Worker w : workers) {
725 <                try {
711 <                    w.thread.interrupt();
712 <                } catch (SecurityException ignore) {
713 <                }
714 <            }
724 >            for (Worker w : workers)
725 >                w.interruptIfStarted();
726          } finally {
727              mainLock.unlock();
728          }
# Line 768 | Line 779 | public class ThreadPoolExecutor extends
779  
780      private static final boolean ONLY_ONE = true;
781  
771    /**
772     * Ensures that unless the pool is stopping, the current thread
773     * does not have its interrupt set. This requires a double-check
774     * of state in case the interrupt was cleared concurrently with a
775     * shutdownNow -- if so, the interrupt is re-enabled.
776     */
777    private void clearInterruptsForTaskRun() {
778        if (runStateLessThan(ctl.get(), STOP) &&
779            Thread.interrupted() &&
780            runStateAtLeast(ctl.get(), STOP))
781            Thread.currentThread().interrupt();
782    }
783
782      /*
783       * Misc utilities, most of which are also exported to
784       * ScheduledThreadPoolExecutor
# Line 890 | Line 888 | public class ThreadPoolExecutor extends
888          }
889  
890          boolean workerStarted = false;
891 +        boolean workerAdded = false;
892          Worker w = null;
893          try {
894              final ReentrantLock mainLock = this.mainLock;
895              w = new Worker(firstTask);
896              final Thread t = w.thread;
897 <            if (t.isAlive()) // precheck that t is startable
898 <                throw new IllegalThreadStateException();
899 <            mainLock.lock();
900 <            try {
901 <                // Recheck while holding lock.
902 <                // Back out on ThreadFactory failure or if
903 <                // shut down before lock acquired.
904 <                int c = ctl.get();
905 <                int rs = runStateOf(c);
906 <
907 <                if (t == null ||
908 <                    (rs >= SHUTDOWN &&
909 <                     ! (rs == SHUTDOWN &&
910 <                        firstTask == null)))
911 <                    return false;
912 <
913 <                workers.add(w);
914 <
915 <                int s = workers.size();
916 <                if (s > largestPoolSize)
917 <                    largestPoolSize = s;
918 <            } finally {
919 <                mainLock.unlock();
897 >            if (t != null) {
898 >                mainLock.lock();
899 >                try {
900 >                    // Recheck while holding lock.
901 >                    // Back out on ThreadFactory failure or if
902 >                    // shut down before lock acquired.
903 >                    int c = ctl.get();
904 >                    int rs = runStateOf(c);
905 >
906 >                    if (rs < SHUTDOWN ||
907 >                        (rs == SHUTDOWN && firstTask == null)) {
908 >                        if (t.isAlive()) // precheck that t is startable
909 >                            throw new IllegalThreadStateException();
910 >                        workers.add(w);
911 >                        int s = workers.size();
912 >                        if (s > largestPoolSize)
913 >                            largestPoolSize = s;
914 >                        workerAdded = true;
915 >                    }
916 >                } finally {
917 >                    mainLock.unlock();
918 >                }
919 >                if (workerAdded) {
920 >                    t.start();
921 >                    workerStarted = true;
922 >                }
923              }
922
923            t.start();
924            // It is possible (but unlikely) for a thread to have been
925            // added to workers, but not yet started, during transition to
926            // STOP, which could result in a rare missed interrupt,
927            // because Thread.interrupt is not guaranteed to have any
928            // effect on a non-yet-started Thread (see Thread#interrupt).
929            if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
930                t.interrupt();
931
932            return workerStarted = true;
924          } finally {
925              if (! workerStarted)
926                  addWorkerFailed(w);
927          }
928 +        return workerStarted;
929      }
930  
931      /**
# Line 1100 | Line 1092 | public class ThreadPoolExecutor extends
1092       * @param w the worker
1093       */
1094      final void runWorker(Worker w) {
1095 +        Thread wt = Thread.currentThread();
1096          Runnable task = w.firstTask;
1097          w.firstTask = null;
1098 +        w.unlock(); // allow interrupts
1099          boolean completedAbruptly = true;
1100          try {
1101              while (task != null || (task = getTask()) != null) {
1102                  w.lock();
1103 <                clearInterruptsForTaskRun();
1103 >                // If pool is stopping, ensure thread is interrupted;
1104 >                // if not, ensure thread is not interrupted.  This
1105 >                // requires a recheck in second case to deal with
1106 >                // shutdownNow race while clearing interrupt
1107 >                if ((runStateAtLeast(ctl.get(), STOP) ||
1108 >                     (Thread.interrupted() &&
1109 >                      runStateAtLeast(ctl.get(), STOP))) &&
1110 >                    !wt.isInterrupted())
1111 >                    wt.interrupt();
1112                  try {
1113 <                    beforeExecute(w.thread, task);
1113 >                    beforeExecute(wt, task);
1114                      Throwable thrown = null;
1115                      try {
1116                          task.run();

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines