ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledExecutor.java
Revision: 1.30
Committed: Thu Sep 25 11:01:22 2003 UTC (20 years, 8 months ago) by dl
Branch: MAIN
Changes since 1.29: +15 -0 lines
Log Message:
Consistently throw NPE for execute(null)

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.*;
10
11 /**
12 * An {@link Executor} that can schedule commands to run after a given
13 * delay, or to execute periodically. This class is preferable to
14 * {@link java.util.Timer} when multiple worker threads are needed,
15 * or when the additional flexibility or capabilities of
16 * {@link ThreadPoolExecutor} (which this class extends) are
17 * required.
18 *
19 * <p> The <tt>schedule</tt> methods create tasks with various delays
20 * and return a task object that can be used to cancel or check
21 * execution. The <tt>scheduleAtFixedRate</tt> and
22 * <tt>scheduleWithFixedDelay</tt> methods create and execute tasks
23 * that run periodically until cancelled. Commands submitted using
24 * the <tt>execute</tt> method are scheduled with a requested delay of
25 * zero.
26 *
27 * <p> Delayed tasks execute no sooner than they are enabled, but
28 * without any real-time guarantees about when, after they are enabled,
29 * they will commence. Tasks tied for the same execution time are
30 * enabled in first-in-first-out (FIFO) order of submission.
31 *
32 * <p>All <tt>schedule</tt> methods accept <em>relative</em> delays and
33 * periods as arguments, not absolute times or dates. It is a simple
34 * matter to transform an absolute time represented as a
35 * {@link java.util.Date}, to the required form. For example, to
36 * schedule at a certain future <tt>date</tt>, you can use:
37 * <tt>schedule(task, date.getTime() - System.currentTimeMillis(),
38 * TimeUnit.MILLISECONDS)</tt>. Beware however that expiration of a
39 * relative delay need not coincide with the current <tt>Date</tt> at
40 * which the task is enabled due to network time synchronization
41 * protocols, clock drift, or other factors.
42 *
43 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
44 * of the inherited tuning methods are not especially useful for
45 * it. In particular, because a <tt>ScheduledExecutor</tt> always acts
46 * as a fixed-sized pool using <tt>corePoolSize</tt> threads and an
47 * unbounded queue, adjustments to <tt>maximumPoolSize</tt> have no
48 * useful effect.
49 *
50 * @since 1.5
51 * @author Doug Lea
52 */
53 public class ScheduledExecutor extends ThreadPoolExecutor {
54
55 /**
56 * False if should cancel/suppress periodic tasks on shutdown.
57 */
58 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
59
60 /**
61 * False if should cancel non-periodic tasks on shutdown.
62 */
63 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
64
65
66 /**
67 * Sequence number to break scheduling ties, and in turn to
68 * guarantee FIFO order among tied entries.
69 */
70 private static final AtomicLong sequencer = new AtomicLong(0);
71
72 private static class ScheduledCancellableTask
73 extends CancellableTask implements ScheduledCancellable {
74
75 /** Sequence number to break ties FIFO */
76 private final long sequenceNumber;
77 /** The time the task is enabled to execute in nanoTime units */
78 private long time;
79 /** The delay following next time, or <= 0 if non-periodic */
80 private final long period;
81 /** true if at fixed rate; false if fixed delay */
82 private final boolean rateBased;
83
84 /**
85 * Creates a one-shot action with given nanoTime-based trigger time
86 */
87 ScheduledCancellableTask(Runnable r, long ns) {
88 super(r);
89 this.time = ns;
90 this.period = 0;
91 rateBased = false;
92 this.sequenceNumber = sequencer.getAndIncrement();
93 }
94
95 /**
96 * Creates a one-shot action with given nanoTime-based trigger
97 * time but does not establish the action. (This is needed for
98 * the Future-based subclass).
99 */
100 ScheduledCancellableTask(long ns) {
101 super();
102 this.time = ns;
103 this.period = 0;
104 rateBased = false;
105 this.sequenceNumber = sequencer.getAndIncrement();
106 }
107
108 /**
109 * Creates a periodic action with given nano time and period
110 */
111 ScheduledCancellableTask(Runnable r, long ns, long period, boolean rateBased) {
112 super(r);
113 if (period <= 0)
114 throw new IllegalArgumentException();
115 this.time = ns;
116 this.period = period;
117 this.rateBased = rateBased;
118 this.sequenceNumber = sequencer.getAndIncrement();
119 }
120
121
122 public long getDelay(TimeUnit unit) {
123 long d = unit.convert(time - System.nanoTime(),
124 TimeUnit.NANOSECONDS);
125 return d;
126 }
127
128 public int compareTo(Object other) {
129 if (other == this) // compare zero ONLY if same object
130 return 0;
131 ScheduledCancellableTask x = (ScheduledCancellableTask)other;
132 long diff = time - x.time;
133 if (diff < 0)
134 return -1;
135 else if (diff > 0)
136 return 1;
137 else if (sequenceNumber < x.sequenceNumber)
138 return -1;
139 else
140 return 1;
141 }
142
143 /**
144 * Return true if this is a periodic (not a one-shot) action.
145 * @return true if periodic
146 */
147 public boolean isPeriodic() {
148 return period > 0;
149 }
150
151 /**
152 * Returns the period, or zero if non-periodic.
153 *
154 * @return the period
155 */
156 public long getPeriod(TimeUnit unit) {
157 return unit.convert(period, TimeUnit.NANOSECONDS);
158 }
159
160 /**
161 * Overrides CancellableTask version so as to not setDone if
162 * periodic.
163 */
164 public void run() {
165 if (setRunning()) {
166 try {
167 getRunnable().run();
168 } finally {
169 if (!isPeriodic())
170 setDone();
171 }
172 }
173 }
174
175 /**
176 * Return a ScheduledCancellable task (which may be this task)
177 * that will trigger in the period subsequent to current task,
178 * or null if non-periodic or cancelled.
179 */
180 ScheduledCancellableTask nextTask() {
181 if (period <= 0 || !reset())
182 return null;
183 time = period + (rateBased ? time : System.nanoTime());
184 return this;
185 }
186 }
187
188 private static class ScheduledFutureTask<V>
189 extends ScheduledCancellableTask implements ScheduledFuture<V> {
190
191 /**
192 * Creates a ScheduledFuture that may trigger after the given delay.
193 */
194 ScheduledFutureTask(Callable<V> callable, long triggerTime) {
195 // must set callable after super ctor call to use inner class
196 super(triggerTime);
197 setRunnable(new InnerCancellableFuture<V>(callable));
198 }
199
200 public V get() throws InterruptedException, ExecutionException {
201 return ((InnerCancellableFuture<V>)getRunnable()).get();
202 }
203
204 public V get(long timeout, TimeUnit unit)
205 throws InterruptedException, ExecutionException, TimeoutException {
206 return ((InnerCancellableFuture<V>)getRunnable()).get(timeout, unit);
207 }
208
209 protected void set(V v) {
210 ((InnerCancellableFuture<V>)getRunnable()).set(v);
211 }
212
213 protected void setException(Throwable t) {
214 ((InnerCancellableFuture<V>)getRunnable()).setException(t);
215 }
216 }
217
218
219 /**
220 * An annoying wrapper class to convince generics compiler to
221 * use a DelayQueue<ScheduledCancellableTask> as a BlockingQueue<Runnable>
222 */
223 private static class DelayedWorkQueue
224 extends AbstractCollection<Runnable> implements BlockingQueue<Runnable> {
225
226 private final DelayQueue<ScheduledCancellableTask> dq = new DelayQueue<ScheduledCancellableTask>();
227 public Runnable poll() { return dq.poll(); }
228 public Runnable peek() { return dq.peek(); }
229 public Runnable take() throws InterruptedException { return dq.take(); }
230 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
231 return dq.poll(timeout, unit);
232 }
233
234 public boolean add(Runnable x) { return dq.add((ScheduledCancellableTask)x); }
235 public boolean offer(Runnable x) { return dq.offer((ScheduledCancellableTask)x); }
236 public void put(Runnable x) {
237 dq.put((ScheduledCancellableTask)x);
238 }
239 public boolean offer(Runnable x, long timeout, TimeUnit unit) {
240 return dq.offer((ScheduledCancellableTask)x, timeout, unit);
241 }
242
243 public Runnable remove() { return dq.remove(); }
244 public Runnable element() { return dq.element(); }
245 public void clear() { dq.clear(); }
246
247 public int remainingCapacity() { return dq.remainingCapacity(); }
248 public boolean remove(Object x) { return dq.remove(x); }
249 public boolean contains(Object x) { return dq.contains(x); }
250 public int size() { return dq.size(); }
251 public boolean isEmpty() { return dq.isEmpty(); }
252 public Object[] toArray() { return dq.toArray(); }
253 public <T> T[] toArray(T[] array) { return dq.toArray(array); }
254 public Iterator<Runnable> iterator() {
255 return new Iterator<Runnable>() {
256 private Iterator<ScheduledCancellableTask> it = dq.iterator();
257 public boolean hasNext() { return it.hasNext(); }
258 public Runnable next() { return it.next(); }
259 public void remove() { it.remove(); }
260 };
261 }
262 }
263
/**
 * Creates a new ScheduledExecutor with the given core pool size,
 * backed by a new delayed work queue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle.
 */
public ScheduledExecutor(int corePoolSize) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue());
}
274
/**
 * Creates a new ScheduledExecutor with the given core pool size and
 * thread factory, backed by a new delayed work queue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle.
 * @param threadFactory the factory to use when the executor
 * creates a new thread.
 * @throws NullPointerException if threadFactory is null
 */
public ScheduledExecutor(int corePoolSize, ThreadFactory threadFactory) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          threadFactory);
}
289
/**
 * Creates a new ScheduledExecutor with the given core pool size and
 * rejection handler, backed by a new delayed work queue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle.
 * @param handler the handler to use when execution is blocked
 * because the thread bounds and queue capacities are reached.
 * @throws NullPointerException if handler is null
 */
public ScheduledExecutor(int corePoolSize, RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          handler);
}
304
/**
 * Creates a new ScheduledExecutor with the given core pool size,
 * thread factory and rejection handler, backed by a new delayed
 * work queue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle.
 * @param threadFactory the factory to use when the executor
 * creates a new thread.
 * @param handler the handler to use when execution is blocked
 * because the thread bounds and queue capacities are reached.
 * @throws NullPointerException if threadFactory or handler is null
 */
public ScheduledExecutor(int corePoolSize,
                         ThreadFactory threadFactory,
                         RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          threadFactory,
          handler);
}
322
323 /**
324 * Specialized variant of ThreadPoolExecutor.execute for delayed tasks.
325 */
326 private void delayedExecute(Runnable command) {
327 if (isShutdown()) {
328 reject(command);
329 return;
330 }
331 // Prestart a thread if necessary. We cannot prestart it
332 // running the task because the task (probably) shouldn't be
333 // run yet, so thread will just idle until delay elapses.
334 if (getPoolSize() < getCorePoolSize())
335 prestartCoreThread();
336
337 super.getQueue().offer(command);
338 }
339
340 /**
341 * Creates and executes a one-shot action that becomes enabled after
342 * the given delay.
343 * @param command the task to execute.
344 * @param delay the time from now to delay execution.
345 * @param unit the time unit of the delay parameter.
346 * @return a handle that can be used to cancel the task.
347 * @throws RejectedExecutionException if task cannot be scheduled
348 * for execution because the executor has been shut down.
349 * @throws NullPointerException if command is null
350 */
351
352 public ScheduledCancellable schedule(Runnable command, long delay, TimeUnit unit) {
353 if (command == null)
354 throw new NullPointerException();
355 long triggerTime = System.nanoTime() + unit.toNanos(delay);
356 ScheduledCancellableTask t = new ScheduledCancellableTask(command, triggerTime);
357 delayedExecute(t);
358 return t;
359 }
360
361 /**
362 * Creates and executes a ScheduledFuture that becomes enabled after the
363 * given delay.
364 * @param callable the function to execute.
365 * @param delay the time from now to delay execution.
366 * @param unit the time unit of the delay parameter.
367 * @return a ScheduledFuture that can be used to extract result or cancel.
368 * @throws RejectedExecutionException if task cannot be scheduled
369 * for execution because the executor has been shut down.
370 * @throws NullPointerException if callable is null
371 */
372 public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
373 if (callable == null)
374 throw new NullPointerException();
375 long triggerTime = System.nanoTime() + unit.toNanos(delay);
376 ScheduledFutureTask<V> t = new ScheduledFutureTask<V>(callable, triggerTime);
377 delayedExecute(t);
378 return t;
379 }
380
381 /**
382 * Creates and executes a periodic action that becomes enabled first
383 * after the given initial delay, and subsequently with the given
384 * period; that is executions will commence after
385 * <tt>initialDelay</tt> then <tt>initialDelay+period</tt>, then
386 * <tt>initialDelay + 2 * period</tt>, and so on. The
387 * task will only terminate via cancellation.
388 * @param command the task to execute.
389 * @param initialDelay the time to delay first execution.
390 * @param period the period between successive executions.
391 * @param unit the time unit of the delay and period parameters
392 * @return a handle that can be used to cancel the task.
393 * @throws RejectedExecutionException if task cannot be scheduled
394 * for execution because the executor has been shut down.
395 * @throws NullPointerException if command is null
396 */
397 public ScheduledCancellable scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
398 if (command == null)
399 throw new NullPointerException();
400 long triggerTime = System.nanoTime() + unit.toNanos(initialDelay);
401 ScheduledCancellableTask t = new ScheduledCancellableTask(command,
402 triggerTime,
403 unit.toNanos(period),
404 true);
405 delayedExecute(t);
406 return t;
407 }
408
409 /**
410 * Creates and executes a periodic action that becomes enabled first
411 * after the given initial delay, and and subsequently with the
412 * given delay between the termination of one execution and the
413 * commencement of the next.
414 * The task will only terminate via cancellation.
415 * @param command the task to execute.
416 * @param initialDelay the time to delay first execution.
417 * @param delay the delay between the termination of one
418 * execution and the commencement of the next.
419 * @param unit the time unit of the delay and delay parameters
420 * @return a handle that can be used to cancel the task.
421 * @throws RejectedExecutionException if task cannot be scheduled
422 * for execution because the executor has been shut down.
423 * @throws NullPointerException if command is null
424 */
425 public ScheduledCancellable scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
426 if (command == null)
427 throw new NullPointerException();
428 long triggerTime = System.nanoTime() + unit.toNanos(initialDelay);
429 ScheduledCancellableTask t = new ScheduledCancellableTask(command,
430 triggerTime,
431 unit.toNanos(delay),
432 false);
433 delayedExecute(t);
434 return t;
435 }
436
437
438 /**
439 * Execute command with zero required delay. This has effect
440 * equivalent to <tt>schedule(command, 0, anyUnit)</tt>. Note
441 * that inspections of the queue and of the list returned by
442 * <tt>shutdownNow</tt> will access the zero-delayed
443 * {@link ScheduledCancellable}, not the <tt>command</tt> itself.
444 *
445 * @param command the task to execute
446 * @throws RejectedExecutionException at discretion of
447 * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
448 * for execution because the executor has been shut down.
449 * @throws NullPointerException if command is null
450 */
451 public void execute(Runnable command) {
452 if (command == null)
453 throw new NullPointerException();
454 schedule(command, 0, TimeUnit.NANOSECONDS);
455 }
456
457
458 /**
459 * Set policy on whether to continue executing existing periodic
460 * tasks even when this executor has been <tt>shutdown</tt>. In
461 * this case, these tasks will only terminate upon
462 * <tt>shutdownNow</tt>, or after setting the policy to
463 * <tt>false</tt> when already shutdown. This value is by default
464 * false.
465 * @param value if true, continue after shutdown, else don't.
466 */
467 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
468 continueExistingPeriodicTasksAfterShutdown = value;
469 if (!value && isShutdown())
470 cancelUnwantedTasks();
471 }
472
473 /**
474 * Get the policy on whether to continue executing existing
475 * periodic tasks even when this executor has been
476 * <tt>shutdown</tt>. In this case, these tasks will only
477 * terminate upon <tt>shutdownNow</tt> or after setting the policy
478 * to <tt>false</tt> when already shutdown. This value is by
479 * default false.
480 * @return true if will continue after shutdown.
481 */
482 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
483 return continueExistingPeriodicTasksAfterShutdown;
484 }
485
486 /**
487 * Set policy on whether to execute existing delayed
488 * tasks even when this executor has been <tt>shutdown</tt>. In
489 * this case, these tasks will only terminate upon
490 * <tt>shutdownNow</tt>, or after setting the policy to
491 * <tt>false</tt> when already shutdown. This value is by default
492 * true.
493 * @param value if true, execute after shutdown, else don't.
494 */
495 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
496 executeExistingDelayedTasksAfterShutdown = value;
497 if (!value && isShutdown())
498 cancelUnwantedTasks();
499 }
500
501 /**
502 * Get policy on whether to execute existing delayed
503 * tasks even when this executor has been <tt>shutdown</tt>. In
504 * this case, these tasks will only terminate upon
505 * <tt>shutdownNow</tt>, or after setting the policy to
506 * <tt>false</tt> when already shutdown. This value is by default
507 * true.
508 * @return true if will execute after shutdown.
509 */
510 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
511 return executeExistingDelayedTasksAfterShutdown;
512 }
513
514 /**
515 * Cancel and clear the queue of all tasks that should not be run
516 * due to shutdown policy.
517 */
518 private void cancelUnwantedTasks() {
519 boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
520 boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
521 if (!keepDelayed && !keepPeriodic)
522 super.getQueue().clear();
523 else if (keepDelayed || keepPeriodic) {
524 Object[] entries = super.getQueue().toArray();
525 for (int i = 0; i < entries.length; ++i) {
526 ScheduledCancellableTask t = (ScheduledCancellableTask)entries[i];
527 if (t.isPeriodic()? !keepPeriodic : !keepDelayed)
528 t.cancel(false);
529 }
530 entries = null;
531 purge();
532 }
533 }
534
535 /**
536 * Initiates an orderly shutdown in which previously submitted
537 * tasks are executed, but no new tasks will be accepted. If the
538 * <tt>ExecuteExistingDelayedTasksAfterShutdownPolicy</tt> has
539 * been set <tt>false</tt>, existing delayed tasks whose delays
540 * have not yet elapsed are cancelled. And unless the
541 * <tt>ContinueExistingPeriodicTasksAfterShutdownPolicy</tt> has
542 * been set <tt>true</tt>, future executions of existing periodic
543 * tasks will be cancelled.
544 */
545 public void shutdown() {
546 cancelUnwantedTasks();
547 super.shutdown();
548 }
549
550 /**
551 * Attempts to stop all actively executing tasks, halts the
552 * processing of waiting tasks, and returns a list of the tasks that were
553 * awaiting execution.
554 *
555 * <p>There are no guarantees beyond best-effort attempts to stop
556 * processing actively executing tasks. This implementations
557 * cancels via {@link Thread#interrupt}, so if any tasks mask or
558 * fail to respond to interrupts, they may never terminate.
559 *
560 * @return list of tasks that never commenced execution. Each
561 * element of this list is a {@link ScheduledCancellable},
562 * including those tasks submitted using <tt>execute</tt> which
563 * are for scheduling purposes used as the basis of a zero-delay
564 * <tt>ScheduledCancellable</tt>.
565 */
566 public List shutdownNow() {
567 return super.shutdownNow();
568 }
569
570 /**
571 * Removes this task from internal queue if it is present, thus
572 * causing it not to be run if it has not already started. This
573 * method may be useful as one part of a cancellation scheme.
574 *
575 * @param task the task to remove
576 * @return true if the task was removed
577 */
578 public boolean remove(Runnable task) {
579 if (task instanceof ScheduledCancellable)
580 return super.remove(task);
581
582 // The task might actually have been wrapped as a ScheduledCancellable
583 // in execute(), in which case we need to maually traverse
584 // looking for it.
585
586 ScheduledCancellable wrap = null;
587 Object[] entries = super.getQueue().toArray();
588 for (int i = 0; i < entries.length; ++i) {
589 ScheduledCancellableTask t = (ScheduledCancellableTask)entries[i];
590 Runnable r = t.getRunnable();
591 if (task.equals(r)) {
592 wrap = t;
593 break;
594 }
595 }
596 entries = null;
597 return wrap != null && super.getQueue().remove(wrap);
598 }
599
600
601 /**
602 * Returns the task queue used by this executor. Each element of
603 * this queue is a {@link ScheduledCancellable}, including those
604 * tasks submitted using <tt>execute</tt> which are for scheduling
605 * purposes used as the basis of a zero-delay
606 * <tt>ScheduledCancellable</tt>. Iteration over this queue is
607 * </em>not</em> guaranteed to travserse tasks in the order in
608 * which they will execute.
609 *
610 * @return the task queue
611 */
612 public BlockingQueue<Runnable> getQueue() {
613 return super.getQueue();
614 }
615
616 /**
617 * Override of <tt>Executor</tt> hook method to support periodic
618 * tasks. If the executed task was periodic, causes the task for
619 * the next period to execute.
620 * @param r the task (assumed to be a ScheduledCancellable)
621 * @param t the exception
622 */
623 protected void afterExecute(Runnable r, Throwable t) {
624 super.afterExecute(r, t);
625 ScheduledCancellableTask next = ((ScheduledCancellableTask)r).nextTask();
626 if (next != null &&
627 (!isShutdown() ||
628 (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
629 !isTerminating())))
630 super.getQueue().offer(next);
631
632 // This might have been the final executed delayed task. Wake
633 // up threads to check.
634 else if (isShutdown())
635 interruptIdleWorkers();
636 }
637 }