ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledExecutor.java
Revision: 1.32
Committed: Sun Oct 5 23:00:18 2003 UTC (20 years, 8 months ago) by dl
Branch: MAIN
CVS Tags: JSR166_NOV3_FREEZE
Changes since 1.31: +19 -7 lines
Log Message:
added drainTo; clarified various exception specs

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.*;
10
11 /**
12 * An {@link Executor} that can schedule commands to run after a given
13 * delay, or to execute periodically. This class is preferable to
14 * {@link java.util.Timer} when multiple worker threads are needed,
15 * or when the additional flexibility or capabilities of
16 * {@link ThreadPoolExecutor} (which this class extends) are
17 * required.
18 *
19 * <p> The <tt>schedule</tt> methods create tasks with various delays
20 * and return a task object that can be used to cancel or check
21 * execution. The <tt>scheduleAtFixedRate</tt> and
22 * <tt>scheduleWithFixedDelay</tt> methods create and execute tasks
23 * that run periodically until cancelled. Commands submitted using
24 * the <tt>execute</tt> method are scheduled with a requested delay of
25 * zero.
26 *
27 * <p> Delayed tasks execute no sooner than they are enabled, but
28 * without any real-time guarantees about when, after they are enabled,
29 * they will commence. Tasks tied for the same execution time are
30 * enabled in first-in-first-out (FIFO) order of submission.
31 *
32 * <p>All <t>schedule</tt> methods accept <em>relative</em> delays and
33 * periods as arguments, not absolute times or dates. It is a simple
34 * matter to transform an absolute time represented as a {@link
35 * java.util.Date}, to the required form. For example, to schedule at
36 * a certain future <tt>date</tt>, you can use: <tt>schedule(task,
37 * date.getTime() - System.currentTimeMillis(),
38 * TimeUnit.MILLISECONDS)</tt>. Beware however that expiration of a
39 * relative delay need not coincide with the current <tt>Date</tt> at
40 * which the task is enabled due to network time synchronization
41 * protocols, clock drift, or other factors. Negative relative delays
42 * (but not periods) are allowed in <tt>schedule</tt> methods, and are
43 * treated as requests for immediate execution.
44 *
45 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
46 * of the inherited tuning methods are not especially useful for
47 * it. In particular, because a <tt>ScheduledExecutor</tt> always acts
48 * as a fixed-sized pool using <tt>corePoolSize</tt> threads and an
49 * unbounded queue, adjustments to <tt>maximumPoolSize</tt> have no
50 * useful effect.
51 *
52 * @since 1.5
53 * @author Doug Lea
54 */
55 public class ScheduledExecutor extends ThreadPoolExecutor {
56
57 /**
58 * False if should cancel/suppress periodic tasks on shutdown.
59 */
60 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
61
62 /**
63 * False if should cancel non-periodic tasks on shutdown.
64 */
65 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
66
67
68 /**
69 * Sequence number to break scheduling ties, and in turn to
70 * guarantee FIFO order among tied entries.
71 */
72 private static final AtomicLong sequencer = new AtomicLong(0);
73
74 private static class ScheduledCancellableTask
75 extends CancellableTask implements ScheduledCancellable {
76
77 /** Sequence number to break ties FIFO */
78 private final long sequenceNumber;
79 /** The time the task is enabled to execute in nanoTime units */
80 private long time;
81 /** The delay following next time, or <= 0 if non-periodic */
82 private final long period;
83 /** true if at fixed rate; false if fixed delay */
84 private final boolean rateBased;
85
86 /**
87 * Creates a one-shot action with given nanoTime-based trigger time
88 */
89 ScheduledCancellableTask(Runnable r, long ns) {
90 super(r);
91 this.time = ns;
92 this.period = 0;
93 rateBased = false;
94 this.sequenceNumber = sequencer.getAndIncrement();
95 }
96
97 /**
98 * Creates a one-shot action with given nanoTime-based trigger
99 * time but does not establish the action. (This is needed for
100 * the Future-based subclass).
101 */
102 ScheduledCancellableTask(long ns) {
103 super();
104 this.time = ns;
105 this.period = 0;
106 rateBased = false;
107 this.sequenceNumber = sequencer.getAndIncrement();
108 }
109
110 /**
111 * Creates a periodic action with given nano time and period
112 */
113 ScheduledCancellableTask(Runnable r, long ns, long period, boolean rateBased) {
114 super(r);
115 this.time = ns;
116 this.period = period;
117 this.rateBased = rateBased;
118 this.sequenceNumber = sequencer.getAndIncrement();
119 }
120
121
122 public long getDelay(TimeUnit unit) {
123 long d = unit.convert(time - System.nanoTime(),
124 TimeUnit.NANOSECONDS);
125 return d;
126 }
127
128 public int compareTo(Object other) {
129 if (other == this) // compare zero ONLY if same object
130 return 0;
131 ScheduledCancellableTask x = (ScheduledCancellableTask)other;
132 long diff = time - x.time;
133 if (diff < 0)
134 return -1;
135 else if (diff > 0)
136 return 1;
137 else if (sequenceNumber < x.sequenceNumber)
138 return -1;
139 else
140 return 1;
141 }
142
143 /**
144 * Return true if this is a periodic (not a one-shot) action.
145 * @return true if periodic
146 */
147 public boolean isPeriodic() {
148 return period > 0;
149 }
150
151 /**
152 * Returns the period, or zero if non-periodic.
153 *
154 * @return the period
155 */
156 public long getPeriod(TimeUnit unit) {
157 return unit.convert(period, TimeUnit.NANOSECONDS);
158 }
159
160 /**
161 * Overrides CancellableTask version so as to not setDone if
162 * periodic.
163 */
164 public void run() {
165 if (setRunning()) {
166 try {
167 getRunnable().run();
168 } finally {
169 if (!isPeriodic())
170 setDone();
171 }
172 }
173 }
174
175 /**
176 * Return a ScheduledCancellable task (which may be this task)
177 * that will trigger in the period subsequent to current task,
178 * or null if non-periodic or cancelled.
179 */
180 ScheduledCancellableTask nextTask() {
181 if (period <= 0 || !reset())
182 return null;
183 time = period + (rateBased ? time : System.nanoTime());
184 return this;
185 }
186 }
187
188 private static class ScheduledFutureTask<V>
189 extends ScheduledCancellableTask implements ScheduledFuture<V> {
190
191 /**
192 * Creates a ScheduledFuture that may trigger after the given delay.
193 */
194 ScheduledFutureTask(Callable<V> callable, long triggerTime) {
195 // must set callable after super ctor call to use inner class
196 super(triggerTime);
197 setRunnable(new InnerCancellableFuture<V>(callable));
198 }
199
200 public V get() throws InterruptedException, ExecutionException {
201 return ((InnerCancellableFuture<V>)getRunnable()).get();
202 }
203
204 public V get(long timeout, TimeUnit unit)
205 throws InterruptedException, ExecutionException, TimeoutException {
206 return ((InnerCancellableFuture<V>)getRunnable()).get(timeout, unit);
207 }
208
209 protected void set(V v) {
210 ((InnerCancellableFuture<V>)getRunnable()).set(v);
211 }
212
213 protected void setException(Throwable t) {
214 ((InnerCancellableFuture<V>)getRunnable()).setException(t);
215 }
216 }
217
218
219 /**
220 * An annoying wrapper class to convince generics compiler to
221 * use a DelayQueue<ScheduledCancellableTask> as a BlockingQueue<Runnable>
222 */
223 private static class DelayedWorkQueue
224 extends AbstractCollection<Runnable> implements BlockingQueue<Runnable> {
225
226 private final DelayQueue<ScheduledCancellableTask> dq = new DelayQueue<ScheduledCancellableTask>();
227 public Runnable poll() { return dq.poll(); }
228 public Runnable peek() { return dq.peek(); }
229 public Runnable take() throws InterruptedException { return dq.take(); }
230 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
231 return dq.poll(timeout, unit);
232 }
233
234 public boolean add(Runnable x) { return dq.add((ScheduledCancellableTask)x); }
235 public boolean offer(Runnable x) { return dq.offer((ScheduledCancellableTask)x); }
236 public void put(Runnable x) {
237 dq.put((ScheduledCancellableTask)x);
238 }
239 public boolean offer(Runnable x, long timeout, TimeUnit unit) {
240 return dq.offer((ScheduledCancellableTask)x, timeout, unit);
241 }
242
243 public Runnable remove() { return dq.remove(); }
244 public Runnable element() { return dq.element(); }
245 public void clear() { dq.clear(); }
246 public int drainTo(Collection<? super Runnable> c) { return dq.drainTo(c); }
247 public int drainTo(Collection<? super Runnable> c, int maxElements) {
248 return dq.drainTo(c, maxElements);
249 }
250
251 public int remainingCapacity() { return dq.remainingCapacity(); }
252 public boolean remove(Object x) { return dq.remove(x); }
253 public boolean contains(Object x) { return dq.contains(x); }
254 public int size() { return dq.size(); }
255 public boolean isEmpty() { return dq.isEmpty(); }
256 public Object[] toArray() { return dq.toArray(); }
257 public <T> T[] toArray(T[] array) { return dq.toArray(array); }
258 public Iterator<Runnable> iterator() {
259 return new Iterator<Runnable>() {
260 private Iterator<ScheduledCancellableTask> it = dq.iterator();
261 public boolean hasNext() { return it.hasNext(); }
262 public Runnable next() { return it.next(); }
263 public void remove() { it.remove(); }
264 };
265 }
266 }
267
268 /**
269 * Creates a new ScheduledExecutor with the given core pool size.
270 *
271 * @param corePoolSize the number of threads to keep in the pool,
272 * even if they are idle.
273 * @throws IllegalArgumentException if corePoolSize less than or
274 * equal to zero
275 */
276 public ScheduledExecutor(int corePoolSize) {
277 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
278 new DelayedWorkQueue());
279 }
280
281 /**
282 * Creates a new ScheduledExecutor with the given initial parameters.
283 *
284 * @param corePoolSize the number of threads to keep in the pool,
285 * even if they are idle.
286 * @param threadFactory the factory to use when the executor
287 * creates a new thread.
288 * @throws NullPointerException if threadFactory is null
289 */
290 public ScheduledExecutor(int corePoolSize,
291 ThreadFactory threadFactory) {
292 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
293 new DelayedWorkQueue(), threadFactory);
294 }
295
296 /**
297 * Creates a new ScheduledExecutor with the given initial parameters.
298 *
299 * @param corePoolSize the number of threads to keep in the pool,
300 * even if they are idle.
301 * @param handler the handler to use when execution is blocked
302 * because the thread bounds and queue capacities are reached.
303 * @throws NullPointerException if handler is null
304 */
305 public ScheduledExecutor(int corePoolSize,
306 RejectedExecutionHandler handler) {
307 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
308 new DelayedWorkQueue(), handler);
309 }
310
311 /**
312 * Creates a new ScheduledExecutor with the given initial parameters.
313 *
314 * @param corePoolSize the number of threads to keep in the pool,
315 * even if they are idle.
316 * @param threadFactory the factory to use when the executor
317 * creates a new thread.
318 * @param handler the handler to use when execution is blocked
319 * because the thread bounds and queue capacities are reached.
320 * @throws NullPointerException if threadFactory or handler is null
321 */
322 public ScheduledExecutor(int corePoolSize,
323 ThreadFactory threadFactory,
324 RejectedExecutionHandler handler) {
325 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
326 new DelayedWorkQueue(), threadFactory, handler);
327 }
328
329 /**
330 * Specialized variant of ThreadPoolExecutor.execute for delayed tasks.
331 */
332 private void delayedExecute(Runnable command) {
333 if (isShutdown()) {
334 reject(command);
335 return;
336 }
337 // Prestart a thread if necessary. We cannot prestart it
338 // running the task because the task (probably) shouldn't be
339 // run yet, so thread will just idle until delay elapses.
340 if (getPoolSize() < getCorePoolSize())
341 prestartCoreThread();
342
343 super.getQueue().offer(command);
344 }
345
346 /**
347 * Creates and executes a one-shot action that becomes enabled after
348 * the given delay.
349 * @param command the task to execute.
350 * @param delay the time from now to delay execution.
351 * @param unit the time unit of the delay parameter.
352 * @return a handle that can be used to cancel the task.
353 * @throws RejectedExecutionException if task cannot be scheduled
354 * for execution because the executor has been shut down.
355 * @throws NullPointerException if command is null
356 */
357
358 public ScheduledCancellable schedule(Runnable command, long delay, TimeUnit unit) {
359 if (command == null)
360 throw new NullPointerException();
361 long triggerTime = System.nanoTime() + unit.toNanos(delay);
362 ScheduledCancellableTask t = new ScheduledCancellableTask(command, triggerTime);
363 delayedExecute(t);
364 return t;
365 }
366
367 /**
368 * Creates and executes a ScheduledFuture that becomes enabled after the
369 * given delay.
370 * @param callable the function to execute.
371 * @param delay the time from now to delay execution.
372 * @param unit the time unit of the delay parameter.
373 * @return a ScheduledFuture that can be used to extract result or cancel.
374 * @throws RejectedExecutionException if task cannot be scheduled
375 * for execution because the executor has been shut down.
376 * @throws NullPointerException if callable is null
377 */
378 public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
379 if (callable == null)
380 throw new NullPointerException();
381 long triggerTime = System.nanoTime() + unit.toNanos(delay);
382 ScheduledFutureTask<V> t = new ScheduledFutureTask<V>(callable, triggerTime);
383 delayedExecute(t);
384 return t;
385 }
386
387 /**
388 * Creates and executes a periodic action that becomes enabled first
389 * after the given initial delay, and subsequently with the given
390 * period; that is executions will commence after
391 * <tt>initialDelay</tt> then <tt>initialDelay+period</tt>, then
392 * <tt>initialDelay + 2 * period</tt>, and so on. The
393 * task will only terminate via cancellation.
394 * @param command the task to execute.
395 * @param initialDelay the time to delay first execution.
396 * @param period the period between successive executions.
397 * @param unit the time unit of the delay and period parameters
398 * @return a handle that can be used to cancel the task.
399 * @throws RejectedExecutionException if task cannot be scheduled
400 * for execution because the executor has been shut down.
401 * @throws NullPointerException if command is null
402 * @throws IllegalArgumentException if period less than or equal to zero.
403 */
404 public ScheduledCancellable scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
405 if (command == null)
406 throw new NullPointerException();
407 if (period <= 0)
408 throw new IllegalArgumentException();
409 long triggerTime = System.nanoTime() + unit.toNanos(initialDelay);
410 ScheduledCancellableTask t = new ScheduledCancellableTask(command,
411 triggerTime,
412 unit.toNanos(period),
413 true);
414 delayedExecute(t);
415 return t;
416 }
417
418 /**
419 * Creates and executes a periodic action that becomes enabled first
420 * after the given initial delay, and and subsequently with the
421 * given delay between the termination of one execution and the
422 * commencement of the next.
423 * The task will only terminate via cancellation.
424 * @param command the task to execute.
425 * @param initialDelay the time to delay first execution.
426 * @param delay the delay between the termination of one
427 * execution and the commencement of the next.
428 * @param unit the time unit of the delay and delay parameters
429 * @return a handle that can be used to cancel the task.
430 * @throws RejectedExecutionException if task cannot be scheduled
431 * for execution because the executor has been shut down.
432 * @throws NullPointerException if command is null
433 * @throws IllegalArgumentException if delay less than or equal to zero.
434 */
435 public ScheduledCancellable scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
436 if (command == null)
437 throw new NullPointerException();
438 if (delay <= 0)
439 throw new IllegalArgumentException();
440 long triggerTime = System.nanoTime() + unit.toNanos(initialDelay);
441 ScheduledCancellableTask t = new ScheduledCancellableTask(command,
442 triggerTime,
443 unit.toNanos(delay),
444 false);
445 delayedExecute(t);
446 return t;
447 }
448
449
450 /**
451 * Execute command with zero required delay. This has effect
452 * equivalent to <tt>schedule(command, 0, anyUnit)</tt>. Note
453 * that inspections of the queue and of the list returned by
454 * <tt>shutdownNow</tt> will access the zero-delayed
455 * {@link ScheduledCancellable}, not the <tt>command</tt> itself.
456 *
457 * @param command the task to execute
458 * @throws RejectedExecutionException at discretion of
459 * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
460 * for execution because the executor has been shut down.
461 * @throws NullPointerException if command is null
462 */
463 public void execute(Runnable command) {
464 if (command == null)
465 throw new NullPointerException();
466 schedule(command, 0, TimeUnit.NANOSECONDS);
467 }
468
469
470 /**
471 * Set policy on whether to continue executing existing periodic
472 * tasks even when this executor has been <tt>shutdown</tt>. In
473 * this case, these tasks will only terminate upon
474 * <tt>shutdownNow</tt>, or after setting the policy to
475 * <tt>false</tt> when already shutdown. This value is by default
476 * false.
477 * @param value if true, continue after shutdown, else don't.
478 */
479 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
480 continueExistingPeriodicTasksAfterShutdown = value;
481 if (!value && isShutdown())
482 cancelUnwantedTasks();
483 }
484
485 /**
486 * Get the policy on whether to continue executing existing
487 * periodic tasks even when this executor has been
488 * <tt>shutdown</tt>. In this case, these tasks will only
489 * terminate upon <tt>shutdownNow</tt> or after setting the policy
490 * to <tt>false</tt> when already shutdown. This value is by
491 * default false.
492 * @return true if will continue after shutdown.
493 */
494 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
495 return continueExistingPeriodicTasksAfterShutdown;
496 }
497
498 /**
499 * Set policy on whether to execute existing delayed
500 * tasks even when this executor has been <tt>shutdown</tt>. In
501 * this case, these tasks will only terminate upon
502 * <tt>shutdownNow</tt>, or after setting the policy to
503 * <tt>false</tt> when already shutdown. This value is by default
504 * true.
505 * @param value if true, execute after shutdown, else don't.
506 */
507 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
508 executeExistingDelayedTasksAfterShutdown = value;
509 if (!value && isShutdown())
510 cancelUnwantedTasks();
511 }
512
513 /**
514 * Get policy on whether to execute existing delayed
515 * tasks even when this executor has been <tt>shutdown</tt>. In
516 * this case, these tasks will only terminate upon
517 * <tt>shutdownNow</tt>, or after setting the policy to
518 * <tt>false</tt> when already shutdown. This value is by default
519 * true.
520 * @return true if will execute after shutdown.
521 */
522 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
523 return executeExistingDelayedTasksAfterShutdown;
524 }
525
526 /**
527 * Cancel and clear the queue of all tasks that should not be run
528 * due to shutdown policy.
529 */
530 private void cancelUnwantedTasks() {
531 boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
532 boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
533 if (!keepDelayed && !keepPeriodic)
534 super.getQueue().clear();
535 else if (keepDelayed || keepPeriodic) {
536 Object[] entries = super.getQueue().toArray();
537 for (int i = 0; i < entries.length; ++i) {
538 ScheduledCancellableTask t = (ScheduledCancellableTask)entries[i];
539 if (t.isPeriodic()? !keepPeriodic : !keepDelayed)
540 t.cancel(false);
541 }
542 entries = null;
543 purge();
544 }
545 }
546
547 /**
548 * Initiates an orderly shutdown in which previously submitted
549 * tasks are executed, but no new tasks will be accepted. If the
550 * <tt>ExecuteExistingDelayedTasksAfterShutdownPolicy</tt> has
551 * been set <tt>false</tt>, existing delayed tasks whose delays
552 * have not yet elapsed are cancelled. And unless the
553 * <tt>ContinueExistingPeriodicTasksAfterShutdownPolicy</tt> has
554 * been set <tt>true</tt>, future executions of existing periodic
555 * tasks will be cancelled.
556 */
557 public void shutdown() {
558 cancelUnwantedTasks();
559 super.shutdown();
560 }
561
562 /**
563 * Attempts to stop all actively executing tasks, halts the
564 * processing of waiting tasks, and returns a list of the tasks that were
565 * awaiting execution.
566 *
567 * <p>There are no guarantees beyond best-effort attempts to stop
568 * processing actively executing tasks. This implementations
569 * cancels via {@link Thread#interrupt}, so if any tasks mask or
570 * fail to respond to interrupts, they may never terminate.
571 *
572 * @return list of tasks that never commenced execution. Each
573 * element of this list is a {@link ScheduledCancellable},
574 * including those tasks submitted using <tt>execute</tt> which
575 * are for scheduling purposes used as the basis of a zero-delay
576 * <tt>ScheduledCancellable</tt>.
577 */
578 public List shutdownNow() {
579 return super.shutdownNow();
580 }
581
582 /**
583 * Removes this task from internal queue if it is present, thus
584 * causing it not to be run if it has not already started. This
585 * method may be useful as one part of a cancellation scheme.
586 *
587 * @param task the task to remove
588 * @return true if the task was removed
589 */
590 public boolean remove(Runnable task) {
591 if (task instanceof ScheduledCancellable)
592 return super.remove(task);
593
594 // The task might actually have been wrapped as a ScheduledCancellable
595 // in execute(), in which case we need to maually traverse
596 // looking for it.
597
598 ScheduledCancellable wrap = null;
599 Object[] entries = super.getQueue().toArray();
600 for (int i = 0; i < entries.length; ++i) {
601 ScheduledCancellableTask t = (ScheduledCancellableTask)entries[i];
602 Runnable r = t.getRunnable();
603 if (task.equals(r)) {
604 wrap = t;
605 break;
606 }
607 }
608 entries = null;
609 return wrap != null && super.getQueue().remove(wrap);
610 }
611
612
613 /**
614 * Returns the task queue used by this executor. Each element of
615 * this queue is a {@link ScheduledCancellable}, including those
616 * tasks submitted using <tt>execute</tt> which are for scheduling
617 * purposes used as the basis of a zero-delay
618 * <tt>ScheduledCancellable</tt>. Iteration over this queue is
619 * </em>not</em> guaranteed to traverse tasks in the order in
620 * which they will execute.
621 *
622 * @return the task queue
623 */
624 public BlockingQueue<Runnable> getQueue() {
625 return super.getQueue();
626 }
627
628 /**
629 * Override of <tt>Executor</tt> hook method to support periodic
630 * tasks. If the executed task was periodic, causes the task for
631 * the next period to execute.
632 * @param r the task (assumed to be a ScheduledCancellable)
633 * @param t the exception
634 */
635 protected void afterExecute(Runnable r, Throwable t) {
636 super.afterExecute(r, t);
637 ScheduledCancellableTask next = ((ScheduledCancellableTask)r).nextTask();
638 if (next != null &&
639 (!isShutdown() ||
640 (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
641 !isTerminating())))
642 super.getQueue().offer(next);
643
644 // This might have been the final executed delayed task. Wake
645 // up threads to check.
646 else if (isShutdown())
647 interruptIdleWorkers();
648 }
649 }