ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.29
Committed: Mon Aug 15 20:45:30 2005 UTC (18 years, 9 months ago) by jsr166
Branch: MAIN
Changes since 1.28: +10 -10 lines
Log Message:
typo; whitespace

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.*; // for javadoc (till 6280605 is fixed)
9 import java.util.concurrent.atomic.*;
10 import java.util.*;
11
12 /**
13 * A {@link ThreadPoolExecutor} that can additionally schedule
14 * commands to run after a given delay, or to execute
15 * periodically. This class is preferable to {@link java.util.Timer}
16 * when multiple worker threads are needed, or when the additional
17 * flexibility or capabilities of {@link ThreadPoolExecutor} (which
18 * this class extends) are required.
19 *
20 * <p> Delayed tasks execute no sooner than they are enabled, but
21 * without any real-time guarantees about when, after they are
22 * enabled, they will commence. Tasks scheduled for exactly the same
23 * execution time are enabled in first-in-first-out (FIFO) order of
24 * submission.
25 *
26 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
27 * of the inherited tuning methods are not useful for it. In
28 * particular, because it acts as a fixed-sized pool using
29 * <tt>corePoolSize</tt> threads and an unbounded queue, adjustments
30 * to <tt>maximumPoolSize</tt> have no useful effect.
31 *
32 * <p>This class supports protected extension method
33 * <tt>decorateTask</tt> (one version each for <tt>Runnable</tt> and
34 * <tt>Callable</tt>) that can be used to customize the concrete task
35 * types used to execute commands. By default,
36 * a <tt>ScheduledThreadPoolExecutor</tt> uses
37 * a task type extending {@link FutureTask}. However, this may
38 * be modified or replaced using subclasses of the form:
39 * <pre>
40 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
41 *
42 * static class CustomTask&lt;V&gt; implements RunnableScheduledFuture&lt;V&gt; { ... }
43 *
44 * protected &lt;V&gt; RunnableScheduledFuture&lt;V&gt; decorateTask(
45 * Runnable r, RunnableScheduledFuture&lt;V&gt; task) {
46 * return new CustomTask&lt;V&gt;(r, task);
47 * }
48 *
49 * protected &lt;V&gt; RunnableScheduledFuture&lt;V&gt; decorateTask(
50 * Callable&lt;V&gt; c, RunnableScheduledFuture&lt;V&gt; task) {
51 * return new CustomTask&lt;V&gt;(c, task);
52 * }
53 * // ... add constructors, etc.
54 * }
55 * </pre>
56 * @since 1.5
57 * @author Doug Lea
58 */
59 public class ScheduledThreadPoolExecutor
60 extends ThreadPoolExecutor
61 implements ScheduledExecutorService {
62
63 /**
64 * False if should cancel/suppress periodic tasks on shutdown.
65 */
66 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
67
68 /**
69 * False if should cancel non-periodic tasks on shutdown.
70 */
71 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
72
73 /**
74 * Sequence number to break scheduling ties, and in turn to
75 * guarantee FIFO order among tied entries.
76 */
77 private static final AtomicLong sequencer = new AtomicLong(0);
78
79 /** Base of nanosecond timings, to avoid wrapping */
80 private static final long NANO_ORIGIN = System.nanoTime();
81
82 /**
83 * Returns nanosecond time offset by origin
84 */
85 final long now() {
86 return System.nanoTime() - NANO_ORIGIN;
87 }
88
89 private class ScheduledFutureTask<V>
90 extends FutureTask<V> implements RunnableScheduledFuture<V> {
91
92 /** Sequence number to break ties FIFO */
93 private final long sequenceNumber;
94 /** The time the task is enabled to execute in nanoTime units */
95 private long time;
96 /**
97 * Period in nanoseconds for repeating tasks. A positive
98 * value indicates fixed-rate execution. A negative value
99 * indicates fixed-delay execution. A value of 0 indicates a
100 * non-repeating task.
101 */
102 private final long period;
103
104 /**
105 * Creates a one-shot action with given nanoTime-based trigger time
106 */
107 ScheduledFutureTask(Runnable r, V result, long ns) {
108 super(r, result);
109 this.time = ns;
110 this.period = 0;
111 this.sequenceNumber = sequencer.getAndIncrement();
112 }
113
114 /**
115 * Creates a periodic action with given nano time and period
116 */
117 ScheduledFutureTask(Runnable r, V result, long ns, long period) {
118 super(r, result);
119 this.time = ns;
120 this.period = period;
121 this.sequenceNumber = sequencer.getAndIncrement();
122 }
123
124 /**
125 * Creates a one-shot action with given nanoTime-based trigger
126 */
127 ScheduledFutureTask(Callable<V> callable, long ns) {
128 super(callable);
129 this.time = ns;
130 this.period = 0;
131 this.sequenceNumber = sequencer.getAndIncrement();
132 }
133
134 public long getDelay(TimeUnit unit) {
135 long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);
136 return d;
137 }
138
139 public int compareTo(Delayed other) {
140 if (other == this) // compare zero ONLY if same object
141 return 0;
142 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
143 long diff = time - x.time;
144 if (diff < 0)
145 return -1;
146 else if (diff > 0)
147 return 1;
148 else if (sequenceNumber < x.sequenceNumber)
149 return -1;
150 else
151 return 1;
152 }
153
154 /**
155 * Returns true if this is a periodic (not a one-shot) action.
156 * @return true if periodic
157 */
158 public boolean isPeriodic() {
159 return period != 0;
160 }
161
162 /**
163 * Run a periodic task
164 */
165 private void runPeriodic() {
166 boolean ok = ScheduledFutureTask.super.runAndReset();
167 boolean down = isShutdown();
168 // Reschedule if not cancelled and not shutdown or policy allows
169 if (ok && (!down ||
170 (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
171 !isTerminating()))) {
172 long p = period;
173 if (p > 0)
174 time += p;
175 else
176 time = now() - p;
177 ScheduledThreadPoolExecutor.super.getQueue().add(this);
178 }
179 // This might have been the final executed delayed
180 // task. Wake up threads to check.
181 else if (down)
182 interruptIdleWorkers();
183 }
184
185 /**
186 * Overrides FutureTask version so as to reset/requeue if periodic.
187 */
188 public void run() {
189 if (isPeriodic())
190 runPeriodic();
191 else
192 ScheduledFutureTask.super.run();
193 }
194 }
195
196 /**
197 * Specialized variant of ThreadPoolExecutor.execute for delayed tasks.
198 */
199 private void delayedExecute(Runnable command) {
200 if (isShutdown()) {
201 reject(command);
202 return;
203 }
204 // Prestart a thread if necessary. We cannot prestart it
205 // running the task because the task (probably) shouldn't be
206 // run yet, so thread will just idle until delay elapses.
207 if (getPoolSize() < getCorePoolSize())
208 prestartCoreThread();
209
210 super.getQueue().add(command);
211 }
212
213 /**
214 * Cancels and clears the queue of all tasks that should not be run
215 * due to shutdown policy.
216 */
217 private void cancelUnwantedTasks() {
218 boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
219 boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
220 if (!keepDelayed && !keepPeriodic)
221 super.getQueue().clear();
222 else if (keepDelayed || keepPeriodic) {
223 Object[] entries = super.getQueue().toArray();
224 for (int i = 0; i < entries.length; ++i) {
225 Object e = entries[i];
226 if (e instanceof RunnableScheduledFuture) {
227 RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
228 if (t.isPeriodic()? !keepPeriodic : !keepDelayed)
229 t.cancel(false);
230 }
231 }
232 entries = null;
233 purge();
234 }
235 }
236
237 public boolean remove(Runnable task) {
238 if (!(task instanceof RunnableScheduledFuture))
239 return false;
240 return getQueue().remove(task);
241 }
242
243 /**
244 * Modify or replace the task used to execute a runnable.
245 * This method can be used to override the concrete
246 * class used for managing internal tasks.
247 * The default implementation simply returns the given
248 * task.
249 *
250 * @param runnable the submitted Runnable
251 * @param task the task created to execute the runnable
252 * @return a task that can execute the runnable
253 * @since 1.6
254 */
255 protected <V> RunnableScheduledFuture<V> decorateTask(
256 Runnable runnable, RunnableScheduledFuture<V> task) {
257 return task;
258 }
259
260 /**
261 * Modify or replace the task used to execute a callable.
262 * This method can be used to override the concrete
263 * class used for managing internal tasks.
264 * The default implementation simply returns the given
265 * task.
266 *
267 * @param callable the submitted Callable
268 * @param task the task created to execute the callable
269 * @return a task that can execute the callable
270 * @since 1.6
271 */
272 protected <V> RunnableScheduledFuture<V> decorateTask(
273 Callable<V> callable, RunnableScheduledFuture<V> task) {
274 return task;
275 }
276
277 /**
278 * Creates a new ScheduledThreadPoolExecutor with the given core
279 * pool size.
280 *
281 * @param corePoolSize the number of threads to keep in the pool,
282 * even if they are idle.
283 * @throws IllegalArgumentException if <tt>corePoolSize &lt;= 0</tt>
284 */
285 public ScheduledThreadPoolExecutor(int corePoolSize) {
286 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
287 new DelayedWorkQueue());
288 }
289
290 /**
291 * Creates a new ScheduledThreadPoolExecutor with the given
292 * initial parameters.
293 *
294 * @param corePoolSize the number of threads to keep in the pool,
295 * even if they are idle.
296 * @param threadFactory the factory to use when the executor
297 * creates a new thread.
298 * @throws IllegalArgumentException if <tt>corePoolSize &lt;= 0</tt>
299 * @throws NullPointerException if threadFactory is null
300 */
301 public ScheduledThreadPoolExecutor(int corePoolSize,
302 ThreadFactory threadFactory) {
303 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
304 new DelayedWorkQueue(), threadFactory);
305 }
306
307 /**
308 * Creates a new ScheduledThreadPoolExecutor with the given
309 * initial parameters.
310 *
311 * @param corePoolSize the number of threads to keep in the pool,
312 * even if they are idle.
313 * @param handler the handler to use when execution is blocked
314 * because the thread bounds and queue capacities are reached.
315 * @throws IllegalArgumentException if <tt>corePoolSize &lt;= 0</tt>
316 * @throws NullPointerException if handler is null
317 */
318 public ScheduledThreadPoolExecutor(int corePoolSize,
319 RejectedExecutionHandler handler) {
320 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
321 new DelayedWorkQueue(), handler);
322 }
323
324 /**
325 * Creates a new ScheduledThreadPoolExecutor with the given
326 * initial parameters.
327 *
328 * @param corePoolSize the number of threads to keep in the pool,
329 * even if they are idle.
330 * @param threadFactory the factory to use when the executor
331 * creates a new thread.
332 * @param handler the handler to use when execution is blocked
333 * because the thread bounds and queue capacities are reached.
334 * @throws IllegalArgumentException if <tt>corePoolSize &lt;= 0</tt>
335 * @throws NullPointerException if threadFactory or handler is null
336 */
337 public ScheduledThreadPoolExecutor(int corePoolSize,
338 ThreadFactory threadFactory,
339 RejectedExecutionHandler handler) {
340 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
341 new DelayedWorkQueue(), threadFactory, handler);
342 }
343
344 public ScheduledFuture<?> schedule(Runnable command,
345 long delay,
346 TimeUnit unit) {
347 if (command == null || unit == null)
348 throw new NullPointerException();
349 long triggerTime = now() + unit.toNanos(delay);
350 RunnableScheduledFuture<?> t = decorateTask(command,
351 new ScheduledFutureTask<Boolean>(command, null, triggerTime));
352 delayedExecute(t);
353 return t;
354 }
355
356 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
357 long delay,
358 TimeUnit unit) {
359 if (callable == null || unit == null)
360 throw new NullPointerException();
361 if (delay < 0) delay = 0;
362 long triggerTime = now() + unit.toNanos(delay);
363 RunnableScheduledFuture<V> t = decorateTask(callable,
364 new ScheduledFutureTask<V>(callable, triggerTime));
365 delayedExecute(t);
366 return t;
367 }
368
369 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
370 long initialDelay,
371 long period,
372 TimeUnit unit) {
373 if (command == null || unit == null)
374 throw new NullPointerException();
375 if (period <= 0)
376 throw new IllegalArgumentException();
377 if (initialDelay < 0) initialDelay = 0;
378 long triggerTime = now() + unit.toNanos(initialDelay);
379 RunnableScheduledFuture<?> t = decorateTask(command,
380 new ScheduledFutureTask<Object>(command,
381 null,
382 triggerTime,
383 unit.toNanos(period)));
384 delayedExecute(t);
385 return t;
386 }
387
388 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
389 long initialDelay,
390 long delay,
391 TimeUnit unit) {
392 if (command == null || unit == null)
393 throw new NullPointerException();
394 if (delay <= 0)
395 throw new IllegalArgumentException();
396 if (initialDelay < 0) initialDelay = 0;
397 long triggerTime = now() + unit.toNanos(initialDelay);
398 RunnableScheduledFuture<?> t = decorateTask(command,
399 new ScheduledFutureTask<Boolean>(command,
400 null,
401 triggerTime,
402 unit.toNanos(-delay)));
403 delayedExecute(t);
404 return t;
405 }
406
407
408 /**
409 * Executes command with zero required delay. This has effect
410 * equivalent to <tt>schedule(command, 0, anyUnit)</tt>. Note
411 * that inspections of the queue and of the list returned by
412 * <tt>shutdownNow</tt> will access the zero-delayed
413 * {@link ScheduledFuture}, not the <tt>command</tt> itself.
414 *
415 * @param command the task to execute
416 * @throws RejectedExecutionException at discretion of
417 * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
418 * for execution because the executor has been shut down.
419 * @throws NullPointerException if command is null
420 */
421 public void execute(Runnable command) {
422 if (command == null)
423 throw new NullPointerException();
424 schedule(command, 0, TimeUnit.NANOSECONDS);
425 }
426
427 // Override AbstractExecutorService methods
428
429 public Future<?> submit(Runnable task) {
430 return schedule(task, 0, TimeUnit.NANOSECONDS);
431 }
432
433 public <T> Future<T> submit(Runnable task, T result) {
434 return schedule(Executors.callable(task, result),
435 0, TimeUnit.NANOSECONDS);
436 }
437
438 public <T> Future<T> submit(Callable<T> task) {
439 return schedule(task, 0, TimeUnit.NANOSECONDS);
440 }
441
442 /**
443 * Sets the policy on whether to continue executing existing periodic
444 * tasks even when this executor has been <tt>shutdown</tt>. In
445 * this case, these tasks will only terminate upon
446 * <tt>shutdownNow</tt>, or after setting the policy to
447 * <tt>false</tt> when already shutdown. This value is by default
448 * false.
449 * @param value if true, continue after shutdown, else don't.
450 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
451 */
452 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
453 continueExistingPeriodicTasksAfterShutdown = value;
454 if (!value && isShutdown())
455 cancelUnwantedTasks();
456 }
457
458 /**
459 * Gets the policy on whether to continue executing existing
460 * periodic tasks even when this executor has been
461 * <tt>shutdown</tt>. In this case, these tasks will only
462 * terminate upon <tt>shutdownNow</tt> or after setting the policy
463 * to <tt>false</tt> when already shutdown. This value is by
464 * default false.
465 * @return true if will continue after shutdown.
466 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
467 */
468 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
469 return continueExistingPeriodicTasksAfterShutdown;
470 }
471
472 /**
473 * Sets the policy on whether to execute existing delayed
474 * tasks even when this executor has been <tt>shutdown</tt>. In
475 * this case, these tasks will only terminate upon
476 * <tt>shutdownNow</tt>, or after setting the policy to
477 * <tt>false</tt> when already shutdown. This value is by default
478 * true.
479 * @param value if true, execute after shutdown, else don't.
480 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
481 */
482 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
483 executeExistingDelayedTasksAfterShutdown = value;
484 if (!value && isShutdown())
485 cancelUnwantedTasks();
486 }
487
488 /**
489 * Gets the policy on whether to execute existing delayed
490 * tasks even when this executor has been <tt>shutdown</tt>. In
491 * this case, these tasks will only terminate upon
492 * <tt>shutdownNow</tt>, or after setting the policy to
493 * <tt>false</tt> when already shutdown. This value is by default
494 * true.
495 * @return true if will execute after shutdown.
496 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
497 */
498 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
499 return executeExistingDelayedTasksAfterShutdown;
500 }
501
502
503 /**
504 * Initiates an orderly shutdown in which previously submitted
505 * tasks are executed, but no new tasks will be accepted. If the
506 * <tt>ExecuteExistingDelayedTasksAfterShutdownPolicy</tt> has
507 * been set <tt>false</tt>, existing delayed tasks whose delays
508 * have not yet elapsed are cancelled. And unless the
509 * <tt>ContinueExistingPeriodicTasksAfterShutdownPolicy</tt> has
510 * been set <tt>true</tt>, future executions of existing periodic
511 * tasks will be cancelled.
512 */
513 public void shutdown() {
514 cancelUnwantedTasks();
515 super.shutdown();
516 }
517
518 /**
519 * Attempts to stop all actively executing tasks, halts the
520 * processing of waiting tasks, and returns a list of the tasks that were
521 * awaiting execution.
522 *
523 * <p>There are no guarantees beyond best-effort attempts to stop
524 * processing actively executing tasks. This implementation
525 * cancels tasks via {@link Thread#interrupt}, so if any tasks mask or
526 * fail to respond to interrupts, they may never terminate.
527 *
528 * @return list of tasks that never commenced execution. Each
529 * element of this list is a {@link ScheduledFuture},
530 * including those tasks submitted using <tt>execute</tt>, which
531 * are for scheduling purposes used as the basis of a zero-delay
532 * <tt>ScheduledFuture</tt>.
533 */
534 public List<Runnable> shutdownNow() {
535 return super.shutdownNow();
536 }
537
538 /**
539 * Returns the task queue used by this executor. Each element of
540 * this queue is a {@link ScheduledFuture}, including those
541 * tasks submitted using <tt>execute</tt> which are for scheduling
542 * purposes used as the basis of a zero-delay
543 * <tt>ScheduledFuture</tt>. Iteration over this queue is
544 * <em>not</em> guaranteed to traverse tasks in the order in
545 * which they will execute.
546 *
547 * @return the task queue
548 */
549 public BlockingQueue<Runnable> getQueue() {
550 return super.getQueue();
551 }
552
553 /**
554 * An annoying wrapper class to convince javac to use a
555 * DelayQueue<RunnableScheduledFuture> as a BlockingQueue<Runnable>
556 */
557 private static class DelayedWorkQueue
558 extends AbstractCollection<Runnable>
559 implements BlockingQueue<Runnable> {
560
561 private final DelayQueue<RunnableScheduledFuture> dq = new DelayQueue<RunnableScheduledFuture>();
562 public Runnable poll() { return dq.poll(); }
563 public Runnable peek() { return dq.peek(); }
564 public Runnable take() throws InterruptedException { return dq.take(); }
565 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
566 return dq.poll(timeout, unit);
567 }
568
569 public boolean add(Runnable x) {
570 return dq.add((RunnableScheduledFuture)x);
571 }
572 public boolean offer(Runnable x) {
573 return dq.offer((RunnableScheduledFuture)x);
574 }
575 public void put(Runnable x) {
576 dq.put((RunnableScheduledFuture)x);
577 }
578 public boolean offer(Runnable x, long timeout, TimeUnit unit) {
579 return dq.offer((RunnableScheduledFuture)x, timeout, unit);
580 }
581
582 public Runnable remove() { return dq.remove(); }
583 public Runnable element() { return dq.element(); }
584 public void clear() { dq.clear(); }
585 public int drainTo(Collection<? super Runnable> c) { return dq.drainTo(c); }
586 public int drainTo(Collection<? super Runnable> c, int maxElements) {
587 return dq.drainTo(c, maxElements);
588 }
589
590 public int remainingCapacity() { return dq.remainingCapacity(); }
591 public boolean remove(Object x) { return dq.remove(x); }
592 public boolean contains(Object x) { return dq.contains(x); }
593 public int size() { return dq.size(); }
594 public boolean isEmpty() { return dq.isEmpty(); }
595 public Object[] toArray() { return dq.toArray(); }
596 public <T> T[] toArray(T[] array) { return dq.toArray(array); }
597 public Iterator<Runnable> iterator() {
598 return new Iterator<Runnable>() {
599 private Iterator<RunnableScheduledFuture> it = dq.iterator();
600 public boolean hasNext() { return it.hasNext(); }
601 public Runnable next() { return it.next(); }
602 public void remove() { it.remove(); }
603 };
604 }
605 }
606 }