ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledExecutor.java
Revision: 1.28
Committed: Fri Sep 12 15:40:10 2003 UTC (20 years, 8 months ago) by dl
Branch: MAIN
Changes since 1.27: +43 -15 lines
Log Message:
Adapt AbstractQueue changes; Conditionalize CancellableTask.reset; new TimeUnit methods

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.*;
10
11 /**
12 * An {@link Executor} that can schedule commands to run after a given
13 * delay, or to execute periodically. This class is preferable to
14 * {@link java.util.Timer} when multiple worker threads are needed,
15 * or when the additional flexibility or capabilities of
16 * {@link ThreadPoolExecutor} (which this class extends) are
17 * required.
18 *
19 * <p> The <tt>schedule</tt> methods create tasks with various delays
20 * and return a task object that can be used to cancel or check
21 * execution. The <tt>scheduleAtFixedRate</tt> and
22 * <tt>scheduleWithFixedDelay</tt> methods create and execute tasks
23 * that run periodically until cancelled. Commands submitted using
24 * the <tt>execute</tt> method are scheduled with a requested delay of
25 * zero.
26 *
27 * <p> Delayed tasks execute no sooner than they are enabled, but
28 * without any real-time guarantees about when, after they are enabled,
29 * they will commence. Tasks tied for the same execution time are
30 * enabled in first-in-first-out (FIFO) order of submission.
31 *
32 * <p>All <t>schedule</tt> methods accept <em>relative</em> delays and
33 * periods as arguments, not absolute times or dates. It is a simple
34 * matter to transform an absolute time represented as a
35 * {@link java.util.Date}, to the required form. For example, to
36 * schedule at a certain future <tt>date</tt>, you can use:
37 * <tt>schedule(task, date.getTime() - System.currentTimeMillis(),
38 * TimeUnit.MILLISECONDS)</tt>. Beware however that expiration of a
39 * relative delay need not coincide with the current <tt>Date</tt> at
40 * which the task is enabled due to network time synchronization
41 * protocols, clock drift, or other factors.
42 *
43 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
44 * of the inherited tuning methods are not especially useful for
45 * it. In particular, because a <tt>ScheduledExecutor</tt> always acts
46 * as a fixed-sized pool using <tt>corePoolSize</tt> threads and an
47 * unbounded queue, adjustments to <tt>maximumPoolSize</tt> have no
48 * useful effect.
49 *
50 * @since 1.5
51 * @author Doug Lea
52 */
53 public class ScheduledExecutor extends ThreadPoolExecutor {
54
55 /**
56 * False if should cancel/suppress periodic tasks on shutdown.
57 */
58 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
59
60 /**
61 * False if should cancel non-periodic tasks on shutdown.
62 */
63 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
64
65
66 /**
67 * Sequence number to break scheduling ties, and in turn to
68 * guarantee FIFO order among tied entries.
69 */
70 private static final AtomicLong sequencer = new AtomicLong(0);
71
72 private static class ScheduledCancellableTask
73 extends CancellableTask implements ScheduledCancellable {
74
75 /** Sequence number to break ties FIFO */
76 private long sequenceNumber;
77 /** The time the task is enabled to execute in nanoTime units */
78 private long time;
79 /** The delay following next time, or <= 0 if non-periodic */
80 private final long period;
81 /** true if at fixed rate; false if fixed delay */
82 private final boolean rateBased;
83
84 /**
85 * Creates a one-shot action with given nanoTime-based trigger time
86 */
87 ScheduledCancellableTask(Runnable r, long ns) {
88 super(r);
89 this.time = ns;
90 this.period = 0;
91 rateBased = false;
92 this.sequenceNumber = sequencer.getAndIncrement();
93 }
94
95 /**
96 * Creates a one-shot action with given nanoTime-based trigger time
97 * but does not establish the action (need for Future subclass).
98 */
99 ScheduledCancellableTask(long ns) {
100 super();
101 this.time = ns;
102 this.period = 0;
103 rateBased = false;
104 this.sequenceNumber = sequencer.getAndIncrement();
105 }
106
107 /**
108 * Creates a periodic action with given nano time and period
109 */
110 ScheduledCancellableTask(Runnable r, long ns, long period, boolean rateBased) {
111 super(r);
112 if (period <= 0)
113 throw new IllegalArgumentException();
114 this.time = ns;
115 this.period = period;
116 this.rateBased = rateBased;
117 this.sequenceNumber = sequencer.getAndIncrement();
118 }
119
120
121 public long getDelay(TimeUnit unit) {
122 long d = unit.convert(time - System.nanoTime(),
123 TimeUnit.NANOSECONDS);
124 return d;
125 }
126
127 public int compareTo(Object other) {
128 if (other == this) // compare zero ONLY if same object
129 return 0;
130 ScheduledCancellableTask x = (ScheduledCancellableTask)other;
131 long diff = time - x.time;
132 if (diff < 0)
133 return -1;
134 else if (diff > 0)
135 return 1;
136 else if (sequenceNumber < x.sequenceNumber)
137 return -1;
138 else
139 return 1;
140 }
141
142 /**
143 * Return true if this is a periodic (not a one-shot) action.
144 * @return true if periodic
145 */
146 public boolean isPeriodic() {
147 return period > 0;
148 }
149
150 /**
151 * Returns the period, or zero if non-periodic.
152 *
153 * @return the period
154 */
155 public long getPeriod(TimeUnit unit) {
156 return unit.convert(period, TimeUnit.NANOSECONDS);
157 }
158
159 /**
160 * Override run method not to setDone if periodic
161 */
162 public void run() {
163 if (setRunning()) {
164 try {
165 getRunnable().run();
166 } finally {
167 if (!isPeriodic())
168 setDone();
169 }
170 }
171 }
172
173 /**
174 * Return a new ScheduledCancellable task (which may be the
175 * this task) that will trigger in the period subsequent to
176 * current task, or null if non-periodic or cancelled.
177 */
178 ScheduledCancellableTask nextTask() {
179 if (period <= 0 || !reset())
180 return null;
181 time = period + (rateBased ? time : System.nanoTime());
182 sequenceNumber = sequencer.getAndIncrement();
183 return this;
184 }
185 }
186
187 private static class ScheduledFutureTask<V>
188 extends ScheduledCancellableTask implements ScheduledFuture<V> {
189
190 /**
191 * Creates a ScheduledFuture that may trigger after the given delay.
192 */
193 ScheduledFutureTask(Callable<V> callable, long triggerTime) {
194 // must set callable after super ctor call to use inner class
195 super(triggerTime);
196 setRunnable(new InnerCancellableFuture<V>(callable));
197 }
198
199 public V get() throws InterruptedException, ExecutionException {
200 return ((InnerCancellableFuture<V>)getRunnable()).get();
201 }
202
203 public V get(long timeout, TimeUnit unit)
204 throws InterruptedException, ExecutionException, TimeoutException {
205 return ((InnerCancellableFuture<V>)getRunnable()).get(timeout, unit);
206 }
207
208 protected void set(V v) {
209 ((InnerCancellableFuture<V>)getRunnable()).set(v);
210 }
211
212 protected void setException(Throwable t) {
213 ((InnerCancellableFuture<V>)getRunnable()).setException(t);
214 }
215 }
216
217
218 /**
219 * An annoying wrapper class to convince generics compiler to
220 * use a DelayQueue<ScheduledCancellableTask> as a BlockingQueue<Runnable>
221 */
222 private static class DelayedWorkQueue
223 extends AbstractCollection<Runnable> implements BlockingQueue<Runnable> {
224
225 private final DelayQueue<ScheduledCancellableTask> dq = new DelayQueue<ScheduledCancellableTask>();
226 public Runnable poll() { return dq.poll(); }
227 public Runnable peek() { return dq.peek(); }
228 public Runnable take() throws InterruptedException { return dq.take(); }
229 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
230 return dq.poll(timeout, unit);
231 }
232
233 public boolean add(Runnable x) { return dq.add((ScheduledCancellableTask)x); }
234 public boolean offer(Runnable x) { return dq.offer((ScheduledCancellableTask)x); }
235 public void put(Runnable x) throws InterruptedException {
236 dq.put((ScheduledCancellableTask)x);
237 }
238 public boolean offer(Runnable x, long timeout, TimeUnit unit) throws InterruptedException {
239 return dq.offer((ScheduledCancellableTask)x, timeout, unit);
240 }
241
242 public Runnable remove() { return dq.remove(); }
243 public Runnable element() { return dq.element(); }
244 public void clear() { dq.clear(); }
245
246 public int remainingCapacity() { return dq.remainingCapacity(); }
247 public boolean remove(Object x) { return dq.remove(x); }
248 public boolean contains(Object x) { return dq.contains(x); }
249 public int size() { return dq.size(); }
250 public boolean isEmpty() { return dq.isEmpty(); }
251 public Object[] toArray() { return dq.toArray(); }
252 public <T> T[] toArray(T[] array) { return dq.toArray(array); }
253 public Iterator<Runnable> iterator() {
254 return new Iterator<Runnable>() {
255 private Iterator<ScheduledCancellableTask> it = dq.iterator();
256 public boolean hasNext() { return it.hasNext(); }
257 public Runnable next() { return it.next(); }
258 public void remove() { it.remove(); }
259 };
260 }
261 }
262
263 /**
264 * Creates a new ScheduledExecutor with the given initial parameters.
265 *
266 * @param corePoolSize the number of threads to keep in the pool,
267 * even if they are idle.
268 */
269 public ScheduledExecutor(int corePoolSize) {
270 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
271 new DelayedWorkQueue());
272 }
273
274 /**
275 * Creates a new ScheduledExecutor with the given initial parameters.
276 *
277 * @param corePoolSize the number of threads to keep in the pool,
278 * even if they are idle.
279 * @param threadFactory the factory to use when the executor
280 * creates a new thread.
281 * @throws NullPointerException if threadFactory is null
282 */
283 public ScheduledExecutor(int corePoolSize,
284 ThreadFactory threadFactory) {
285 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
286 new DelayedWorkQueue(), threadFactory);
287 }
288
289 /**
290 * Creates a new ScheduledExecutor with the given initial parameters.
291 *
292 * @param corePoolSize the number of threads to keep in the pool,
293 * even if they are idle.
294 * @param handler the handler to use when execution is blocked
295 * because the thread bounds and queue capacities are reached.
296 * @throws NullPointerException if handler is null
297 */
298 public ScheduledExecutor(int corePoolSize,
299 RejectedExecutionHandler handler) {
300 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
301 new DelayedWorkQueue(), handler);
302 }
303
304 /**
305 * Creates a new ScheduledExecutor with the given initial parameters.
306 *
307 * @param corePoolSize the number of threads to keep in the pool,
308 * even if they are idle.
309 * @param threadFactory the factory to use when the executor
310 * creates a new thread.
311 * @param handler the handler to use when execution is blocked
312 * because the thread bounds and queue capacities are reached.
313 * @throws NullPointerException if threadFactory or handler is null
314 */
315 public ScheduledExecutor(int corePoolSize,
316 ThreadFactory threadFactory,
317 RejectedExecutionHandler handler) {
318 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
319 new DelayedWorkQueue(), threadFactory, handler);
320 }
321
322 /**
323 * Specialized variant of ThreadPoolExecutor.execute for delayed tasks.
324 */
325 private void delayedExecute(Runnable command) {
326 if (isShutdown()) {
327 reject(command);
328 return;
329 }
330 // Prestart a thread if necessary. We cannot prestart it
331 // running the task because the task (probably) shouldn't be
332 // run yet, so thread will just idle until delay elapses.
333 if (getPoolSize() < getCorePoolSize())
334 prestartCoreThread();
335
336 super.getQueue().offer(command);
337 }
338
339 /**
340 * Creates and executes a one-shot action that becomes enabled after
341 * the given delay.
342 * @param command the task to execute.
343 * @param delay the time from now to delay execution.
344 * @param unit the time unit of the delay parameter.
345 * @return a handle that can be used to cancel the task.
346 * @throws RejectedExecutionException if task cannot be scheduled
347 * for execution because the executor has been shut down.
348 */
349
350 public ScheduledCancellable schedule(Runnable command, long delay, TimeUnit unit) {
351 long triggerTime = System.nanoTime() + unit.toNanos(delay);
352 ScheduledCancellableTask t = new ScheduledCancellableTask(command, triggerTime);
353 delayedExecute(t);
354 return t;
355 }
356
357 /**
358 * Creates and executes a periodic action that becomes enabled first
359 * after the given initial delay, and subsequently with the given
360 * period; that is executions will commence after
361 * <tt>initialDelay</tt> then <tt>initialDelay+period</tt>, then
362 * <tt>initialDelay + 2 * period</tt>, and so on. The
363 * task will only terminate via cancellation.
364 * @param command the task to execute.
365 * @param initialDelay the time to delay first execution.
366 * @param period the period between successive executions.
367 * @param unit the time unit of the delay and period parameters
368 * @return a handle that can be used to cancel the task.
369 * @throws RejectedExecutionException if task cannot be scheduled
370 * for execution because the executor has been shut down.
371 */
372 public ScheduledCancellable scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
373 long triggerTime = System.nanoTime() + unit.toNanos(initialDelay);
374 ScheduledCancellableTask t = new ScheduledCancellableTask(command,
375 triggerTime,
376 unit.toNanos(period),
377 true);
378 delayedExecute(t);
379 return t;
380 }
381
382 /**
383 * Creates and executes a periodic action that becomes enabled first
384 * after the given initial delay, and and subsequently with the
385 * given delay between the termination of one execution and the
386 * commencement of the next.
387 * The task will only terminate via cancellation.
388 * @param command the task to execute.
389 * @param initialDelay the time to delay first execution.
390 * @param delay the delay between the termination of one
391 * execution and the commencement of the next.
392 * @param unit the time unit of the delay and delay parameters
393 * @return a handle that can be used to cancel the task.
394 * @throws RejectedExecutionException if task cannot be scheduled
395 * for execution because the executor has been shut down.
396 */
397 public ScheduledCancellable scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
398 long triggerTime = System.nanoTime() + unit.toNanos(initialDelay);
399 ScheduledCancellableTask t = new ScheduledCancellableTask(command,
400 triggerTime,
401 unit.toNanos(delay),
402 false);
403 delayedExecute(t);
404 return t;
405 }
406
407 /**
408 * Creates and executes a ScheduledFuture that becomes enabled after the
409 * given delay.
410 * @param callable the function to execute.
411 * @param delay the time from now to delay execution.
412 * @param unit the time unit of the delay parameter.
413 * @return a ScheduledFuture that can be used to extract result or cancel.
414 * @throws RejectedExecutionException if task cannot be scheduled
415 * for execution because the executor has been shut down.
416 */
417 public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
418 long triggerTime = System.nanoTime() + unit.toNanos(delay);
419 ScheduledFutureTask<V> t = new ScheduledFutureTask<V>(callable, triggerTime);
420 delayedExecute(t);
421 return t;
422 }
423
424 /**
425 * Execute command with zero required delay. This has effect
426 * equivalent to <tt>schedule(command, 0, anyUnit)</tt>. Note
427 * that inspections of the queue and of the list returned by
428 * <tt>shutdownNow</tt> will access the zero-delayed
429 * {@link ScheduledCancellable}, not the <tt>command</tt> itself.
430 *
431 * @param command the task to execute
432 * @throws RejectedExecutionException at discretion of
433 * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
434 * for execution because the executor has been shut down.
435 */
436 public void execute(Runnable command) {
437 schedule(command, 0, TimeUnit.NANOSECONDS);
438 }
439
440
441 /**
442 * Set policy on whether to continue executing existing periodic
443 * tasks even when this executor has been <tt>shutdown</tt>. In
444 * this case, these tasks will only terminate upon
445 * <tt>shutdownNow</tt>, or after setting the policy to
446 * <tt>false</tt> when already shutdown. This value is by default
447 * false.
448 * @param value if true, continue after shutdown, else don't.
449 */
450 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
451 continueExistingPeriodicTasksAfterShutdown = value;
452 if (!value && isShutdown())
453 cancelUnwantedTasks();
454 }
455
456 /**
457 * Get the policy on whether to continue executing existing
458 * periodic tasks even when this executor has been
459 * <tt>shutdown</tt>. In this case, these tasks will only
460 * terminate upon <tt>shutdownNow</tt> or after setting the policy
461 * to <tt>false</tt> when already shutdown. This value is by
462 * default false.
463 * @return true if will continue after shutdown.
464 */
465 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
466 return continueExistingPeriodicTasksAfterShutdown;
467 }
468
469 /**
470 * Set policy on whether to execute existing delayed
471 * tasks even when this executor has been <tt>shutdown</tt>. In
472 * this case, these tasks will only terminate upon
473 * <tt>shutdownNow</tt>, or after setting the policy to
474 * <tt>false</tt> when already shutdown. This value is by default
475 * true.
476 * @param value if true, execute after shutdown, else don't.
477 */
478 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
479 executeExistingDelayedTasksAfterShutdown = value;
480 if (!value && isShutdown())
481 cancelUnwantedTasks();
482 }
483
484 /**
485 * Get policy on whether to execute existing delayed
486 * tasks even when this executor has been <tt>shutdown</tt>. In
487 * this case, these tasks will only terminate upon
488 * <tt>shutdownNow</tt>, or after setting the policy to
489 * <tt>false</tt> when already shutdown. This value is by default
490 * true.
491 * @return true if will execute after shutdown.
492 */
493 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
494 return executeExistingDelayedTasksAfterShutdown;
495 }
496
497 /**
498 * Cancel and clear the queue of all tasks that should not be run
499 * due to shutdown policy.
500 */
501 private void cancelUnwantedTasks() {
502 boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
503 boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
504 if (!keepDelayed && !keepPeriodic)
505 super.getQueue().clear();
506 else if (keepDelayed || keepPeriodic) {
507 Object[] entries = super.getQueue().toArray();
508 for (int i = 0; i < entries.length; ++i) {
509 ScheduledCancellableTask t = (ScheduledCancellableTask)entries[i];
510 if (t.isPeriodic()? !keepPeriodic : !keepDelayed)
511 t.cancel(false);
512 }
513 entries = null;
514 purge();
515 }
516 }
517
518 /**
519 * Initiates an orderly shutdown in which previously submitted
520 * tasks are executed, but no new tasks will be accepted. If the
521 * <tt>ExecuteExistingDelayedTasksAfterShutdownPolicy</tt> has
522 * been set <tt>false</tt>, existing delayed tasks whose delays
523 * have not yet elapsed are cancelled. And unless the
524 * <tt>ContinueExistingPeriodicTasksAfterShutdownPolicy</tt> hase
525 * been set <tt>true</tt>, future executions of existing periodic
526 * tasks will be cancelled.
527 */
528 public void shutdown() {
529 cancelUnwantedTasks();
530 super.shutdown();
531 }
532
533 /**
534 * Attempts to stop all actively executing tasks, halts the
535 * processing of waiting tasks, and returns a list of the tasks that were
536 * awaiting execution.
537 *
538 * <p>There are no guarantees beyond best-effort attempts to stop
539 * processing actively executing tasks. This implementations
540 * cancels via {@link Thread#interrupt}, so if any tasks mask or
541 * fail to respond to interrupts, they may never terminate.
542 *
543 * @return list of tasks that never commenced execution. Each
544 * element of this list is a {@link ScheduledCancellable},
545 * including those tasks submitted using <tt>execute</tt> which
546 * are for scheduling purposes used as the basis of a zero-delay
547 * <tt>ScheduledCancellable</tt>.
548 */
549 public List shutdownNow() {
550 return super.shutdownNow();
551 }
552
553 /**
554 * Removes this task from internal queue if it is present, thus
555 * causing it not to be run if it has not already started. This
556 * method may be useful as one part of a cancellation scheme.
557 *
558 * @param task the task to remove
559 * @return true if the task was removed
560 */
561 public boolean remove(Runnable task) {
562 if (task instanceof ScheduledCancellable)
563 return super.remove(task);
564
565 // The task might actually have been wrapped as a ScheduledCancellable
566 // in execute(), in which case we need to maually traverse
567 // looking for it.
568
569 ScheduledCancellable wrap = null;
570 Object[] entries = super.getQueue().toArray();
571 for (int i = 0; i < entries.length; ++i) {
572 ScheduledCancellableTask t = (ScheduledCancellableTask)entries[i];
573 Runnable r = t.getRunnable();
574 if (task.equals(r)) {
575 wrap = t;
576 break;
577 }
578 }
579 entries = null;
580 return wrap != null && super.getQueue().remove(wrap);
581 }
582
583
584 /**
585 * Returns the task queue used by this executor. Each element of
586 * this queue is a {@link ScheduledCancellable}, including those
587 * tasks submitted using <tt>execute</tt> which are for scheduling
588 * purposes used as the basis of a zero-delay
589 * <tt>ScheduledCancellable</tt>. Iteration over this queue is
590 * </em>not</em> guaranteed to travserse tasks in the order in
591 * which they will execute.
592 *
593 * @return the task queue
594 */
595 public BlockingQueue<Runnable> getQueue() {
596 return super.getQueue();
597 }
598
599 /**
600 * Override of <tt>Executor</tt> hook method to support periodic
601 * tasks. If the executed task was periodic, causes the task for
602 * the next period to execute.
603 * @param r the task (assumed to be a ScheduledCancellable)
604 * @param t the exception
605 */
606 protected void afterExecute(Runnable r, Throwable t) {
607 super.afterExecute(r, t);
608 ScheduledCancellableTask next = ((ScheduledCancellableTask)r).nextTask();
609 if (next != null &&
610 (!isShutdown() ||
611 (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
612 !isTerminating())))
613 super.getQueue().offer(next);
614
615 // This might have been the final executed delayed task. Wake
616 // up threads to check.
617 else if (isShutdown())
618 interruptIdleWorkers();
619 }
620 }