ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java (file contents):
Revision 1.36 by dl, Wed Apr 19 15:08:04 2006 UTC vs.
Revision 1.37 by dl, Tue Aug 29 23:30:36 2006 UTC

# Line 26 | Line 26 | import java.util.*;
26   * of the inherited tuning methods are not useful for it. In
27   * particular, because it acts as a fixed-sized pool using
28   * <tt>corePoolSize</tt> threads and an unbounded queue, adjustments
29 < * to <tt>maximumPoolSize</tt> have no useful effect.
29 > * to <tt>maximumPoolSize</tt> have no useful effect. Additionally, it
30 > * is almost never a good idea to set <tt>corePoolSize</tt> to zero or
31 > * use <tt>allowCoreThreadTimeOut</tt> because this may leave the pool
32 > * without threads to handle tasks once they become eligible to run.
33   *
34   * <p><b>Extension notes:</b> This class overrides {@link
35   * AbstractExecutorService} <tt>submit</tt> methods to generate
# Line 68 | Line 71 | public class ScheduledThreadPoolExecutor
71          extends ThreadPoolExecutor
72          implements ScheduledExecutorService {
73  
74 +    /*
75 +     * This class specializes ThreadPoolExecutor implementation by
76 +     *
77 +     * 1. Using a custom task type, ScheduledFutureTask for
78 +     *    tasks, even those that don't require scheduling (i.e.,
79 +     *    those submitted using ExecutorService execute, not
80 +     *    ScheduledExecutorService methods) which are treated as
81 +     *    delayed tasks with a delay of zero.
82 +     *
83 +     * 2. Using a custom queue (DelayedWorkQueue) based on an
84 +     *    unbounded DelayQueue. The lack of capacity constraint and
85 +     *    the fact that corePoolSize and maximumPoolSize are
86 +     *    effectively identical simplifies some execution mechanics
87 +     *    (see delayedExecute) compared to ThreadPoolExecutor
88 +     *    version.
89 +     *
90 +     *    The DelayedWorkQueue class is defined below for the sake of
91 +     *    ensuring that all elements are instances of
92 +     *    RunnableScheduledFuture.  Since DelayQueue otherwise
93 +     *    requires type be Delayed, but not necessarily Runnable, and
94 +     *    the workQueue requires the opposite, we need to explicitly
95 +     *    define a class that requires both to ensure that users don't
96 +     *    add objects that aren't RunnableScheduledFutures via
97 +     *    getQueue().add() etc.
98 +     *
99 +     * 3. Supporting optional run-after-shutdown parameters, which
100 +     *    leads to overrides of shutdown methods to remove and cancel
101 +     *    tasks that should NOT be run after shutdown, as well as
102 +     *    different recheck logic when task (re)submission overlaps
103 +     *    with a shutdown.
104 +     *
105 +     * 4. Task decoration methods to allow interception and
106 +     *    instrumentation, which are needed because subclasses cannot
107 +     *    otherwise override submit methods to get this effect. These
108 +     *    don't have any impact on pool control logic though.
109 +     */
110 +
111      /**
112       * False if should cancel/suppress periodic tasks on shutdown.
113       */
# Line 174 | Line 214 | public class ScheduledThreadPoolExecutor
214          }
215  
216          /**
217 <         * Runs a periodic task.
217 >         * Sets the next time to run for a periodic task
218           */
219 <        private void runPeriodic() {
220 <            boolean ok = ScheduledFutureTask.super.runAndReset();
221 <            boolean down = isShutdown();
222 <            // Reschedule if not cancelled and not shutdown or policy allows
223 <            if (ok && (!down ||
224 <                       (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
185 <                        !isTerminating()))) {
186 <                long p = period;
187 <                if (p > 0)
188 <                    time += p;
189 <                else
190 <                    time = now() - p;
191 <                ScheduledThreadPoolExecutor.super.getQueue().add(this);
192 <            }
193 <            // This might have been the final executed delayed
194 <            // task.  Wake up threads to check.
195 <            else if (down)
196 <                interruptIdleWorkers();
219 >        private void setNextRunTime() {
220 >            long p = period;
221 >            if (p > 0)
222 >                time += p;
223 >            else
224 >                time = now() - p;
225          }
226  
227          /**
228           * Overrides FutureTask version so as to reset/requeue if periodic.
229           */
230          public void run() {
231 <            if (isPeriodic())
232 <                runPeriodic();
233 <            else
231 >            boolean periodic = isPeriodic();
232 >            if (!canRunInCurrentRunState(periodic))
233 >                cancel(false);
234 >            else if (!periodic)
235                  ScheduledFutureTask.super.run();
236 +            else if (ScheduledFutureTask.super.runAndReset()) {
237 +                setNextRunTime();
238 +                reExecutePeriodic(this);
239 +            }
240          }
241      }
242  
243      /**
244 <     * Specialized variant of ThreadPoolExecutor.execute for delayed tasks.
245 <     */
246 <    private void delayedExecute(Runnable command) {
247 <        if (isShutdown()) {
248 <            reject(command);
249 <            return;
250 <        }
251 <        // Prestart a thread if necessary. We cannot prestart it
252 <        // running the task because the task (probably) shouldn't be
253 <        // run yet, so thread will just idle until delay elapses.
254 <        if (getPoolSize() < getCorePoolSize())
244 >     * Returns true if can run a task given current run state
245 >     * and run-after-shutdown parameters
246 >     * @param periodic true if this task periodic, false if delayed
247 >     */
248 >    boolean canRunInCurrentRunState(boolean periodic) {
249 >        return isRunningOrShutdown(periodic?
250 >                                   continueExistingPeriodicTasksAfterShutdown :
251 >                                   executeExistingDelayedTasksAfterShutdown);
252 >    }
253 >
254 >    /**
255 >     * Main execution method for delayed or periodic tasks.  If pool
256 >     * is shut down, rejects the task. Otherwise adds task to queue
257 >     * and starts a thread, if necessary, to run it.  (We cannot
258 >     * prestart the thread to run the task because the task (probably)
259 >     * shouldn't be run yet,) If the pool is shut down while the task
260 >     * is being added, cancel and remove it if required by state and
261 >     * run-after-shutdown parameters
262 >     * @param task the task
263 >     */
264 >    private void delayedExecute(RunnableScheduledFuture<?> task) {
265 >        if (isShutdown())
266 >            reject(task);
267 >        else {
268 >            super.getQueue().add(task);
269 >            if (isShutdown() &&
270 >                !canRunInCurrentRunState(task.isPeriodic()) &&
271 >                remove(task))
272 >                task.cancel(false);
273              prestartCoreThread();
274 +        }
275 +    }
276  
277 <        super.getQueue().add(command);
277 >    /**
278 >     * Requeues a periodic task unless current run state precludes
279 >     * it. Same idea as delayedExecute except drops task rather than
280 >     * rejecting.
281 >     * @param task the task
282 >     */
283 >    void reExecutePeriodic(RunnableScheduledFuture<?> task) {
284 >        if (canRunInCurrentRunState(true)) {
285 >            super.getQueue().add(task);
286 >            if (!canRunInCurrentRunState(true) && remove(task))
287 >                task.cancel(false);
288 >            prestartCoreThread();
289 >        }
290      }
291  
292      /**
293       * Cancels and clears the queue of all tasks that should not be run
294 <     * due to shutdown policy.
294 >     * due to shutdown policy. Invoked within super.shutdown.
295       */
296 <    private void cancelUnwantedTasks() {
297 <        boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
298 <        boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
296 >    @Override void onShutdown() {
297 >        BlockingQueue<Runnable> q = super.getQueue();
298 >        boolean keepDelayed =
299 >            getExecuteExistingDelayedTasksAfterShutdownPolicy();
300 >        boolean keepPeriodic =
301 >            getContinueExistingPeriodicTasksAfterShutdownPolicy();
302          if (!keepDelayed && !keepPeriodic)
303 <            super.getQueue().clear();
304 <        else if (keepDelayed || keepPeriodic) {
305 <            Object[] entries = super.getQueue().toArray();
303 >            q.clear();
304 >        else {
305 >            // Traverse snapshot to avoid iterator exceptions
306 >            Object[] entries = q.toArray();
307              for (int i = 0; i < entries.length; ++i) {
308                  Object e = entries[i];
309                  if (e instanceof RunnableScheduledFuture) {
310 <                    RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
311 <                    if (t.isPeriodic()? !keepPeriodic : !keepDelayed)
312 <                        t.cancel(false);
310 >                    RunnableScheduledFuture<?> t =
311 >                        (RunnableScheduledFuture<?>)e;
312 >                    if ((t.isPeriodic()? !keepPeriodic : !keepDelayed) ||
313 >                        t.isCancelled()) { // also remove if already cancelled
314 >                        if (q.remove(t))
315 >                            t.cancel(false);
316 >                    }
317                  }
318              }
246            entries = null;
247            purge();
319          }
320      }
321  
251    public boolean remove(Runnable task) {
252        if (!(task instanceof RunnableScheduledFuture))
253            return false;
254        return getQueue().remove(task);
255    }
256
322      /**
323       * Modifies or replaces the task used to execute a runnable.
324       * This method can be used to override the concrete
# Line 291 | Line 356 | public class ScheduledThreadPoolExecutor
356       * pool size.
357       *
358       * @param corePoolSize the number of threads to keep in the pool,
359 <     * even if they are idle
359 >     * even if they are idle, unless allowCoreThreadTimeOut is set
360       * @throws IllegalArgumentException if <tt>corePoolSize &lt; 0</tt>
361       */
362      public ScheduledThreadPoolExecutor(int corePoolSize) {
# Line 304 | Line 369 | public class ScheduledThreadPoolExecutor
369       * initial parameters.
370       *
371       * @param corePoolSize the number of threads to keep in the pool,
372 <     * even if they are idle
372 >     * even if they are idle, unless allowCoreThreadTimeOut is set
373       * @param threadFactory the factory to use when the executor
374       * creates a new thread
375       * @throws IllegalArgumentException if <tt>corePoolSize &lt; 0</tt>
# Line 321 | Line 386 | public class ScheduledThreadPoolExecutor
386       * initial parameters.
387       *
388       * @param corePoolSize the number of threads to keep in the pool,
389 <     * even if they are idle
389 >     * even if they are idle, unless allowCoreThreadTimeOut is set
390       * @param handler the handler to use when execution is blocked
391       * because the thread bounds and queue capacities are reached
392       * @throws IllegalArgumentException if <tt>corePoolSize &lt; 0</tt>
# Line 338 | Line 403 | public class ScheduledThreadPoolExecutor
403       * initial parameters.
404       *
405       * @param corePoolSize the number of threads to keep in the pool,
406 <     * even if they are idle
406 >     * even if they are idle, unless allowCoreThreadTimeOut is set
407       * @param threadFactory the factory to use when the executor
408       * creates a new thread
409       * @param handler the handler to use when execution is blocked
# Line 353 | Line 418 | public class ScheduledThreadPoolExecutor
418                new DelayedWorkQueue(), threadFactory, handler);
419      }
420  
421 +    /**
422 +     * @throws RejectedExecutionException {@inheritDoc}
423 +     * @throws NullPointerException       {@inheritDoc}
424 +     */
425      public ScheduledFuture<?> schedule(Runnable command,
426                                         long delay,
427                                         TimeUnit unit) {
428          if (command == null || unit == null)
429              throw new NullPointerException();
430 +        if (delay < 0) delay = 0;
431          long triggerTime = now() + unit.toNanos(delay);
432          RunnableScheduledFuture<?> t = decorateTask(command,
433              new ScheduledFutureTask<Boolean>(command, null, triggerTime));
# Line 365 | Line 435 | public class ScheduledThreadPoolExecutor
435          return t;
436      }
437  
438 +    /**
439 +     * @throws RejectedExecutionException {@inheritDoc}
440 +     * @throws NullPointerException       {@inheritDoc}
441 +     */
442      public <V> ScheduledFuture<V> schedule(Callable<V> callable,
443                                             long delay,
444                                             TimeUnit unit) {
# Line 378 | Line 452 | public class ScheduledThreadPoolExecutor
452          return t;
453      }
454  
455 +    /**
456 +     * @throws RejectedExecutionException {@inheritDoc}
457 +     * @throws NullPointerException       {@inheritDoc}
458 +     * @throws IllegalArgumentException   {@inheritDoc}
459 +     */
460      public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
461                                                    long initialDelay,
462                                                    long period,
# Line 397 | Line 476 | public class ScheduledThreadPoolExecutor
476          return t;
477      }
478  
479 +    /**
480 +     * @throws RejectedExecutionException {@inheritDoc}
481 +     * @throws NullPointerException       {@inheritDoc}
482 +     * @throws IllegalArgumentException   {@inheritDoc}
483 +     */
484      public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
485                                                       long initialDelay,
486                                                       long delay,
# Line 416 | Line 500 | public class ScheduledThreadPoolExecutor
500          return t;
501      }
502  
419
503      /**
504       * Executes command with zero required delay. This has effect
505       * equivalent to <tt>schedule(command, 0, anyUnit)</tt>.  Note
# Line 438 | Line 521 | public class ScheduledThreadPoolExecutor
521  
522      // Override AbstractExecutorService methods
523  
524 +    /**
525 +     * @throws RejectedExecutionException {@inheritDoc}
526 +     * @throws NullPointerException       {@inheritDoc}
527 +     */
528      public Future<?> submit(Runnable task) {
529          return schedule(task, 0, TimeUnit.NANOSECONDS);
530      }
531  
532 +    /**
533 +     * @throws RejectedExecutionException {@inheritDoc}
534 +     * @throws NullPointerException       {@inheritDoc}
535 +     */
536      public <T> Future<T> submit(Runnable task, T result) {
537          return schedule(Executors.callable(task, result),
538                          0, TimeUnit.NANOSECONDS);
539      }
540  
541 +    /**
542 +     * @throws RejectedExecutionException {@inheritDoc}
543 +     * @throws NullPointerException       {@inheritDoc}
544 +     */
545      public <T> Future<T> submit(Callable<T> task) {
546          return schedule(task, 0, TimeUnit.NANOSECONDS);
547      }
548  
549      /**
550 <     * Sets the policy on whether to continue executing existing periodic
551 <     * tasks even when this executor has been <tt>shutdown</tt>. In
552 <     * this case, these tasks will only terminate upon
553 <     * <tt>shutdownNow</tt>, or after setting the policy to
554 <     * <tt>false</tt> when already shutdown. This value is by default
555 <     * false.
550 >     * Sets the policy on whether to continue executing existing
551 >     * periodic tasks even when this executor has been
552 >     * <tt>shutdown</tt>. In this case, these tasks will only
553 >     * terminate upon <tt>shutdownNow</tt>, or after setting the
554 >     * policy to <tt>false</tt> when already shutdown. This value is
555 >     * by default false.
556       *
557       * @param value if true, continue after shutdown, else don't.
558       * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
559       */
560      public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
561          continueExistingPeriodicTasksAfterShutdown = value;
562 <        if (!value && isShutdown())
563 <            cancelUnwantedTasks();
562 >        if (!value && isShutdown()) {
563 >            onShutdown();
564 >            tryTerminate();
565 >        }
566      }
567  
568      /**
# Line 496 | Line 593 | public class ScheduledThreadPoolExecutor
593       */
594      public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
595          executeExistingDelayedTasksAfterShutdown = value;
596 <        if (!value && isShutdown())
597 <            cancelUnwantedTasks();
596 >        if (!value && isShutdown()) {
597 >            onShutdown();
598 >            tryTerminate();
599 >        }
600      }
601  
602      /**
# Line 515 | Line 614 | public class ScheduledThreadPoolExecutor
614          return executeExistingDelayedTasksAfterShutdown;
615      }
616  
518
617      /**
618       * Initiates an orderly shutdown in which previously submitted
619       * tasks are executed, but no new tasks will be accepted. If the
# Line 527 | Line 625 | public class ScheduledThreadPoolExecutor
625       * tasks will be cancelled.
626       */
627      public void shutdown() {
530        cancelUnwantedTasks();
628          super.shutdown();
629      }
630  

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines