ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ThreadPoolExecutor.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ThreadPoolExecutor.java (file contents):
Revision 1.9 by dl, Tue Jul 8 00:46:35 2003 UTC vs.
Revision 1.10 by tim, Thu Jul 31 20:32:00 2003 UTC

# Line 54 | Line 54 | import java.util.*;
54   * corePoolSize and maximumPoolSize the same, you create a fixed-size
55   * thread pool.</dd>
56   *
57 < * <dt>Keep-alive</dt>
57 > * <dt>Keep-alive</dt>
58   *
59   * <dd>The keepAliveTime determines what happens to idle threads.  If
60   * the pool currently has more than the core number of threads, excess
61   * threads will be terminated if they have been idle for more than the
62   * keepAliveTime.</dd>
63   *
64 < * <dt>Queueing</dt>
65 < *
64 > * <dt>Queueing</dt>
65 > *
66   * <dd>You are free to specify the queuing mechanism used to handle
67   * submitted tasks.  A good default is to use queueless synchronous
68   * channels to to hand off work to threads.  This is a safe,
# Line 82 | Line 82 | import java.util.*;
82   * <p>While queuing can be useful in smoothing out transient bursts of
83   * requests, especially in socket-based services, it is not very well
84   * behaved when commands continue to arrive on average faster than
85 < * they can be processed.  
85 > * they can be processed.
86   *
87   * Queue sizes and maximum pool sizes can often be traded off for each
88   * other. Using large queues and small pools minimizes CPU usage, OS
# Line 144 | Line 144 | import java.util.*;
144   public class ThreadPoolExecutor implements ExecutorService {
145      /**
146       * Queue used for holding tasks and handing off to worker threads.
147 <     */
147 >     */
148      private final BlockingQueue<Runnable> workQueue;
149  
150      /**
151       * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and
152       * workers set.
153 <     */
153 >     */
154      private final ReentrantLock mainLock = new ReentrantLock();
155  
156      /**
157       * Wait condition to support awaitTermination
158 <     */
158 >     */
159      private final Condition termination = mainLock.newCondition();
160  
161      /**
162       * Set containing all worker threads in pool.
163 <     */
163 >     */
164      private final Set<Worker> workers = new HashSet<Worker>();
165  
166      /**
167       * Timeout in nanosecods for idle threads waiting for work.
168       * Threads use this timeout only when there are more than
169       * corePoolSize present. Otherwise they wait forever for new work.
170 <     */
170 >     */
171      private volatile long  keepAliveTime;
172  
173      /**
174       * Core pool size, updated only while holding mainLock,
175       * but volatile to allow concurrent readability even
176       * during updates.
177 <     */
177 >     */
178      private volatile int   corePoolSize;
179  
180      /**
181       * Maximum pool size, updated only while holding mainLock
182       * but volatile to allow concurrent readability even
183       * during updates.
184 <     */
184 >     */
185      private volatile int   maximumPoolSize;
186  
187      /**
188       * Current pool size, updated only while holding mainLock
189       * but volatile to allow concurrent readability even
190       * during updates.
191 <     */
191 >     */
192      private volatile int   poolSize;
193  
194      /**
195       * Shutdown status, becomes (and remains) nonzero when shutdown called.
196 <     */
196 >     */
197      private volatile int shutdownStatus;
198  
199      // Special values for status
# Line 202 | Line 202 | public class ThreadPoolExecutor implemen
202      /** Controlled shutdown mode */
203      private static final int SHUTDOWN_WHEN_IDLE = 1;
204      /*8 Immediate shutdown mode */
205 <    private static final int SHUTDOWN_NOW       = 2;        
205 >    private static final int SHUTDOWN_NOW       = 2;
206  
207      /**
208       * Latch that becomes true when all threads terminate after shutdown.
209 <     */
209 >     */
210      private volatile boolean isTerminated;
211  
212      /**
213       * Handler called when saturated or shutdown in execute.
214 <     */
214 >     */
215      private volatile RejectedExecutionHandler handler = defaultHandler;
216  
217      /**
218       * Factory for new threads.
219 <     */
219 >     */
220      private volatile ThreadFactory threadFactory = defaultThreadFactory;
221  
222      /**
223       * Tracks largest attained pool size.
224 <     */
224 >     */
225      private int largestPoolSize;
226  
227      /**
228       * Counter for completed tasks. Updated only on termination of
229       * worker threads.
230 <     */
230 >     */
231      private long completedTaskCount;
232  
233      /**
234 <     * The default thread facotry
234 >     * The default thread facotry
235       */
236 <    private static final ThreadFactory defaultThreadFactory =
236 >    private static final ThreadFactory defaultThreadFactory =
237          new ThreadFactory() {
238              public Thread newThread(Runnable r) {
239                  return new Thread(r);
# Line 243 | Line 243 | public class ThreadPoolExecutor implemen
243      /**
244       * The default rejectect execution handler
245       */
246 <    private static final RejectedExecutionHandler defaultHandler =
246 >    private static final RejectedExecutionHandler defaultHandler =
247          new AbortPolicy();
248  
249      /**
# Line 275 | Line 275 | public class ThreadPoolExecutor implemen
275          Thread t = null;
276          mainLock.lock();
277          try {
278 <            if (poolSize < corePoolSize)
278 >            if (poolSize < corePoolSize)
279                  t = addThread(firstTask);
280          }
281          finally {
# Line 345 | Line 345 | public class ThreadPoolExecutor implemen
345  
346      /**
347       * Perform bookkeeping for a terminated worker thread.
348 <     * @param w the worker
348 >     * @param w the worker
349       */
350      private void workerDone(Worker w) {
351          boolean allDone = false;
# Line 354 | Line 354 | public class ThreadPoolExecutor implemen
354              completedTaskCount += w.completedTasks;
355              workers.remove(w);
356  
357 <            if (--poolSize > 0)
357 >            if (--poolSize > 0)
358                  return;
359  
360              // If this was last thread, deal with potential shutdown
361              int stat = shutdownStatus;
362 <            
362 >
363              // If there are queued tasks but no threads, create replacement.
364              if (stat != SHUTDOWN_NOW) {
365                  Runnable r = workQueue.poll();
# Line 370 | Line 370 | public class ThreadPoolExecutor implemen
370              }
371  
372              // if no tasks and not shutdown, can exit without replacement
373 <            if (stat == NOT_SHUTDOWN)
373 >            if (stat == NOT_SHUTDOWN)
374                  return;
375  
376              allDone = true;
# Line 386 | Line 386 | public class ThreadPoolExecutor implemen
386      }
387  
388      /**
389 <     *  Worker threads
389 >     *  Worker threads
390       */
391      private class Worker implements Runnable {
392  
# Line 425 | Line 425 | public class ThreadPoolExecutor implemen
425  
426          /**
427           * Interrupt thread if not running a task
428 <         */
428 >         */
429          void interruptIfIdle() {
430              if (runLock.tryLock()) {
431                  try {
# Line 439 | Line 439 | public class ThreadPoolExecutor implemen
439  
440          /**
441           * Cause thread to die even if running a task.
442 <         */
442 >         */
443          void interruptNow() {
444              thread.interrupt();
445          }
# Line 498 | Line 498 | public class ThreadPoolExecutor implemen
498                      task = null; // unnecessary but can help GC
499                  }
500              }
501 <            catch(InterruptedException ie) {
501 >            catch(InterruptedException ie) {
502                  // fall through
503              }
504              finally {
# Line 534 | Line 534 | public class ThreadPoolExecutor implemen
534                                long keepAliveTime,
535                                TimeUnit unit,
536                                BlockingQueue<Runnable> workQueue) {
537 <        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
537 >        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
538               defaultThreadFactory, defaultHandler);
539      }
540  
# Line 555 | Line 555 | public class ThreadPoolExecutor implemen
555       * are executed. This queue will hold only the <tt>Runnable</tt>
556       * tasks submitted by the <tt>execute</tt> method.
557       * @param threadFactory the factory to use when the executor
558 <     * creates a new thread.
558 >     * creates a new thread.
559       * @throws IllegalArgumentException if corePoolSize, or
560       * keepAliveTime less than zero, or if maximumPoolSize less than or
561       * equal to zero, or if corePoolSize greater than maximumPoolSize.
562 <     * @throws NullPointerException if <tt>workQueue</tt>
562 >     * @throws NullPointerException if <tt>workQueue</tt>
563       * or <tt>threadFactory</tt> are null.
564       */
565      public ThreadPoolExecutor(int corePoolSize,
# Line 569 | Line 569 | public class ThreadPoolExecutor implemen
569                                BlockingQueue<Runnable> workQueue,
570                                ThreadFactory threadFactory) {
571  
572 <        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
572 >        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
573               threadFactory, defaultHandler);
574      }
575  
# Line 594 | Line 594 | public class ThreadPoolExecutor implemen
594       * @throws IllegalArgumentException if corePoolSize, or
595       * keepAliveTime less than zero, or if maximumPoolSize less than or
596       * equal to zero, or if corePoolSize greater than maximumPoolSize.
597 <     * @throws NullPointerException if <tt>workQueue</tt>
597 >     * @throws NullPointerException if <tt>workQueue</tt>
598       * or  <tt>handler</tt> are null.
599       */
600      public ThreadPoolExecutor(int corePoolSize,
# Line 603 | Line 603 | public class ThreadPoolExecutor implemen
603                                TimeUnit unit,
604                                BlockingQueue<Runnable> workQueue,
605                                RejectedExecutionHandler handler) {
606 <        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
606 >        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
607               defaultThreadFactory, handler);
608      }
609  
# Line 624 | Line 624 | public class ThreadPoolExecutor implemen
624       * are executed. This queue will hold only the <tt>Runnable</tt>
625       * tasks submitted by the <tt>execute</tt> method.
626       * @param threadFactory the factory to use when the executor
627 <     * creates a new thread.
627 >     * creates a new thread.
628       * @param handler the handler to use when execution is blocked
629       * because the thread bounds and queue capacities are reached.
630       * @throws IllegalArgumentException if corePoolSize, or
631       * keepAliveTime less than zero, or if maximumPoolSize less than or
632       * equal to zero, or if corePoolSize greater than maximumPoolSize.
633 <     * @throws NullPointerException if <tt>workQueue</tt>
633 >     * @throws NullPointerException if <tt>workQueue</tt>
634       * or <tt>threadFactory</tt> or <tt>handler</tt> are null.
635       */
636      public ThreadPoolExecutor(int corePoolSize,
# Line 640 | Line 640 | public class ThreadPoolExecutor implemen
640                                BlockingQueue<Runnable> workQueue,
641                                ThreadFactory threadFactory,
642                                RejectedExecutionHandler handler) {
643 <        if (corePoolSize < 0 ||
643 >        if (corePoolSize < 0 ||
644              maximumPoolSize <= 0 ||
645 <            maximumPoolSize < corePoolSize ||
645 >            maximumPoolSize < corePoolSize ||
646              keepAliveTime < 0)
647              throw new IllegalArgumentException();
648          if (workQueue == null || threadFactory == null || handler == null)
# Line 662 | Line 662 | public class ThreadPoolExecutor implemen
662       *
663       * If the task cannot be submitted for execution, either because this
664       * executor has been shutdown or because its capacity has been reached,
665 <     * the task is handled by the current <tt>RejectedExecutionHandler</tt>.  
665 >     * the task is handled by the current <tt>RejectedExecutionHandler</tt>.
666       *
667       * @param command the task to execute
668       * @throws RejectedExecutionException at discretion of
669       * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
670       * for execution
671       */
672 <    public void execute(Runnable command) {
672 >    public void execute(Runnable command) {
673          for (;;) {
674              if (shutdownStatus != NOT_SHUTDOWN) {
675                  handler.rejectedExecution(command, this);
# Line 735 | Line 735 | public class ThreadPoolExecutor implemen
735              mainLock.unlock();
736          }
737      }
738 <    
738 >
739      /**
740       * Sets the thread factory used to create new threads.
741       *
# Line 787 | Line 787 | public class ThreadPoolExecutor implemen
787       * Removes this task from internal queue if it is present, thus
788       * causing it not to be run if it has not already started.  This
789       * method may be useful as one part of a cancellation scheme.
790 <     *
790 >     *
791       * @param task the task to remove
792       * @return true if the task was removed
793       */
# Line 797 | Line 797 | public class ThreadPoolExecutor implemen
797  
798  
799      /**
800 <     * Removes from the work queue all {@ link Cancellable} tasks
800 >     * Removes from the work queue all {@link Cancellable} tasks
801       * that have been cancelled. This method can be useful as a
802       * storage reclamation operation, that has no other impact
803       * on functionality. Cancelled tasks are never executed, but
# Line 812 | Line 812 | public class ThreadPoolExecutor implemen
812              Runnable r = it.next();
813              if (r instanceof Cancellable) {
814                  Cancellable c = (Cancellable)r;
815 <                if (c.isCancelled())
815 >                if (c.isCancelled())
816                      it.remove();
817              }
818          }
# Line 825 | Line 825 | public class ThreadPoolExecutor implemen
825       * they next become idle.
826       *
827       * @param corePoolSize the new core size
828 <     * @throws IllegalArgumentException if <tt>corePoolSize</tt>
828 >     * @throws IllegalArgumentException if <tt>corePoolSize</tt>
829       * less than zero
830       */
831      public void setCorePoolSize(int corePoolSize) {
# Line 837 | Line 837 | public class ThreadPoolExecutor implemen
837              this.corePoolSize = corePoolSize;
838              if (extra > 0 && poolSize > corePoolSize) {
839                  Iterator<Worker> it = workers.iterator();
840 <                while (it.hasNext() &&
841 <                       extra > 0 &&
840 >                while (it.hasNext() &&
841 >                       extra > 0 &&
842                         poolSize > corePoolSize &&
843                         workQueue.remainingCapacity() == 0) {
844                      it.next().interruptIfIdle();
845                      --extra;
846                  }
847              }
848 <                
848 >
849          }
850          finally {
851              mainLock.unlock();
# Line 857 | Line 857 | public class ThreadPoolExecutor implemen
857       *
858       * @return the core number of threads
859       */
860 <    public int getCorePoolSize() {
860 >    public int getCorePoolSize() {
861          return corePoolSize;
862      }
863  
# Line 880 | Line 880 | public class ThreadPoolExecutor implemen
880              this.maximumPoolSize = maximumPoolSize;
881              if (extra > 0 && poolSize > maximumPoolSize) {
882                  Iterator<Worker> it = workers.iterator();
883 <                while (it.hasNext() &&
884 <                       extra > 0 &&
883 >                while (it.hasNext() &&
884 >                       extra > 0 &&
885                         poolSize > maximumPoolSize) {
886                      it.next().interruptIfIdle();
887                      --extra;
# Line 898 | Line 898 | public class ThreadPoolExecutor implemen
898       *
899       * @return the maximum allowed number of threads
900       */
901 <    public int getMaximumPoolSize() {
901 >    public int getMaximumPoolSize() {
902          return maximumPoolSize;
903      }
904  
# Line 922 | Line 922 | public class ThreadPoolExecutor implemen
922      /**
923       * Returns the thread keep-alive time, which is the amount of time
924       * which threads in excess of the core pool size may remain
925 <     * idle before being terminated.
925 >     * idle before being terminated.
926       *
927       * @param unit the desired time unit of the result
928       * @return the time limit
929       */
930 <    public long getKeepAliveTime(TimeUnit unit) {
930 >    public long getKeepAliveTime(TimeUnit unit) {
931          return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
932      }
933  
# Line 938 | Line 938 | public class ThreadPoolExecutor implemen
938       *
939       * @return the number of threads
940       */
941 <    public int getPoolSize() {
941 >    public int getPoolSize() {
942          return poolSize;
943      }
944  
# Line 948 | Line 948 | public class ThreadPoolExecutor implemen
948       *
949       * @return the number of threads
950       */
951 <    public int getActiveCount() {
951 >    public int getActiveCount() {
952          mainLock.lock();
953          try {
954              int n = 0;
# Line 969 | Line 969 | public class ThreadPoolExecutor implemen
969       *
970       * @return the number of threads
971       */
972 <    public int getLargestPoolSize() {
972 >    public int getLargestPoolSize() {
973          mainLock.lock();
974          try {
975              return largestPoolSize;
# Line 987 | Line 987 | public class ThreadPoolExecutor implemen
987       *
988       * @return the number of tasks
989       */
990 <    public long getTaskCount() {
990 >    public long getTaskCount() {
991          mainLock.lock();
992          try {
993              long n = completedTaskCount;
# Line 1012 | Line 1012 | public class ThreadPoolExecutor implemen
1012       *
1013       * @return the number of tasks
1014       */
1015 <    public long getCompletedTaskCount() {
1015 >    public long getCompletedTaskCount() {
1016          mainLock.lock();
1017          try {
1018              long n = completedTaskCount;
1019 <            for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
1019 >            for (Iterator<Worker> it = workers.iterator(); it.hasNext(); )
1020                  n += it.next().completedTasks;
1021              return n;
1022          }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines