ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ThreadPoolExecutor.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ThreadPoolExecutor.java (file contents):
Revision 1.15 by dl, Sat Aug 9 19:55:30 2003 UTC vs.
Revision 1.16 by dl, Thu Aug 14 15:34:04 2003 UTC

# Line 192 | Line 192 | public class ThreadPoolExecutor implemen
192      private volatile int   poolSize;
193  
194      /**
195 <     * Shutdown status, becomes (and remains) nonzero when shutdown called.
195 >     * Lifecycle state
196       */
197 <    private volatile int shutdownStatus;
197 >    private volatile int runState;
198  
199 <    // Special values for status
199 >    // Special values for runState
200      /** Normal, not-shutdown mode */
201 <    private static final int NOT_SHUTDOWN       = 0;
201 >    private static final int RUNNING    = 0;
202      /** Controlled shutdown mode */
203 <    private static final int SHUTDOWN_WHEN_IDLE = 1;
204 <    /*8 Immediate shutdown mode */
205 <    private static final int SHUTDOWN_NOW       = 2;
206 <
207 <    /**
208 <     * Latch that becomes true when all threads terminate after shutdown.
209 <     */
210 <    private volatile boolean isTerminated;
203 >    private static final int SHUTDOWN   = 1;
204 >    /** Immediate shutdown mode */
205 >    private static final int STOP       = 2;
206 >    /** Final state */
207 >    private static final int TERMINATED = 3;
208  
209      /**
210       * Handler called when saturated or shutdown in execute.
# Line 231 | Line 228 | public class ThreadPoolExecutor implemen
228      private long completedTaskCount;
229  
230      /**
231 <     * The default thread facotry
231 >     * The default thread factory
232       */
233      private static final ThreadFactory defaultThreadFactory =
234          new ThreadFactory() {
# Line 253 | Line 250 | public class ThreadPoolExecutor implemen
250          handler.rejectedExecution(command, this);
251      }
252  
256
253      /**
254       * Create and return a new thread running firstTask as its first
255       * task. Call only while holding mainLock
# Line 272 | Line 268 | public class ThreadPoolExecutor implemen
268          return t;
269      }
270  
271 <    // addIfUnderCorePoolSize is non-private; accessible to ScheduledExecutor
271 >
272  
273      /**
274       * Create and start a new thread running firstTask as its first
# Line 281 | Line 277 | public class ThreadPoolExecutor implemen
277       * null if none)
278       * @return true if successful.
279       */
280 <    boolean addIfUnderCorePoolSize(Runnable firstTask) {
280 >    private boolean addIfUnderCorePoolSize(Runnable firstTask) {
281          Thread t = null;
282          mainLock.lock();
283          try {
# Line 332 | Line 328 | public class ThreadPoolExecutor implemen
328       */
329      private Runnable getTask() throws InterruptedException {
330          for (;;) {
331 <            int stat = shutdownStatus;
332 <            if (stat == SHUTDOWN_NOW)
333 <                return null;
334 <            if (stat == SHUTDOWN_WHEN_IDLE) // help drain queue before dying
335 <                return workQueue.poll();
336 <            if (poolSize <= corePoolSize)   // untimed wait if core
337 <                return workQueue.take();
338 <            long timeout = keepAliveTime;
339 <            if (timeout <= 0) // must die immediately for 0 timeout
340 <                return null;
341 <            Runnable task =  workQueue.poll(timeout, TimeUnit.NANOSECONDS);
342 <            if (task != null)
343 <                return task;
344 <            if (poolSize > corePoolSize) // timed out
331 >            switch(runState) {
332 >            case RUNNING: {
333 >                if (poolSize <= corePoolSize)   // untimed wait if core
334 >                    return workQueue.take();
335 >                
336 >                long timeout = keepAliveTime;
337 >                if (timeout <= 0) // die immediately for 0 timeout
338 >                    return null;
339 >                Runnable r =  workQueue.poll(timeout, TimeUnit.NANOSECONDS);
340 >                if (r != null)
341 >                    return r;
342 >                if (poolSize > corePoolSize) // timed out
343 >                    return null;
344 >                // else, after timeout, pool shrank so shouldn't die, so retry
345 >                break;
346 >            }
347 >
348 >            case SHUTDOWN: {
349 >                // Help drain queue
350 >                Runnable r = workQueue.poll();
351 >                if (r != null)
352 >                    return r;
353 >                    
354 >                // Check if can terminate
355 >                if (workQueue.isEmpty()) {
356 >                    interruptIdleWorkers();
357 >                    return null;
358 >                }
359 >
360 >                // There could still be delayed tasks in queue.
361 >                // Wait for one, re-checking state upon interruption
362 >                try {
363 >                    return workQueue.take();
364 >                }
365 >                catch(InterruptedException ignore) {
366 >                }
367 >                break;
368 >            }
369 >
370 >            case STOP:
371                  return null;
372 <            // else, after timeout, pool shrank so shouldn't die, so retry
372 >            default:
373 >                assert false;
374 >            }
375 >        }
376 >    }
377 >
378 >    /**
379 >     * Wake up all threads that might be waiting for tasks.
380 >     */
381 >    void interruptIdleWorkers() {
382 >        mainLock.lock();
383 >        try {
384 >            for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
385 >                it.next().interruptIfIdle();
386 >        } finally {
387 >            mainLock.unlock();
388          }
389      }
390  
# Line 356 | Line 393 | public class ThreadPoolExecutor implemen
393       * @param w the worker
394       */
395      private void workerDone(Worker w) {
359        boolean allDone = false;
396          mainLock.lock();
397          try {
398              completedTaskCount += w.completedTasks;
399              workers.remove(w);
364
400              if (--poolSize > 0)
401                  return;
402  
403 <            // If this was last thread, deal with potential shutdown
404 <            int stat = shutdownStatus;
403 >            // Else, this is the last thread. Deal with potential shutdown.
404 >
405 >            int state = runState;
406 >            assert state != TERMINATED;
407  
408 <            // If there are queued tasks but no threads, create replacement.
409 <            if (stat != SHUTDOWN_NOW) {
408 >            if (state != STOP) {
409 >                // If there are queued tasks but no threads, create
410 >                // replacement.
411                  Runnable r = workQueue.poll();
412                  if (r != null) {
413                      addThread(r).start();
414                      return;
415                  }
378            }
416  
417 <            // if no tasks and not shutdown, can exit without replacement
418 <            if (stat == NOT_SHUTDOWN)
419 <                return;
417 >                // If there are some (presumably delayed) tasks but
418 >                // none pollable, create an idle replacement to wait.
419 >                if (!workQueue.isEmpty()) {
420 >                    addThread(null).start();
421 >                    return;
422 >                }
423 >
424 >                // Otherwise, we can exit without replacement
425 >                if (state == RUNNING)
426 >                    return;
427 >            }
428  
429 <            allDone = true;
430 <            isTerminated = true;
429 >            // Either state is STOP, or state is SHUTDOWN and there is
430 >            // no work to do. So we can terminate.
431 >            runState = TERMINATED;
432              termination.signalAll();
433 +            // fall through to call terminate() outside of lock.
434          } finally {
435              mainLock.unlock();
436          }
437  
438 <        if (allDone) // call outside lock
439 <            terminated();
438 >        assert runState == TERMINATED;
439 >        terminated();
440      }
441  
442      /**
# Line 458 | Line 505 | public class ThreadPoolExecutor implemen
505              try {
506                  // Abort now if immediate cancel.  Otherwise, we have
507                  // committed to run this task.
508 <                if (shutdownStatus == SHUTDOWN_NOW)
508 >                if (runState == STOP)
509                      return;
510  
511                  Thread.interrupted(); // clear interrupt status on entry
# Line 672 | Line 719 | public class ThreadPoolExecutor implemen
719       */
720      public void execute(Runnable command) {
721          for (;;) {
722 <            if (shutdownStatus != NOT_SHUTDOWN) {
722 >            if (runState != RUNNING) {
723                  reject(command);
724                  return;
725              }
# Line 694 | Line 741 | public class ThreadPoolExecutor implemen
741      public void shutdown() {
742          mainLock.lock();
743          try {
744 <            if (shutdownStatus == NOT_SHUTDOWN) // don't override shutdownNow
745 <                shutdownStatus = SHUTDOWN_WHEN_IDLE;
699 <
744 >            if (runState == RUNNING) // don't override shutdownNow
745 >                runState = SHUTDOWN;
746              for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
747                  it.next().interruptIfIdle();
748          } finally {
# Line 704 | Line 750 | public class ThreadPoolExecutor implemen
750          }
751      }
752  
753 +
754      public List shutdownNow() {
755          mainLock.lock();
756          try {
757 <            shutdownStatus = SHUTDOWN_NOW;
757 >            if (runState != TERMINATED)
758 >                runState = STOP;
759              for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
760                  it.next().interruptNow();
761          } finally {
# Line 717 | Line 765 | public class ThreadPoolExecutor implemen
765      }
766  
767      public boolean isShutdown() {
768 <        return shutdownStatus != NOT_SHUTDOWN;
768 >        return runState != RUNNING;
769 >    }
770 >
771 >    /**
772 >     * Return true if this executor is in the process of terminating
773 >     * after <tt>shutdown</tt> or <tt>shutdownNow</tt> but has not
774 >     * completely terminated.  This method may be useful for
775 >     * debugging. A return of <tt>true</tt> reported a sufficient
776 >     * period after shutdown may indicate that submitted tasks have
777 >     * ignored or suppressed interruption, causing this executor not
778 >     * to properly terminate.
779 >     * @return true if terminating but not yet terminated.
780 >     */
781 >    public boolean isTerminating() {
782 >        return runState == STOP;
783      }
784  
785      public boolean isTerminated() {
786 <        return isTerminated;
786 >        return runState == TERMINATED;
787      }
788  
789      public boolean awaitTermination(long timeout, TimeUnit unit)
# Line 807 | Line 869 | public class ThreadPoolExecutor implemen
869  
870  
871      /**
872 <     * Removes from the work queue all {@link Cancellable} tasks
873 <     * that have been cancelled. This method can be useful as a
874 <     * storage reclamation operation, that has no other impact
875 <     * on functionality. Cancelled tasks are never executed, but
876 <     * may accumulate in work queues until worker threads can
877 <     * actively remove them. Invoking this method ensures that they
878 <     * are instead removed now.
872 >     * Tries to remove from the work queue all {@link Cancellable}
873 >     * tasks that have been cancelled. This method can be useful as a
874 >     * storage reclamation operation, that has no other impact on
875 >     * functionality. Cancelled tasks are never executed, but may
876 >     * accumulate in work queues until worker threads can actively
877 >     * remove them. Invoking this method instead tries to remove them now.
878 >     * However, this method may fail to remove all such tasks in
879 >     * the presence of interference by other threads.
880       */
881  
882      public void purge() {
883 <        Iterator<Runnable> it = getQueue().iterator();
884 <        while (it.hasNext()) {
885 <            Runnable r = it.next();
886 <            if (r instanceof Cancellable) {
887 <                Cancellable c = (Cancellable)r;
888 <                if (c.isCancelled())
889 <                    it.remove();
883 >        // Fail if we encounter interference during traversal
884 >        try {
885 >            Iterator<Runnable> it = getQueue().iterator();
886 >            while (it.hasNext()) {
887 >                Runnable r = it.next();
888 >                if (r instanceof Cancellable) {
889 >                    Cancellable c = (Cancellable)r;
890 >                    if (c.isCancelled())
891 >                        it.remove();
892 >                }
893              }
894          }
895 +        catch(ConcurrentModificationException ex) {
896 +            return;
897 +        }
898      }
899  
900      /**
# Line 873 | Line 942 | public class ThreadPoolExecutor implemen
942      }
943  
944      /**
945 +     * Start a core thread, causing it to idly wait for work. This
946 +     * overrides the default policy of starting core threads only when
947 +     * new tasks are executed. This method will return <tt>false</tt>
948 +     * if all core threads have already been started.
949 +     * @return true if a thread was started
950 +     */
951 +    public boolean prestartCoreThread() {
952 +        return addIfUnderCorePoolSize(null);
953 +    }
954 +
955 +    /**
956 +     * Start all core threads, causing them to idly wait for work. This
957 +     * overrides the default policy of starting core threads only when
958 +     * new tasks are executed.
959 +     * @return the number of threads started.
960 +     */
961 +    public int prestartAllCoreThreads() {
962 +        int n = 0;
963 +        while (addIfUnderCorePoolSize(null))
964 +            ++n;
965 +        return n;
966 +    }
967 +
968 +    /**
969       * Sets the maximum allowed number of threads. This overrides any
970       * value set in the constructor. If the new value is smaller than
971       * the current value, excess existing threads will be

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines