ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.32
Committed: Sun Aug 28 13:31:27 2005 UTC (18 years, 9 months ago) by dl
Branch: MAIN
Changes since 1.31: +15 -6 lines
Log Message:
Clarify subclassing guidance

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.*; // for javadoc (till 6280605 is fixed)
9 import java.util.concurrent.atomic.*;
10 import java.util.*;
11
12 /**
13 * A {@link ThreadPoolExecutor} that can additionally schedule
14 * commands to run after a given delay, or to execute
15 * periodically. This class is preferable to {@link java.util.Timer}
16 * when multiple worker threads are needed, or when the additional
17 * flexibility or capabilities of {@link ThreadPoolExecutor} (which
18 * this class extends) are required.
19 *
20 * <p> Delayed tasks execute no sooner than they are enabled, but
21 * without any real-time guarantees about when, after they are
22 * enabled, they will commence. Tasks scheduled for exactly the same
23 * execution time are enabled in first-in-first-out (FIFO) order of
24 * submission.
25 *
26 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
27 * of the inherited tuning methods are not useful for it. In
28 * particular, because it acts as a fixed-sized pool using
29 * <tt>corePoolSize</tt> threads and an unbounded queue, adjustments
30 * to <tt>maximumPoolSize</tt> have no useful effect.
31 *
32 * <p><b>Extension notes:</b> This class overrides {@link
33 * AbstractExecutorService} <tt>submit</tt> methods to generate
34 * internal objects to control per-task delays and scheduling. To
35 * preserve functionality, any further overrides of these methods in
36 * subclasses must invoke superclass versions, which effectively
37 * disables additional task customization. However, this class
38 * provides alternative protected extension method
39 * <tt>decorateTask</tt> (one version each for <tt>Runnable</tt> and
40 * <tt>Callable</tt>) that can be used to customize the concrete task
41 * types used to execute commands entered via <tt>execute</tt>,
42 * <tt>submit</tt>, <tt>schedule</tt>, <tt>scheduleAtFixedRate</tt>,
43 * and <tt>scheduleWithFixedDelay</tt>. By default, a
44 * <tt>ScheduledThreadPoolExecutor</tt> uses a task type extending
45 * {@link FutureTask}. However, this may be modified or replaced using
46 * subclasses of the form:
47 *
48 * <pre>
49 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
50 *
51 * static class CustomTask&lt;V&gt; implements RunnableScheduledFuture&lt;V&gt; { ... }
52 *
53 * protected &lt;V&gt; RunnableScheduledFuture&lt;V&gt; decorateTask(
54 * Runnable r, RunnableScheduledFuture&lt;V&gt; task) {
55 * return new CustomTask&lt;V&gt;(r, task);
56 * }
57 *
58 * protected &lt;V&gt; RunnableScheduledFuture&lt;V&gt; decorateTask(
59 * Callable&lt;V&gt; c, RunnableScheduledFuture&lt;V&gt; task) {
60 * return new CustomTask&lt;V&gt;(c, task);
61 * }
62 * // ... add constructors, etc.
63 * }
64 * </pre>
65 * @since 1.5
66 * @author Doug Lea
67 */
68 public class ScheduledThreadPoolExecutor
69 extends ThreadPoolExecutor
70 implements ScheduledExecutorService {
71
72 /**
73 * False if should cancel/suppress periodic tasks on shutdown.
74 */
75 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
76
77 /**
78 * False if should cancel non-periodic tasks on shutdown.
79 */
80 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
81
82 /**
83 * Sequence number to break scheduling ties, and in turn to
84 * guarantee FIFO order among tied entries.
85 */
86 private static final AtomicLong sequencer = new AtomicLong(0);
87
88 /** Base of nanosecond timings, to avoid wrapping */
89 private static final long NANO_ORIGIN = System.nanoTime();
90
91 /**
92 * Returns nanosecond time offset by origin
93 */
94 final long now() {
95 return System.nanoTime() - NANO_ORIGIN;
96 }
97
98 private class ScheduledFutureTask<V>
99 extends FutureTask<V> implements RunnableScheduledFuture<V> {
100
101 /** Sequence number to break ties FIFO */
102 private final long sequenceNumber;
103 /** The time the task is enabled to execute in nanoTime units */
104 private long time;
105 /**
106 * Period in nanoseconds for repeating tasks. A positive
107 * value indicates fixed-rate execution. A negative value
108 * indicates fixed-delay execution. A value of 0 indicates a
109 * non-repeating task.
110 */
111 private final long period;
112
113 /**
114 * Creates a one-shot action with given nanoTime-based trigger time.
115 */
116 ScheduledFutureTask(Runnable r, V result, long ns) {
117 super(r, result);
118 this.time = ns;
119 this.period = 0;
120 this.sequenceNumber = sequencer.getAndIncrement();
121 }
122
123 /**
124 * Creates a periodic action with given nano time and period.
125 */
126 ScheduledFutureTask(Runnable r, V result, long ns, long period) {
127 super(r, result);
128 this.time = ns;
129 this.period = period;
130 this.sequenceNumber = sequencer.getAndIncrement();
131 }
132
133 /**
134 * Creates a one-shot action with given nanoTime-based trigger.
135 */
136 ScheduledFutureTask(Callable<V> callable, long ns) {
137 super(callable);
138 this.time = ns;
139 this.period = 0;
140 this.sequenceNumber = sequencer.getAndIncrement();
141 }
142
143 public long getDelay(TimeUnit unit) {
144 long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);
145 return d;
146 }
147
148 public int compareTo(Delayed other) {
149 if (other == this) // compare zero ONLY if same object
150 return 0;
151 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
152 long diff = time - x.time;
153 if (diff < 0)
154 return -1;
155 else if (diff > 0)
156 return 1;
157 else if (sequenceNumber < x.sequenceNumber)
158 return -1;
159 else
160 return 1;
161 }
162
163 /**
164 * Returns true if this is a periodic (not a one-shot) action.
165 *
166 * @return true if periodic
167 */
168 public boolean isPeriodic() {
169 return period != 0;
170 }
171
172 /**
173 * Runs a periodic task.
174 */
175 private void runPeriodic() {
176 boolean ok = ScheduledFutureTask.super.runAndReset();
177 boolean down = isShutdown();
178 // Reschedule if not cancelled and not shutdown or policy allows
179 if (ok && (!down ||
180 (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
181 !isTerminating()))) {
182 long p = period;
183 if (p > 0)
184 time += p;
185 else
186 time = now() - p;
187 ScheduledThreadPoolExecutor.super.getQueue().add(this);
188 }
189 // This might have been the final executed delayed
190 // task. Wake up threads to check.
191 else if (down)
192 interruptIdleWorkers();
193 }
194
195 /**
196 * Overrides FutureTask version so as to reset/requeue if periodic.
197 */
198 public void run() {
199 if (isPeriodic())
200 runPeriodic();
201 else
202 ScheduledFutureTask.super.run();
203 }
204 }
205
206 /**
207 * Specialized variant of ThreadPoolExecutor.execute for delayed tasks.
208 */
209 private void delayedExecute(Runnable command) {
210 if (isShutdown()) {
211 reject(command);
212 return;
213 }
214 // Prestart a thread if necessary. We cannot prestart it
215 // running the task because the task (probably) shouldn't be
216 // run yet, so thread will just idle until delay elapses.
217 if (getPoolSize() < getCorePoolSize())
218 prestartCoreThread();
219
220 super.getQueue().add(command);
221 }
222
223 /**
224 * Cancels and clears the queue of all tasks that should not be run
225 * due to shutdown policy.
226 */
227 private void cancelUnwantedTasks() {
228 boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
229 boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
230 if (!keepDelayed && !keepPeriodic)
231 super.getQueue().clear();
232 else if (keepDelayed || keepPeriodic) {
233 Object[] entries = super.getQueue().toArray();
234 for (int i = 0; i < entries.length; ++i) {
235 Object e = entries[i];
236 if (e instanceof RunnableScheduledFuture) {
237 RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
238 if (t.isPeriodic()? !keepPeriodic : !keepDelayed)
239 t.cancel(false);
240 }
241 }
242 entries = null;
243 purge();
244 }
245 }
246
247 public boolean remove(Runnable task) {
248 if (!(task instanceof RunnableScheduledFuture))
249 return false;
250 return getQueue().remove(task);
251 }
252
253 /**
254 * Modifies or replaces the task used to execute a runnable.
255 * This method can be used to override the concrete
256 * class used for managing internal tasks.
257 * The default implementation simply returns the given task.
258 *
259 * @param runnable the submitted Runnable
260 * @param task the task created to execute the runnable
261 * @return a task that can execute the runnable
262 * @since 1.6
263 */
264 protected <V> RunnableScheduledFuture<V> decorateTask(
265 Runnable runnable, RunnableScheduledFuture<V> task) {
266 return task;
267 }
268
269 /**
270 * Modifies or replaces the task used to execute a callable.
271 * This method can be used to override the concrete
272 * class used for managing internal tasks.
273 * The default implementation simply returns the given task.
274 *
275 * @param callable the submitted Callable
276 * @param task the task created to execute the callable
277 * @return a task that can execute the callable
278 * @since 1.6
279 */
280 protected <V> RunnableScheduledFuture<V> decorateTask(
281 Callable<V> callable, RunnableScheduledFuture<V> task) {
282 return task;
283 }
284
285 /**
286 * Creates a new ScheduledThreadPoolExecutor with the given core
287 * pool size.
288 *
289 * @param corePoolSize the number of threads to keep in the pool,
290 * even if they are idle
291 * @throws IllegalArgumentException if <tt>corePoolSize &lt;= 0</tt>
292 */
293 public ScheduledThreadPoolExecutor(int corePoolSize) {
294 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
295 new DelayedWorkQueue());
296 }
297
298 /**
299 * Creates a new ScheduledThreadPoolExecutor with the given
300 * initial parameters.
301 *
302 * @param corePoolSize the number of threads to keep in the pool,
303 * even if they are idle
304 * @param threadFactory the factory to use when the executor
305 * creates a new thread
306 * @throws IllegalArgumentException if <tt>corePoolSize &lt;= 0</tt>
307 * @throws NullPointerException if threadFactory is null
308 */
309 public ScheduledThreadPoolExecutor(int corePoolSize,
310 ThreadFactory threadFactory) {
311 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
312 new DelayedWorkQueue(), threadFactory);
313 }
314
315 /**
316 * Creates a new ScheduledThreadPoolExecutor with the given
317 * initial parameters.
318 *
319 * @param corePoolSize the number of threads to keep in the pool,
320 * even if they are idle
321 * @param handler the handler to use when execution is blocked
322 * because the thread bounds and queue capacities are reached
323 * @throws IllegalArgumentException if <tt>corePoolSize &lt;= 0</tt>
324 * @throws NullPointerException if handler is null
325 */
326 public ScheduledThreadPoolExecutor(int corePoolSize,
327 RejectedExecutionHandler handler) {
328 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
329 new DelayedWorkQueue(), handler);
330 }
331
332 /**
333 * Creates a new ScheduledThreadPoolExecutor with the given
334 * initial parameters.
335 *
336 * @param corePoolSize the number of threads to keep in the pool,
337 * even if they are idle
338 * @param threadFactory the factory to use when the executor
339 * creates a new thread
340 * @param handler the handler to use when execution is blocked
341 * because the thread bounds and queue capacities are reached.
342 * @throws IllegalArgumentException if <tt>corePoolSize &lt;= 0</tt>
343 * @throws NullPointerException if threadFactory or handler is null
344 */
345 public ScheduledThreadPoolExecutor(int corePoolSize,
346 ThreadFactory threadFactory,
347 RejectedExecutionHandler handler) {
348 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
349 new DelayedWorkQueue(), threadFactory, handler);
350 }
351
352 public ScheduledFuture<?> schedule(Runnable command,
353 long delay,
354 TimeUnit unit) {
355 if (command == null || unit == null)
356 throw new NullPointerException();
357 long triggerTime = now() + unit.toNanos(delay);
358 RunnableScheduledFuture<?> t = decorateTask(command,
359 new ScheduledFutureTask<Boolean>(command, null, triggerTime));
360 delayedExecute(t);
361 return t;
362 }
363
364 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
365 long delay,
366 TimeUnit unit) {
367 if (callable == null || unit == null)
368 throw new NullPointerException();
369 if (delay < 0) delay = 0;
370 long triggerTime = now() + unit.toNanos(delay);
371 RunnableScheduledFuture<V> t = decorateTask(callable,
372 new ScheduledFutureTask<V>(callable, triggerTime));
373 delayedExecute(t);
374 return t;
375 }
376
377 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
378 long initialDelay,
379 long period,
380 TimeUnit unit) {
381 if (command == null || unit == null)
382 throw new NullPointerException();
383 if (period <= 0)
384 throw new IllegalArgumentException();
385 if (initialDelay < 0) initialDelay = 0;
386 long triggerTime = now() + unit.toNanos(initialDelay);
387 RunnableScheduledFuture<?> t = decorateTask(command,
388 new ScheduledFutureTask<Object>(command,
389 null,
390 triggerTime,
391 unit.toNanos(period)));
392 delayedExecute(t);
393 return t;
394 }
395
396 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
397 long initialDelay,
398 long delay,
399 TimeUnit unit) {
400 if (command == null || unit == null)
401 throw new NullPointerException();
402 if (delay <= 0)
403 throw new IllegalArgumentException();
404 if (initialDelay < 0) initialDelay = 0;
405 long triggerTime = now() + unit.toNanos(initialDelay);
406 RunnableScheduledFuture<?> t = decorateTask(command,
407 new ScheduledFutureTask<Boolean>(command,
408 null,
409 triggerTime,
410 unit.toNanos(-delay)));
411 delayedExecute(t);
412 return t;
413 }
414
415
416 /**
417 * Executes command with zero required delay. This has effect
418 * equivalent to <tt>schedule(command, 0, anyUnit)</tt>. Note
419 * that inspections of the queue and of the list returned by
420 * <tt>shutdownNow</tt> will access the zero-delayed
421 * {@link ScheduledFuture}, not the <tt>command</tt> itself.
422 *
423 * @param command the task to execute
424 * @throws RejectedExecutionException at discretion of
425 * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
426 * for execution because the executor has been shut down.
427 * @throws NullPointerException if command is null
428 */
429 public void execute(Runnable command) {
430 if (command == null)
431 throw new NullPointerException();
432 schedule(command, 0, TimeUnit.NANOSECONDS);
433 }
434
435 // Override AbstractExecutorService methods
436
437 public Future<?> submit(Runnable task) {
438 return schedule(task, 0, TimeUnit.NANOSECONDS);
439 }
440
441 public <T> Future<T> submit(Runnable task, T result) {
442 return schedule(Executors.callable(task, result),
443 0, TimeUnit.NANOSECONDS);
444 }
445
446 public <T> Future<T> submit(Callable<T> task) {
447 return schedule(task, 0, TimeUnit.NANOSECONDS);
448 }
449
450 /**
451 * Sets the policy on whether to continue executing existing periodic
452 * tasks even when this executor has been <tt>shutdown</tt>. In
453 * this case, these tasks will only terminate upon
454 * <tt>shutdownNow</tt>, or after setting the policy to
455 * <tt>false</tt> when already shutdown. This value is by default
456 * false.
457 *
458 * @param value if true, continue after shutdown, else don't.
459 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
460 */
461 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
462 continueExistingPeriodicTasksAfterShutdown = value;
463 if (!value && isShutdown())
464 cancelUnwantedTasks();
465 }
466
467 /**
468 * Gets the policy on whether to continue executing existing
469 * periodic tasks even when this executor has been
470 * <tt>shutdown</tt>. In this case, these tasks will only
471 * terminate upon <tt>shutdownNow</tt> or after setting the policy
472 * to <tt>false</tt> when already shutdown. This value is by
473 * default false.
474 *
475 * @return true if will continue after shutdown
476 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
477 */
478 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
479 return continueExistingPeriodicTasksAfterShutdown;
480 }
481
482 /**
483 * Sets the policy on whether to execute existing delayed
484 * tasks even when this executor has been <tt>shutdown</tt>. In
485 * this case, these tasks will only terminate upon
486 * <tt>shutdownNow</tt>, or after setting the policy to
487 * <tt>false</tt> when already shutdown. This value is by default
488 * true.
489 *
490 * @param value if true, execute after shutdown, else don't.
491 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
492 */
493 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
494 executeExistingDelayedTasksAfterShutdown = value;
495 if (!value && isShutdown())
496 cancelUnwantedTasks();
497 }
498
499 /**
500 * Gets the policy on whether to execute existing delayed
501 * tasks even when this executor has been <tt>shutdown</tt>. In
502 * this case, these tasks will only terminate upon
503 * <tt>shutdownNow</tt>, or after setting the policy to
504 * <tt>false</tt> when already shutdown. This value is by default
505 * true.
506 *
507 * @return true if will execute after shutdown
508 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
509 */
510 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
511 return executeExistingDelayedTasksAfterShutdown;
512 }
513
514
515 /**
516 * Initiates an orderly shutdown in which previously submitted
517 * tasks are executed, but no new tasks will be accepted. If the
518 * <tt>ExecuteExistingDelayedTasksAfterShutdownPolicy</tt> has
519 * been set <tt>false</tt>, existing delayed tasks whose delays
520 * have not yet elapsed are cancelled. And unless the
521 * <tt>ContinueExistingPeriodicTasksAfterShutdownPolicy</tt> has
522 * been set <tt>true</tt>, future executions of existing periodic
523 * tasks will be cancelled.
524 */
525 public void shutdown() {
526 cancelUnwantedTasks();
527 super.shutdown();
528 }
529
530 /**
531 * Attempts to stop all actively executing tasks, halts the
532 * processing of waiting tasks, and returns a list of the tasks
533 * that were awaiting execution.
534 *
535 * <p>There are no guarantees beyond best-effort attempts to stop
536 * processing actively executing tasks. This implementation
537 * cancels tasks via {@link Thread#interrupt}, so any task that
538 * fails to respond to interrupts may never terminate.
539 *
540 * @return list of tasks that never commenced execution. Each
541 * element of this list is a {@link ScheduledFuture},
542 * including those tasks submitted using <tt>execute</tt>, which
543 * are for scheduling purposes used as the basis of a zero-delay
544 * <tt>ScheduledFuture</tt>.
545 * @throws SecurityException {@inheritDoc}
546 */
547 public List<Runnable> shutdownNow() {
548 return super.shutdownNow();
549 }
550
551 /**
552 * Returns the task queue used by this executor. Each element of
553 * this queue is a {@link ScheduledFuture}, including those
554 * tasks submitted using <tt>execute</tt> which are for scheduling
555 * purposes used as the basis of a zero-delay
556 * <tt>ScheduledFuture</tt>. Iteration over this queue is
557 * <em>not</em> guaranteed to traverse tasks in the order in
558 * which they will execute.
559 *
560 * @return the task queue
561 */
562 public BlockingQueue<Runnable> getQueue() {
563 return super.getQueue();
564 }
565
566 /**
567 * An annoying wrapper class to convince javac to use a
568 * DelayQueue<RunnableScheduledFuture> as a BlockingQueue<Runnable>
569 */
570 private static class DelayedWorkQueue
571 extends AbstractCollection<Runnable>
572 implements BlockingQueue<Runnable> {
573
574 private final DelayQueue<RunnableScheduledFuture> dq = new DelayQueue<RunnableScheduledFuture>();
575 public Runnable poll() { return dq.poll(); }
576 public Runnable peek() { return dq.peek(); }
577 public Runnable take() throws InterruptedException { return dq.take(); }
578 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
579 return dq.poll(timeout, unit);
580 }
581
582 public boolean add(Runnable x) {
583 return dq.add((RunnableScheduledFuture)x);
584 }
585 public boolean offer(Runnable x) {
586 return dq.offer((RunnableScheduledFuture)x);
587 }
588 public void put(Runnable x) {
589 dq.put((RunnableScheduledFuture)x);
590 }
591 public boolean offer(Runnable x, long timeout, TimeUnit unit) {
592 return dq.offer((RunnableScheduledFuture)x, timeout, unit);
593 }
594
595 public Runnable remove() { return dq.remove(); }
596 public Runnable element() { return dq.element(); }
597 public void clear() { dq.clear(); }
598 public int drainTo(Collection<? super Runnable> c) { return dq.drainTo(c); }
599 public int drainTo(Collection<? super Runnable> c, int maxElements) {
600 return dq.drainTo(c, maxElements);
601 }
602
603 public int remainingCapacity() { return dq.remainingCapacity(); }
604 public boolean remove(Object x) { return dq.remove(x); }
605 public boolean contains(Object x) { return dq.contains(x); }
606 public int size() { return dq.size(); }
607 public boolean isEmpty() { return dq.isEmpty(); }
608 public Object[] toArray() { return dq.toArray(); }
609 public <T> T[] toArray(T[] array) { return dq.toArray(array); }
610 public Iterator<Runnable> iterator() {
611 return new Iterator<Runnable>() {
612 private Iterator<RunnableScheduledFuture> it = dq.iterator();
613 public boolean hasNext() { return it.hasNext(); }
614 public Runnable next() { return it.next(); }
615 public void remove() { it.remove(); }
616 };
617 }
618 }
619 }