ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledExecutor.java
Revision: 1.29
Committed: Sat Sep 13 18:51:11 2003 UTC (20 years, 8 months ago) by dl
Branch: MAIN
Changes since 1.28: +31 -29 lines
Log Message:
Proofreading pass -- many minor adjustments

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.*;
10
11 /**
12 * An {@link Executor} that can schedule commands to run after a given
13 * delay, or to execute periodically. This class is preferable to
14 * {@link java.util.Timer} when multiple worker threads are needed,
15 * or when the additional flexibility or capabilities of
16 * {@link ThreadPoolExecutor} (which this class extends) are
17 * required.
18 *
19 * <p> The <tt>schedule</tt> methods create tasks with various delays
20 * and return a task object that can be used to cancel or check
21 * execution. The <tt>scheduleAtFixedRate</tt> and
22 * <tt>scheduleWithFixedDelay</tt> methods create and execute tasks
23 * that run periodically until cancelled. Commands submitted using
24 * the <tt>execute</tt> method are scheduled with a requested delay of
25 * zero.
26 *
27 * <p> Delayed tasks execute no sooner than they are enabled, but
28 * without any real-time guarantees about when, after they are enabled,
29 * they will commence. Tasks tied for the same execution time are
30 * enabled in first-in-first-out (FIFO) order of submission.
31 *
32 * <p>All <tt>schedule</tt> methods accept <em>relative</em> delays and
33 * periods as arguments, not absolute times or dates. It is a simple
34 * matter to transform an absolute time represented as a
35 * {@link java.util.Date}, to the required form. For example, to
36 * schedule at a certain future <tt>date</tt>, you can use:
37 * <tt>schedule(task, date.getTime() - System.currentTimeMillis(),
38 * TimeUnit.MILLISECONDS)</tt>. Beware however that expiration of a
39 * relative delay need not coincide with the current <tt>Date</tt> at
40 * which the task is enabled due to network time synchronization
41 * protocols, clock drift, or other factors.
42 *
43 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
44 * of the inherited tuning methods are not especially useful for
45 * it. In particular, because a <tt>ScheduledExecutor</tt> always acts
46 * as a fixed-sized pool using <tt>corePoolSize</tt> threads and an
47 * unbounded queue, adjustments to <tt>maximumPoolSize</tt> have no
48 * useful effect.
49 *
50 * @since 1.5
51 * @author Doug Lea
52 */
53 public class ScheduledExecutor extends ThreadPoolExecutor {
54
55 /**
56 * False if should cancel/suppress periodic tasks on shutdown.
57 */
58 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
59
60 /**
61 * False if should cancel non-periodic tasks on shutdown.
62 */
63 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
64
65
66 /**
67 * Sequence number to break scheduling ties, and in turn to
68 * guarantee FIFO order among tied entries.
69 */
70 private static final AtomicLong sequencer = new AtomicLong(0);
71
72 private static class ScheduledCancellableTask
73 extends CancellableTask implements ScheduledCancellable {
74
75 /** Sequence number to break ties FIFO */
76 private final long sequenceNumber;
77 /** The time the task is enabled to execute in nanoTime units */
78 private long time;
79 /** The delay following next time, or <= 0 if non-periodic */
80 private final long period;
81 /** true if at fixed rate; false if fixed delay */
82 private final boolean rateBased;
83
84 /**
85 * Creates a one-shot action with given nanoTime-based trigger time
86 */
87 ScheduledCancellableTask(Runnable r, long ns) {
88 super(r);
89 this.time = ns;
90 this.period = 0;
91 rateBased = false;
92 this.sequenceNumber = sequencer.getAndIncrement();
93 }
94
95 /**
96 * Creates a one-shot action with given nanoTime-based trigger
97 * time but does not establish the action. (This is needed for
98 * the Future-based subclass).
99 */
100 ScheduledCancellableTask(long ns) {
101 super();
102 this.time = ns;
103 this.period = 0;
104 rateBased = false;
105 this.sequenceNumber = sequencer.getAndIncrement();
106 }
107
108 /**
109 * Creates a periodic action with given nano time and period
110 */
111 ScheduledCancellableTask(Runnable r, long ns, long period, boolean rateBased) {
112 super(r);
113 if (period <= 0)
114 throw new IllegalArgumentException();
115 this.time = ns;
116 this.period = period;
117 this.rateBased = rateBased;
118 this.sequenceNumber = sequencer.getAndIncrement();
119 }
120
121
122 public long getDelay(TimeUnit unit) {
123 long d = unit.convert(time - System.nanoTime(),
124 TimeUnit.NANOSECONDS);
125 return d;
126 }
127
128 public int compareTo(Object other) {
129 if (other == this) // compare zero ONLY if same object
130 return 0;
131 ScheduledCancellableTask x = (ScheduledCancellableTask)other;
132 long diff = time - x.time;
133 if (diff < 0)
134 return -1;
135 else if (diff > 0)
136 return 1;
137 else if (sequenceNumber < x.sequenceNumber)
138 return -1;
139 else
140 return 1;
141 }
142
143 /**
144 * Return true if this is a periodic (not a one-shot) action.
145 * @return true if periodic
146 */
147 public boolean isPeriodic() {
148 return period > 0;
149 }
150
151 /**
152 * Returns the period, or zero if non-periodic.
153 *
154 * @return the period
155 */
156 public long getPeriod(TimeUnit unit) {
157 return unit.convert(period, TimeUnit.NANOSECONDS);
158 }
159
160 /**
161 * Overrides CancellableTask version so as to not setDone if
162 * periodic.
163 */
164 public void run() {
165 if (setRunning()) {
166 try {
167 getRunnable().run();
168 } finally {
169 if (!isPeriodic())
170 setDone();
171 }
172 }
173 }
174
175 /**
176 * Return a ScheduledCancellable task (which may be this task)
177 * that will trigger in the period subsequent to current task,
178 * or null if non-periodic or cancelled.
179 */
180 ScheduledCancellableTask nextTask() {
181 if (period <= 0 || !reset())
182 return null;
183 time = period + (rateBased ? time : System.nanoTime());
184 return this;
185 }
186 }
187
188 private static class ScheduledFutureTask<V>
189 extends ScheduledCancellableTask implements ScheduledFuture<V> {
190
191 /**
192 * Creates a ScheduledFuture that may trigger after the given delay.
193 */
194 ScheduledFutureTask(Callable<V> callable, long triggerTime) {
195 // must set callable after super ctor call to use inner class
196 super(triggerTime);
197 setRunnable(new InnerCancellableFuture<V>(callable));
198 }
199
200 public V get() throws InterruptedException, ExecutionException {
201 return ((InnerCancellableFuture<V>)getRunnable()).get();
202 }
203
204 public V get(long timeout, TimeUnit unit)
205 throws InterruptedException, ExecutionException, TimeoutException {
206 return ((InnerCancellableFuture<V>)getRunnable()).get(timeout, unit);
207 }
208
209 protected void set(V v) {
210 ((InnerCancellableFuture<V>)getRunnable()).set(v);
211 }
212
213 protected void setException(Throwable t) {
214 ((InnerCancellableFuture<V>)getRunnable()).setException(t);
215 }
216 }
217
218
219 /**
220 * An annoying wrapper class to convince generics compiler to
221 * use a DelayQueue<ScheduledCancellableTask> as a BlockingQueue<Runnable>
222 */
223 private static class DelayedWorkQueue
224 extends AbstractCollection<Runnable> implements BlockingQueue<Runnable> {
225
226 private final DelayQueue<ScheduledCancellableTask> dq = new DelayQueue<ScheduledCancellableTask>();
227 public Runnable poll() { return dq.poll(); }
228 public Runnable peek() { return dq.peek(); }
229 public Runnable take() throws InterruptedException { return dq.take(); }
230 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
231 return dq.poll(timeout, unit);
232 }
233
234 public boolean add(Runnable x) { return dq.add((ScheduledCancellableTask)x); }
235 public boolean offer(Runnable x) { return dq.offer((ScheduledCancellableTask)x); }
236 public void put(Runnable x) {
237 dq.put((ScheduledCancellableTask)x);
238 }
239 public boolean offer(Runnable x, long timeout, TimeUnit unit) {
240 return dq.offer((ScheduledCancellableTask)x, timeout, unit);
241 }
242
243 public Runnable remove() { return dq.remove(); }
244 public Runnable element() { return dq.element(); }
245 public void clear() { dq.clear(); }
246
247 public int remainingCapacity() { return dq.remainingCapacity(); }
248 public boolean remove(Object x) { return dq.remove(x); }
249 public boolean contains(Object x) { return dq.contains(x); }
250 public int size() { return dq.size(); }
251 public boolean isEmpty() { return dq.isEmpty(); }
252 public Object[] toArray() { return dq.toArray(); }
253 public <T> T[] toArray(T[] array) { return dq.toArray(array); }
254 public Iterator<Runnable> iterator() {
255 return new Iterator<Runnable>() {
256 private Iterator<ScheduledCancellableTask> it = dq.iterator();
257 public boolean hasNext() { return it.hasNext(); }
258 public Runnable next() { return it.next(); }
259 public void remove() { it.remove(); }
260 };
261 }
262 }
263
/**
 * Creates a new ScheduledExecutor with the given core pool size.
 * The superclass is configured with a maximum pool size of
 * <tt>Integer.MAX_VALUE</tt>, a zero keep-alive time, and a fresh
 * delay-ordered work queue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle.
 */
public ScheduledExecutor(int corePoolSize) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue());
}
274
/**
 * Creates a new ScheduledExecutor with the given core pool size
 * and thread factory. The superclass is configured with a maximum
 * pool size of <tt>Integer.MAX_VALUE</tt>, a zero keep-alive time,
 * and a fresh delay-ordered work queue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle.
 * @param threadFactory the factory to use when the executor
 * creates a new thread.
 * @throws NullPointerException if threadFactory is null
 */
public ScheduledExecutor(int corePoolSize, ThreadFactory threadFactory) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          threadFactory);
}
289
/**
 * Creates a new ScheduledExecutor with the given core pool size
 * and rejection handler. The superclass is configured with a
 * maximum pool size of <tt>Integer.MAX_VALUE</tt>, a zero
 * keep-alive time, and a fresh delay-ordered work queue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle.
 * @param handler the handler to use when execution is blocked
 * because the thread bounds and queue capacities are reached.
 * @throws NullPointerException if handler is null
 */
public ScheduledExecutor(int corePoolSize, RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          handler);
}
304
/**
 * Creates a new ScheduledExecutor with the given core pool size,
 * thread factory and rejection handler. The superclass is
 * configured with a maximum pool size of <tt>Integer.MAX_VALUE</tt>,
 * a zero keep-alive time, and a fresh delay-ordered work queue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle.
 * @param threadFactory the factory to use when the executor
 * creates a new thread.
 * @param handler the handler to use when execution is blocked
 * because the thread bounds and queue capacities are reached.
 * @throws NullPointerException if threadFactory or handler is null
 */
public ScheduledExecutor(int corePoolSize,
                         ThreadFactory threadFactory,
                         RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          threadFactory,
          handler);
}
322
323 /**
324 * Specialized variant of ThreadPoolExecutor.execute for delayed tasks.
325 */
326 private void delayedExecute(Runnable command) {
327 if (isShutdown()) {
328 reject(command);
329 return;
330 }
331 // Prestart a thread if necessary. We cannot prestart it
332 // running the task because the task (probably) shouldn't be
333 // run yet, so thread will just idle until delay elapses.
334 if (getPoolSize() < getCorePoolSize())
335 prestartCoreThread();
336
337 super.getQueue().offer(command);
338 }
339
340 /**
341 * Creates and executes a one-shot action that becomes enabled after
342 * the given delay.
343 * @param command the task to execute.
344 * @param delay the time from now to delay execution.
345 * @param unit the time unit of the delay parameter.
346 * @return a handle that can be used to cancel the task.
347 * @throws RejectedExecutionException if task cannot be scheduled
348 * for execution because the executor has been shut down.
349 */
350
351 public ScheduledCancellable schedule(Runnable command, long delay, TimeUnit unit) {
352 long triggerTime = System.nanoTime() + unit.toNanos(delay);
353 ScheduledCancellableTask t = new ScheduledCancellableTask(command, triggerTime);
354 delayedExecute(t);
355 return t;
356 }
357
358 /**
359 * Creates and executes a ScheduledFuture that becomes enabled after the
360 * given delay.
361 * @param callable the function to execute.
362 * @param delay the time from now to delay execution.
363 * @param unit the time unit of the delay parameter.
364 * @return a ScheduledFuture that can be used to extract result or cancel.
365 * @throws RejectedExecutionException if task cannot be scheduled
366 * for execution because the executor has been shut down.
367 */
368 public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
369 long triggerTime = System.nanoTime() + unit.toNanos(delay);
370 ScheduledFutureTask<V> t = new ScheduledFutureTask<V>(callable, triggerTime);
371 delayedExecute(t);
372 return t;
373 }
374
375 /**
376 * Creates and executes a periodic action that becomes enabled first
377 * after the given initial delay, and subsequently with the given
378 * period; that is executions will commence after
379 * <tt>initialDelay</tt> then <tt>initialDelay+period</tt>, then
380 * <tt>initialDelay + 2 * period</tt>, and so on. The
381 * task will only terminate via cancellation.
382 * @param command the task to execute.
383 * @param initialDelay the time to delay first execution.
384 * @param period the period between successive executions.
385 * @param unit the time unit of the delay and period parameters
386 * @return a handle that can be used to cancel the task.
387 * @throws RejectedExecutionException if task cannot be scheduled
388 * for execution because the executor has been shut down.
389 */
390 public ScheduledCancellable scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
391 long triggerTime = System.nanoTime() + unit.toNanos(initialDelay);
392 ScheduledCancellableTask t = new ScheduledCancellableTask(command,
393 triggerTime,
394 unit.toNanos(period),
395 true);
396 delayedExecute(t);
397 return t;
398 }
399
400 /**
401 * Creates and executes a periodic action that becomes enabled first
402 * after the given initial delay, and and subsequently with the
403 * given delay between the termination of one execution and the
404 * commencement of the next.
405 * The task will only terminate via cancellation.
406 * @param command the task to execute.
407 * @param initialDelay the time to delay first execution.
408 * @param delay the delay between the termination of one
409 * execution and the commencement of the next.
410 * @param unit the time unit of the delay and delay parameters
411 * @return a handle that can be used to cancel the task.
412 * @throws RejectedExecutionException if task cannot be scheduled
413 * for execution because the executor has been shut down.
414 */
415 public ScheduledCancellable scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
416 long triggerTime = System.nanoTime() + unit.toNanos(initialDelay);
417 ScheduledCancellableTask t = new ScheduledCancellableTask(command,
418 triggerTime,
419 unit.toNanos(delay),
420 false);
421 delayedExecute(t);
422 return t;
423 }
424
425
426 /**
427 * Execute command with zero required delay. This has effect
428 * equivalent to <tt>schedule(command, 0, anyUnit)</tt>. Note
429 * that inspections of the queue and of the list returned by
430 * <tt>shutdownNow</tt> will access the zero-delayed
431 * {@link ScheduledCancellable}, not the <tt>command</tt> itself.
432 *
433 * @param command the task to execute
434 * @throws RejectedExecutionException at discretion of
435 * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
436 * for execution because the executor has been shut down.
437 */
438 public void execute(Runnable command) {
439 schedule(command, 0, TimeUnit.NANOSECONDS);
440 }
441
442
443 /**
444 * Set policy on whether to continue executing existing periodic
445 * tasks even when this executor has been <tt>shutdown</tt>. In
446 * this case, these tasks will only terminate upon
447 * <tt>shutdownNow</tt>, or after setting the policy to
448 * <tt>false</tt> when already shutdown. This value is by default
449 * false.
450 * @param value if true, continue after shutdown, else don't.
451 */
452 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
453 continueExistingPeriodicTasksAfterShutdown = value;
454 if (!value && isShutdown())
455 cancelUnwantedTasks();
456 }
457
458 /**
459 * Get the policy on whether to continue executing existing
460 * periodic tasks even when this executor has been
461 * <tt>shutdown</tt>. In this case, these tasks will only
462 * terminate upon <tt>shutdownNow</tt> or after setting the policy
463 * to <tt>false</tt> when already shutdown. This value is by
464 * default false.
465 * @return true if will continue after shutdown.
466 */
467 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
468 return continueExistingPeriodicTasksAfterShutdown;
469 }
470
471 /**
472 * Set policy on whether to execute existing delayed
473 * tasks even when this executor has been <tt>shutdown</tt>. In
474 * this case, these tasks will only terminate upon
475 * <tt>shutdownNow</tt>, or after setting the policy to
476 * <tt>false</tt> when already shutdown. This value is by default
477 * true.
478 * @param value if true, execute after shutdown, else don't.
479 */
480 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
481 executeExistingDelayedTasksAfterShutdown = value;
482 if (!value && isShutdown())
483 cancelUnwantedTasks();
484 }
485
486 /**
487 * Get policy on whether to execute existing delayed
488 * tasks even when this executor has been <tt>shutdown</tt>. In
489 * this case, these tasks will only terminate upon
490 * <tt>shutdownNow</tt>, or after setting the policy to
491 * <tt>false</tt> when already shutdown. This value is by default
492 * true.
493 * @return true if will execute after shutdown.
494 */
495 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
496 return executeExistingDelayedTasksAfterShutdown;
497 }
498
499 /**
500 * Cancel and clear the queue of all tasks that should not be run
501 * due to shutdown policy.
502 */
503 private void cancelUnwantedTasks() {
504 boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
505 boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
506 if (!keepDelayed && !keepPeriodic)
507 super.getQueue().clear();
508 else if (keepDelayed || keepPeriodic) {
509 Object[] entries = super.getQueue().toArray();
510 for (int i = 0; i < entries.length; ++i) {
511 ScheduledCancellableTask t = (ScheduledCancellableTask)entries[i];
512 if (t.isPeriodic()? !keepPeriodic : !keepDelayed)
513 t.cancel(false);
514 }
515 entries = null;
516 purge();
517 }
518 }
519
520 /**
521 * Initiates an orderly shutdown in which previously submitted
522 * tasks are executed, but no new tasks will be accepted. If the
523 * <tt>ExecuteExistingDelayedTasksAfterShutdownPolicy</tt> has
524 * been set <tt>false</tt>, existing delayed tasks whose delays
525 * have not yet elapsed are cancelled. And unless the
526 * <tt>ContinueExistingPeriodicTasksAfterShutdownPolicy</tt> has
527 * been set <tt>true</tt>, future executions of existing periodic
528 * tasks will be cancelled.
529 */
530 public void shutdown() {
531 cancelUnwantedTasks();
532 super.shutdown();
533 }
534
535 /**
536 * Attempts to stop all actively executing tasks, halts the
537 * processing of waiting tasks, and returns a list of the tasks that were
538 * awaiting execution.
539 *
540 * <p>There are no guarantees beyond best-effort attempts to stop
541 * processing actively executing tasks. This implementations
542 * cancels via {@link Thread#interrupt}, so if any tasks mask or
543 * fail to respond to interrupts, they may never terminate.
544 *
545 * @return list of tasks that never commenced execution. Each
546 * element of this list is a {@link ScheduledCancellable},
547 * including those tasks submitted using <tt>execute</tt> which
548 * are for scheduling purposes used as the basis of a zero-delay
549 * <tt>ScheduledCancellable</tt>.
550 */
551 public List shutdownNow() {
552 return super.shutdownNow();
553 }
554
555 /**
556 * Removes this task from internal queue if it is present, thus
557 * causing it not to be run if it has not already started. This
558 * method may be useful as one part of a cancellation scheme.
559 *
560 * @param task the task to remove
561 * @return true if the task was removed
562 */
563 public boolean remove(Runnable task) {
564 if (task instanceof ScheduledCancellable)
565 return super.remove(task);
566
567 // The task might actually have been wrapped as a ScheduledCancellable
568 // in execute(), in which case we need to maually traverse
569 // looking for it.
570
571 ScheduledCancellable wrap = null;
572 Object[] entries = super.getQueue().toArray();
573 for (int i = 0; i < entries.length; ++i) {
574 ScheduledCancellableTask t = (ScheduledCancellableTask)entries[i];
575 Runnable r = t.getRunnable();
576 if (task.equals(r)) {
577 wrap = t;
578 break;
579 }
580 }
581 entries = null;
582 return wrap != null && super.getQueue().remove(wrap);
583 }
584
585
586 /**
587 * Returns the task queue used by this executor. Each element of
588 * this queue is a {@link ScheduledCancellable}, including those
589 * tasks submitted using <tt>execute</tt> which are for scheduling
590 * purposes used as the basis of a zero-delay
591 * <tt>ScheduledCancellable</tt>. Iteration over this queue is
592 * </em>not</em> guaranteed to travserse tasks in the order in
593 * which they will execute.
594 *
595 * @return the task queue
596 */
597 public BlockingQueue<Runnable> getQueue() {
598 return super.getQueue();
599 }
600
601 /**
602 * Override of <tt>Executor</tt> hook method to support periodic
603 * tasks. If the executed task was periodic, causes the task for
604 * the next period to execute.
605 * @param r the task (assumed to be a ScheduledCancellable)
606 * @param t the exception
607 */
608 protected void afterExecute(Runnable r, Throwable t) {
609 super.afterExecute(r, t);
610 ScheduledCancellableTask next = ((ScheduledCancellableTask)r).nextTask();
611 if (next != null &&
612 (!isShutdown() ||
613 (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
614 !isTerminating())))
615 super.getQueue().offer(next);
616
617 // This might have been the final executed delayed task. Wake
618 // up threads to check.
619 else if (isShutdown())
620 interruptIdleWorkers();
621 }
622 }