ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledExecutor.java
Revision: 1.27
Committed: Sun Sep 7 23:28:21 2003 UTC (20 years, 9 months ago) by dl
Branch: MAIN
Changes since 1.26: +7 -4 lines
Log Message:
Allow resets

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.*;
10
11 /**
12 * An {@link Executor} that can schedule commands to run after a given
13 * delay, or to execute periodically. This class is preferable to
14 * {@link java.util.Timer} when multiple worker threads are needed,
15 * or when the additional flexibility or capabilities of
16 * {@link ThreadPoolExecutor} (which this class extends) are
17 * required.
18 *
19 * <p> The <tt>schedule</tt> methods create tasks with various delays
20 * and return a task object that can be used to cancel or check
21 * execution. The <tt>scheduleAtFixedRate</tt> and
22 * <tt>scheduleWithFixedDelay</tt> methods create and execute tasks
23 * that run periodically until cancelled. Commands submitted using
24 * the <tt>execute</tt> method are scheduled with a requested delay of
25 * zero.
26 *
27 * <p> Delayed tasks execute no sooner than they are enabled, but
28 * without any real-time guarantees about when, after they are enabled,
29 * they will commence. Tasks tied for the same execution time are
30 * enabled in first-in-first-out (FIFO) order of submission.
31 *
32 * <p>All <tt>schedule</tt> methods accept <em>relative</em> delays and
33 * periods as arguments, not absolute times or dates. It is a simple
34 * matter to transform an absolute time represented as a
35 * {@link java.util.Date}, to the required form. For example, to
36 * schedule at a certain future <tt>date</tt>, you can use:
37 * <tt>schedule(task, date.getTime() - System.currentTimeMillis(),
38 * TimeUnit.MILLISECONDS)</tt>. Beware however that expiration of a
39 * relative delay need not coincide with the current <tt>Date</tt> at
40 * which the task is enabled due to network time synchronization
41 * protocols, clock drift, or other factors.
42 *
43 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
44 * of the inherited tuning methods are not especially useful for
45 * it. In particular, because a <tt>ScheduledExecutor</tt> always acts
46 * as a fixed-sized pool using <tt>corePoolSize</tt> threads and an
47 * unbounded queue, adjustments to <tt>maximumPoolSize</tt> have no
48 * useful effect.
49 *
50 * @since 1.5
51 * @author Doug Lea
52 */
53 public class ScheduledExecutor extends ThreadPoolExecutor {
54
55 /**
56 * False if should cancel/suppress periodic tasks on shutdown.
57 */
58 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
59
60 /**
61 * False if should cancel non-periodic tasks on shutdown.
62 */
63 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
64
65
66 /**
67 * Sequence number to break scheduling ties, and in turn to
68 * guarantee FIFO order among tied entries.
69 */
70 private static final AtomicLong sequencer = new AtomicLong(0);
71
72 private static class ScheduledCancellableTask
73 extends CancellableTask implements ScheduledCancellable {
74
75 /** Sequence number to break ties FIFO */
76 private long sequenceNumber;
77 /** The time the task is enabled to execute in nanoTime units */
78 private long time;
79 /** The delay forllowing next time, or <= 0 if non-periodic */
80 private long period;
81 /** true if at fixed rate; false if fixed delay */
82 private final boolean rateBased;
83
84 /**
85 * Creates a one-shot action with given nanoTime-based trigger time
86 */
87 ScheduledCancellableTask(Runnable r, long ns) {
88 super(r);
89 this.time = ns;
90 this.period = 0;
91 rateBased = false;
92 this.sequenceNumber = sequencer.getAndIncrement();
93 }
94
95 /**
96 * Creates a periodic action with given nano time and period
97 */
98 ScheduledCancellableTask(Runnable r, long ns, long period, boolean rateBased) {
99 super(r);
100 if (period <= 0)
101 throw new IllegalArgumentException();
102 this.time = ns;
103 this.period = period;
104 this.rateBased = rateBased;
105 this.sequenceNumber = sequencer.getAndIncrement();
106 }
107
108
109 public long getDelay(TimeUnit unit) {
110 long d = unit.convert(time - System.nanoTime(),
111 TimeUnit.NANOSECONDS);
112 return d;
113 }
114
115 public int compareTo(Object other) {
116 if (other == this) // compare zero ONLY if same object
117 return 0;
118 ScheduledCancellableTask x = (ScheduledCancellableTask)other;
119 long diff = time - x.time;
120 if (diff < 0)
121 return -1;
122 else if (diff > 0)
123 return 1;
124 else if (sequenceNumber < x.sequenceNumber)
125 return -1;
126 else
127 return 1;
128 }
129
130 /**
131 * Return true if this is a periodic (not a one-shot) action.
132 * @return true if periodic
133 */
134 public boolean isPeriodic() {
135 return period > 0;
136 }
137
138 /**
139 * Returns the period, or zero if non-periodic.
140 *
141 * @return the period
142 */
143 public long getPeriod(TimeUnit unit) {
144 return unit.convert(period, TimeUnit.NANOSECONDS);
145 }
146
147 /**
148 * Return a new ScheduledCancellable that will trigger in the period
149 * subsequent to current task, or null if non-periodic
150 * or canceled.
151 */
152 ScheduledCancellableTask nextTask() {
153 if (period <= 0 || isCancelled())
154 return null;
155 long nextTime = period + (rateBased ? time : System.nanoTime());
156 this.time = nextTime;
157 this.sequenceNumber = sequencer.getAndIncrement();
158 reset();
159 return this;
160 }
161 }
162
163 private static class ScheduledFutureTask<V>
164 extends ScheduledCancellableTask implements ScheduledFuture<V> {
165
166 /**
167 * Creates a ScheduledFuture that may trigger after the given delay.
168 */
169 ScheduledFutureTask(Callable<V> callable, long triggerTime) {
170 // must set after super ctor call to use inner class
171 super(null, triggerTime);
172 setRunnable(new InnerCancellableFuture<V>(callable));
173 }
174
175 public V get() throws InterruptedException, ExecutionException {
176 return ((InnerCancellableFuture<V>)getRunnable()).get();
177 }
178
179 public V get(long timeout, TimeUnit unit)
180 throws InterruptedException, ExecutionException, TimeoutException {
181 return ((InnerCancellableFuture<V>)getRunnable()).get(timeout, unit);
182 }
183
184 protected void set(V v) {
185 ((InnerCancellableFuture<V>)getRunnable()).set(v);
186 }
187
188 protected void setException(Throwable t) {
189 ((InnerCancellableFuture<V>)getRunnable()).setException(t);
190 }
191 }
192
193
194 /**
195 * An annoying wrapper class to convince generics compiler to
196 * use a DelayQueue<ScheduledCancellableTask> as a BlockingQueue<Runnable>
197 */
198 private static class DelayedWorkQueue
199 extends AbstractCollection<Runnable> implements BlockingQueue<Runnable> {
200
201 private final DelayQueue<ScheduledCancellableTask> dq = new DelayQueue<ScheduledCancellableTask>();
202 public Runnable poll() { return dq.poll(); }
203 public Runnable peek() { return dq.peek(); }
204 public Runnable take() throws InterruptedException { return dq.take(); }
205 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
206 return dq.poll(timeout, unit);
207 }
208
209 public boolean add(Runnable x) { return dq.add((ScheduledCancellableTask)x); }
210 public boolean offer(Runnable x) { return dq.offer((ScheduledCancellableTask)x); }
211 public void put(Runnable x) throws InterruptedException {
212 dq.put((ScheduledCancellableTask)x);
213 }
214 public boolean offer(Runnable x, long timeout, TimeUnit unit) throws InterruptedException {
215 return dq.offer((ScheduledCancellableTask)x, timeout, unit);
216 }
217
218 public Runnable remove() { return dq.remove(); }
219 public Runnable element() { return dq.element(); }
220 public void clear() { dq.clear(); }
221
222 public int remainingCapacity() { return dq.remainingCapacity(); }
223 public boolean remove(Object x) { return dq.remove(x); }
224 public boolean contains(Object x) { return dq.contains(x); }
225 public int size() { return dq.size(); }
226 public boolean isEmpty() { return dq.isEmpty(); }
227 public Object[] toArray() { return dq.toArray(); }
228 public <T> T[] toArray(T[] array) { return dq.toArray(array); }
229 public Iterator<Runnable> iterator() {
230 return new Iterator<Runnable>() {
231 private Iterator<ScheduledCancellableTask> it = dq.iterator();
232 public boolean hasNext() { return it.hasNext(); }
233 public Runnable next() { return it.next(); }
234 public void remove() { it.remove(); }
235 };
236 }
237 }
238
/**
 * Creates a new ScheduledExecutor with the given core pool size.
 * The pool is configured with a maximum size of
 * <tt>Integer.MAX_VALUE</tt>, a zero keep-alive time, and a fresh
 * delay-ordered work queue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle.
 */
public ScheduledExecutor(int corePoolSize) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L, TimeUnit.NANOSECONDS,
          new DelayedWorkQueue());
}
249
/**
 * Creates a new ScheduledExecutor with the given core pool size and
 * thread factory. The pool is configured with a maximum size of
 * <tt>Integer.MAX_VALUE</tt>, a zero keep-alive time, and a fresh
 * delay-ordered work queue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle.
 * @param threadFactory the factory to use when the executor
 * creates a new thread.
 */
public ScheduledExecutor(int corePoolSize,
                         ThreadFactory threadFactory) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L, TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          threadFactory);
}
263
/**
 * Creates a new ScheduledExecutor with the given core pool size and
 * rejection handler. The pool is configured with a maximum size of
 * <tt>Integer.MAX_VALUE</tt>, a zero keep-alive time, and a fresh
 * delay-ordered work queue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle.
 * @param handler the handler to use when execution is blocked
 * because the thread bounds and queue capacities are reached.
 */
public ScheduledExecutor(int corePoolSize,
                         RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L, TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          handler);
}
277
/**
 * Creates a new ScheduledExecutor with the given core pool size,
 * thread factory and rejection handler. The pool is configured with
 * a maximum size of <tt>Integer.MAX_VALUE</tt>, a zero keep-alive
 * time, and a fresh delay-ordered work queue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle.
 * @param threadFactory the factory to use when the executor
 * creates a new thread.
 * @param handler the handler to use when execution is blocked
 * because the thread bounds and queue capacities are reached.
 */
public ScheduledExecutor(int corePoolSize,
                         ThreadFactory threadFactory,
                         RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L, TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          threadFactory,
          handler);
}
294
295 /**
296 * Specialized variant of ThreadPoolExecutor.execute for delayed tasks.
297 */
298 private void delayedExecute(Runnable command) {
299 if (isShutdown()) {
300 reject(command);
301 return;
302 }
303 // Prestart a thread if necessary. We cannot prestart it
304 // running the task because the task (probably) shouldn't be
305 // run yet, so thread will just idle until delay elapses.
306 if (getPoolSize() < getCorePoolSize())
307 prestartCoreThread();
308
309 super.getQueue().offer(command);
310 }
311
312 /**
313 * Creates and executes a one-shot action that becomes enabled after
314 * the given delay.
315 * @param command the task to execute.
316 * @param delay the time from now to delay execution.
317 * @param unit the time unit of the delay parameter.
318 * @return a handle that can be used to cancel the task.
319 * @throws RejectedExecutionException if task cannot be scheduled
320 * for execution because the executor has been shut down.
321 */
322
323 public ScheduledCancellable schedule(Runnable command, long delay, TimeUnit unit) {
324 long triggerTime = System.nanoTime() + unit.toNanos(delay);
325 ScheduledCancellableTask t = new ScheduledCancellableTask(command, triggerTime);
326 delayedExecute(t);
327 return t;
328 }
329
330 /**
331 * Creates and executes a periodic action that becomes enabled first
332 * after the given initial delay, and subsequently with the given
333 * period; that is executions will commence after
334 * <tt>initialDelay</tt> then <tt>initialDelay+period</tt>, then
335 * <tt>initialDelay + 2 * period</tt>, and so on.
336 * @param command the task to execute.
337 * @param initialDelay the time to delay first execution.
338 * @param period the period between successive executions.
339 * @param unit the time unit of the delay and period parameters
340 * @return a handle that can be used to cancel the task.
341 * @throws RejectedExecutionException if task cannot be scheduled
342 * for execution because the executor has been shut down.
343 */
344 public ScheduledCancellable scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
345 long triggerTime = System.nanoTime() + unit.toNanos(initialDelay);
346 ScheduledCancellableTask t = new ScheduledCancellableTask(command,
347 triggerTime,
348 unit.toNanos(period),
349 true);
350 delayedExecute(t);
351 return t;
352 }
353
354
355 /**
356 * Creates and executes a periodic action that becomes enabled first
357 * after the given initial delay, and and subsequently with the
358 * given delay between the termination of one execution and the
359 * commencement of the next.
360 * @param command the task to execute.
361 * @param initialDelay the time to delay first execution.
362 * @param delay the delay between the termination of one
363 * execution and the commencement of the next.
364 * @param unit the time unit of the delay and delay parameters
365 * @return a handle that can be used to cancel the task.
366 * @throws RejectedExecutionException if task cannot be scheduled
367 * for execution because the executor has been shut down.
368 */
369 public ScheduledCancellable scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
370 long triggerTime = System.nanoTime() + unit.toNanos(initialDelay);
371 ScheduledCancellableTask t = new ScheduledCancellableTask(command,
372 triggerTime,
373 unit.toNanos(delay),
374 false);
375 delayedExecute(t);
376 return t;
377 }
378
379 /**
380 * Creates and executes a ScheduledFuture that becomes enabled after the
381 * given delay.
382 * @param callable the function to execute.
383 * @param delay the time from now to delay execution.
384 * @param unit the time unit of the delay parameter.
385 * @return a ScheduledFuture that can be used to extract result or cancel.
386 * @throws RejectedExecutionException if task cannot be scheduled
387 * for execution because the executor has been shut down.
388 */
389 public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
390 long triggerTime = System.nanoTime() + unit.toNanos(delay);
391 ScheduledFutureTask<V> t = new ScheduledFutureTask<V>(callable, triggerTime);
392 delayedExecute(t);
393 return t;
394 }
395
396 /**
397 * Execute command with zero required delay. This has effect
398 * equivalent to <tt>schedule(command, 0, anyUnit)</tt>. Note
399 * that inspections of the queue and of the list returned by
400 * <tt>shutdownNow</tt> will access the zero-delayed
401 * {@link ScheduledCancellable}, not the <tt>command</tt> itself.
402 *
403 * @param command the task to execute
404 * @throws RejectedExecutionException at discretion of
405 * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
406 * for execution because the executor has been shut down.
407 */
408 public void execute(Runnable command) {
409 schedule(command, 0, TimeUnit.NANOSECONDS);
410 }
411
412
413 /**
414 * Set policy on whether to continue executing existing periodic
415 * tasks even when this executor has been <tt>shutdown</tt>. In
416 * this case, these tasks will only terminate upon
417 * <tt>shutdownNow</tt>, or after setting the policy to
418 * <tt>false</tt> when already shutdown. This value is by default
419 * false.
420 * @param value if true, continue after shutdown, else don't.
421 */
422 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
423 continueExistingPeriodicTasksAfterShutdown = value;
424 if (!value && isShutdown())
425 cancelUnwantedTasks();
426 }
427
428 /**
429 * Get the policy on whether to continue executing existing
430 * periodic tasks even when this executor has been
431 * <tt>shutdown</tt>. In this case, these tasks will only
432 * terminate upon <tt>shutdownNow</tt> or after setting the policy
433 * to <tt>false</tt> when already shutdown. This value is by
434 * default false.
435 * @return true if will continue after shutdown.
436 */
437 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
438 return continueExistingPeriodicTasksAfterShutdown;
439 }
440
441 /**
442 * Set policy on whether to execute existing delayed
443 * tasks even when this executor has been <tt>shutdown</tt>. In
444 * this case, these tasks will only terminate upon
445 * <tt>shutdownNow</tt>, or after setting the policy to
446 * <tt>false</tt> when already shutdown. This value is by default
447 * true.
448 * @param value if true, execute after shutdown, else don't.
449 */
450 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
451 executeExistingDelayedTasksAfterShutdown = value;
452 if (!value && isShutdown())
453 cancelUnwantedTasks();
454 }
455
456 /**
457 * Get policy on whether to execute existing delayed
458 * tasks even when this executor has been <tt>shutdown</tt>. In
459 * this case, these tasks will only terminate upon
460 * <tt>shutdownNow</tt>, or after setting the policy to
461 * <tt>false</tt> when already shutdown. This value is by default
462 * true.
463 * @return true if will execute after shutdown.
464 */
465 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
466 return executeExistingDelayedTasksAfterShutdown;
467 }
468
469 /**
470 * Cancel and clear the queue of all tasks that should not be run
471 * due to shutdown policy.
472 */
473 private void cancelUnwantedTasks() {
474 boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
475 boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
476 if (!keepDelayed && !keepPeriodic)
477 super.getQueue().clear();
478 else if (keepDelayed || keepPeriodic) {
479 Object[] entries = super.getQueue().toArray();
480 for (int i = 0; i < entries.length; ++i) {
481 ScheduledCancellableTask t = (ScheduledCancellableTask)entries[i];
482 if (t.isPeriodic()? !keepPeriodic : !keepDelayed)
483 t.cancel(false);
484 }
485 entries = null;
486 purge();
487 }
488 }
489
490 /**
491 * Initiates an orderly shutdown in which previously submitted
492 * tasks are executed, but no new tasks will be accepted. If the
493 * <tt>ExecuteExistingDelayedTasksAfterShutdownPolicy</tt> has
494 * been set <tt>false</tt>, existing delayed tasks whose delays
495 * have not yet elapsed are cancelled. And unless the
496 * <tt>ContinueExistingPeriodicTasksAfterShutdownPolicy</tt> hase
497 * been set <tt>true</tt>, future executions of existing periodic
498 * tasks will be cancelled.
499 */
500 public void shutdown() {
501 cancelUnwantedTasks();
502 super.shutdown();
503 }
504
505 /**
506 * Attempts to stop all actively executing tasks, halts the
507 * processing of waiting tasks, and returns a list of the tasks that were
508 * awaiting execution.
509 *
510 * <p>There are no guarantees beyond best-effort attempts to stop
511 * processing actively executing tasks. This implementations
512 * cancels via {@link Thread#interrupt}, so if any tasks mask or
513 * fail to respond to interrupts, they may never terminate.
514 *
515 * @return list of tasks that never commenced execution. Each
516 * element of this list is a {@link ScheduledCancellable},
517 * including those tasks submitted using <tt>execute</tt> which
518 * are for scheduling purposes used as the basis of a zero-delay
519 * <tt>ScheduledCancellable</tt>.
520 */
521 public List shutdownNow() {
522 return super.shutdownNow();
523 }
524
525 /**
526 * Removes this task from internal queue if it is present, thus
527 * causing it not to be run if it has not already started. This
528 * method may be useful as one part of a cancellation scheme.
529 *
530 * @param task the task to remove
531 * @return true if the task was removed
532 */
533 public boolean remove(Runnable task) {
534 if (task instanceof ScheduledCancellable)
535 return super.remove(task);
536
537 // The task might actually have been wrapped as a ScheduledCancellable
538 // in execute(), in which case we need to maually traverse
539 // looking for it.
540
541 ScheduledCancellable wrap = null;
542 Object[] entries = super.getQueue().toArray();
543 for (int i = 0; i < entries.length; ++i) {
544 ScheduledCancellableTask t = (ScheduledCancellableTask)entries[i];
545 Runnable r = t.getRunnable();
546 if (task.equals(r)) {
547 wrap = t;
548 break;
549 }
550 }
551 entries = null;
552 return wrap != null && super.getQueue().remove(wrap);
553 }
554
555
556 /**
557 * Returns the task queue used by this executor. Each element of
558 * this queue is a {@link ScheduledCancellable}, including those
559 * tasks submitted using <tt>execute</tt> which are for scheduling
560 * purposes used as the basis of a zero-delay
561 * <tt>ScheduledCancellable</tt>. Iteration over this queue is
562 * </em>not</em> guaranteed to travserse tasks in the order in
563 * which they will execute.
564 *
565 * @return the task queue
566 */
567 public BlockingQueue<Runnable> getQueue() {
568 return super.getQueue();
569 }
570
571 /**
572 * Override of <tt>Executor</tt> hook method to support periodic
573 * tasks. If the executed task was periodic, causes the task for
574 * the next period to execute.
575 * @param r the task (assumed to be a ScheduledCancellable)
576 * @param t the exception
577 */
578 protected void afterExecute(Runnable r, Throwable t) {
579 super.afterExecute(r, t);
580 ScheduledCancellableTask next = ((ScheduledCancellableTask)r).nextTask();
581 if (next != null &&
582 (!isShutdown() ||
583 (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
584 !isTerminating())))
585 super.getQueue().offer(next);
586
587 // This might have been the final executed delayed task. Wake
588 // up threads to check.
589 else if (isShutdown())
590 interruptIdleWorkers();
591 }
592 }