ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledExecutor.java
Revision: 1.23
Committed: Mon Aug 25 00:03:57 2003 UTC (20 years, 9 months ago) by dl
Branch: MAIN
Changes since 1.22: +1 -1 lines
Log Message:
Fixed typo

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.*;
10
11 /**
12 * An <tt>Executor</tt> that can schedule commands to run after a given
13 * delay, or to execute periodically. This class is preferable to
14 * <tt>java.util.Timer</tt> when multiple worker threads are needed,
15 * or when the additional flexibility or capabilities of
16 * <tt>ThreadPoolExecutor</tt> (which this class extends) are
17 * required.
18 *
19 * <p> The <tt>schedule</tt> methods create tasks with various delays
20 * and return a task object that can be used to cancel or check
21 * execution. The <tt>scheduleAtFixedRate</tt> and
22 * <tt>scheduleWithFixedDelay</tt> methods create and execute tasks
23 * that run periodically until cancelled. Commands submitted using
24 * the <tt>execute</tt> method are scheduled with a requested delay of
25 * zero.
26 *
27 * <p> Delayed tasks execute no sooner than they are enabled, but
28 * without any real-time guarantees about when, after they are enabled,
29 * they will commence. Tasks tied for the same execution time are
30 * enabled in first-in-first-out (FIFO) order of submission.
31 *
32 * <p>All <t>schedule</tt> methods accept <em>relative</em> delays and
33 * periods as arguments, not absolute times or dates. It is a simple
34 * matter to transform an absolute time represented as a
35 * <tt>java.util.Date</tt>, to the required form. For example, to
36 * schedule at a certain future <tt>date</tt>, you can use:
37 * <tt>schedule(task, date.getTime() - System.currentTimeMillis(),
38 * TimeUnit.MILLISECONDS)</tt>. Beware however that expiration of a
39 * relative delay need not coincide with the current <tt>Date</tt> at
40 * which the task is enabled due to network time synchronization
41 * protocols, clock drift, or other factors.
42 *
43 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
44 * of the inherited tuning methods are not especially useful for
45 * it. In particular, because a <tt>ScheduledExecutor</tt> always acts
46 * as a fixed-sized pool using <tt>corePoolSize</tt> threads and an
47 * unbounded queue, adjustments to <tt>maximumPoolSize</tt> have no
48 * useful effect.
49 *
50 * @since 1.5
51 * @see Executors
52 *
53 * @spec JSR-166
54 * @author Doug Lea
55 */
56 public class ScheduledExecutor extends ThreadPoolExecutor {
57
58 /**
59 * False if should cancel/suppress periodic tasks on shutdown.
60 */
61 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
62
63 /**
64 * False if should cancel non-periodic tasks on shutdown.
65 */
66 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
67
68
69 /**
70 * Sequence number to break scheduling ties, and in turn to
71 * guarantee FIFO order among tied entries.
72 */
73 private static final AtomicLong sequencer = new AtomicLong(0);
74
75 private static class ScheduledCancellableTask
76 extends CancellableTask implements ScheduledCancellable {
77
78 /** Sequence number to break ties FIFO */
79 private final long sequenceNumber;
80 /** The time the task is enabled to execute in nanoTime units */
81 private final long time;
82 /** The delay forllowing next time, or <= 0 if non-periodic */
83 private final long period;
84 /** true if at fixed rate; false if fixed delay */
85 private final boolean rateBased;
86
87 /**
88 * Creates a one-shot action with given nanoTime-based trigger time
89 */
90 ScheduledCancellableTask(Runnable r, long ns) {
91 super(r);
92 this.time = ns;
93 this.period = 0;
94 rateBased = false;
95 this.sequenceNumber = sequencer.getAndIncrement();
96 }
97
98 /**
99 * Creates a periodic action with given nano time and period
100 */
101 ScheduledCancellableTask(Runnable r, long ns, long period, boolean rateBased) {
102 super(r);
103 if (period <= 0)
104 throw new IllegalArgumentException();
105 this.time = ns;
106 this.period = period;
107 this.rateBased = rateBased;
108 this.sequenceNumber = sequencer.getAndIncrement();
109 }
110
111
112 public long getDelay(TimeUnit unit) {
113 long d = unit.convert(time - System.nanoTime(),
114 TimeUnit.NANOSECONDS);
115 return d;
116 }
117
118 public int compareTo(Object other) {
119 if (other == this) // compare zero ONLY if same object
120 return 0;
121 ScheduledCancellableTask x = (ScheduledCancellableTask)other;
122 long diff = time - x.time;
123 if (diff < 0)
124 return -1;
125 else if (diff > 0)
126 return 1;
127 else if (sequenceNumber < x.sequenceNumber)
128 return -1;
129 else
130 return 1;
131 }
132
133 /**
134 * Return true if this is a periodic (not a one-shot) action.
135 * @return true if periodic
136 */
137 public boolean isPeriodic() {
138 return period > 0;
139 }
140
141 /**
142 * Returns the period, or zero if non-periodic.
143 *
144 * @return the period
145 */
146 public long getPeriod(TimeUnit unit) {
147 return unit.convert(period, TimeUnit.NANOSECONDS);
148 }
149
150 /**
151 * Return a new ScheduledCancellable that will trigger in the period
152 * subsequent to current task, or null if non-periodic
153 * or canceled.
154 */
155 ScheduledCancellableTask nextTask() {
156 if (period <= 0 || isCancelled())
157 return null;
158 long nextTime = period + (rateBased ? time : System.nanoTime());
159 return new ScheduledCancellableTask(getRunnable(), nextTime, period, rateBased);
160 }
161 }
162
163 private static class ScheduledFutureTask<V>
164 extends ScheduledCancellableTask implements ScheduledFuture<V> {
165
166 /**
167 * Creates a ScheduledFuture that may trigger after the given delay.
168 */
169 ScheduledFutureTask(Callable<V> callable, long triggerTime) {
170 // must set after super ctor call to use inner class
171 super(null, triggerTime);
172 setRunnable(new InnerCancellableFuture<V>(callable));
173 }
174
175 public V get() throws InterruptedException, ExecutionException {
176 return ((InnerCancellableFuture<V>)getRunnable()).get();
177 }
178
179 public V get(long timeout, TimeUnit unit)
180 throws InterruptedException, ExecutionException, TimeoutException {
181 return ((InnerCancellableFuture<V>)getRunnable()).get(timeout, unit);
182 }
183
184 protected void set(V v) {
185 ((InnerCancellableFuture<V>)getRunnable()).set(v);
186 }
187
188 protected void setException(Throwable t) {
189 ((InnerCancellableFuture<V>)getRunnable()).setException(t);
190 }
191 }
192
193
194 /**
195 * An annoying wrapper class to convince generics compiler to
196 * use a DelayQueue<ScheduledCancellableTask> as a BlockingQueue<Runnable>
197 */
198 private static class DelayedWorkQueue
199 extends AbstractCollection<Runnable> implements BlockingQueue<Runnable> {
200
201 private final DelayQueue<ScheduledCancellableTask> dq = new DelayQueue<ScheduledCancellableTask>();
202 public Runnable poll() { return dq.poll(); }
203 public Runnable peek() { return dq.peek(); }
204 public Runnable take() throws InterruptedException { return dq.take(); }
205 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
206 return dq.poll(timeout, unit);
207 }
208
209 public boolean add(Runnable x) { return dq.add((ScheduledCancellableTask)x); }
210 public boolean offer(Runnable x) { return dq.offer((ScheduledCancellableTask)x); }
211 public void put(Runnable x) throws InterruptedException {
212 dq.put((ScheduledCancellableTask)x);
213 }
214 public boolean offer(Runnable x, long timeout, TimeUnit unit) throws InterruptedException {
215 return dq.offer((ScheduledCancellableTask)x, timeout, unit);
216 }
217
218 public Runnable remove() { return dq.remove(); }
219 public Runnable element() { return dq.element(); }
220 public void clear() { dq.clear(); }
221
222 public int remainingCapacity() { return dq.remainingCapacity(); }
223 public boolean remove(Object x) { return dq.remove(x); }
224 public boolean contains(Object x) { return dq.contains(x); }
225 public int size() { return dq.size(); }
226 public boolean isEmpty() { return dq.isEmpty(); }
227 public Iterator<Runnable> iterator() {
228 return new Iterator<Runnable>() {
229 private Iterator<ScheduledCancellableTask> it = dq.iterator();
230 public boolean hasNext() { return it.hasNext(); }
231 public Runnable next() { return it.next(); }
232 public void remove() { it.remove(); }
233 };
234 }
235 }
236
237 /**
238 * Creates a new ScheduledExecutor with the given initial parameters.
239 *
240 * @param corePoolSize the number of threads to keep in the pool,
241 * even if they are idle.
242 */
243 public ScheduledExecutor(int corePoolSize) {
244 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
245 new DelayedWorkQueue());
246 }
247
248 /**
249 * Creates a new ScheduledExecutor with the given initial parameters.
250 *
251 * @param corePoolSize the number of threads to keep in the pool,
252 * even if they are idle.
253 * @param threadFactory the factory to use when the executor
254 * creates a new thread.
255 */
256 public ScheduledExecutor(int corePoolSize,
257 ThreadFactory threadFactory) {
258 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
259 new DelayedWorkQueue(), threadFactory);
260 }
261
262 /**
263 * Creates a new ScheduledExecutor with the given initial parameters.
264 *
265 * @param corePoolSize the number of threads to keep in the pool,
266 * even if they are idle.
267 * @param handler the handler to use when execution is blocked
268 * because the thread bounds and queue capacities are reached.
269 */
270 public ScheduledExecutor(int corePoolSize,
271 RejectedExecutionHandler handler) {
272 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
273 new DelayedWorkQueue(), handler);
274 }
275
276 /**
277 * Creates a new ScheduledExecutor with the given initial parameters.
278 *
279 * @param corePoolSize the number of threads to keep in the pool,
280 * even if they are idle.
281 * @param threadFactory the factory to use when the executor
282 * creates a new thread.
283 * @param handler the handler to use when execution is blocked
284 * because the thread bounds and queue capacities are reached.
285 */
286 public ScheduledExecutor(int corePoolSize,
287 ThreadFactory threadFactory,
288 RejectedExecutionHandler handler) {
289 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
290 new DelayedWorkQueue(), threadFactory, handler);
291 }
292
293 /**
294 * Specialized variant of ThreadPoolExecutor.execute for delayed tasks.
295 */
296 private void delayedExecute(Runnable command) {
297 if (isShutdown()) {
298 reject(command);
299 return;
300 }
301 // Prestart a thread if necessary. We cannot prestart it
302 // running the task because the task (probably) shouldn't be
303 // run yet, so thread will just idle until delay elapses.
304 if (getPoolSize() < getCorePoolSize())
305 prestartCoreThread();
306
307 super.getQueue().offer(command);
308 }
309
310 /**
311 * Creates and executes a one-shot action that becomes enabled after
312 * the given delay.
313 * @param command the task to execute.
314 * @param delay the time from now to delay execution.
315 * @param unit the time unit of the delay parameter.
316 * @return a handle that can be used to cancel the task.
317 * @throws RejectedExecutionException if task cannot be scheduled
318 * for execution because the executor has been shut down.
319 */
320
321 public ScheduledCancellable schedule(Runnable command, long delay, TimeUnit unit) {
322 long triggerTime = System.nanoTime() + unit.toNanos(delay);
323 ScheduledCancellableTask t = new ScheduledCancellableTask(command, triggerTime);
324 delayedExecute(t);
325 return t;
326 }
327
328 /**
329 * Creates and executes a periodic action that becomes enabled first
330 * after the given initial delay, and subsequently with the given
331 * period; that is executions will commence after
332 * <tt>initialDelay</tt> then <tt>initialDelay+period</tt>, then
333 * <tt>initialDelay + 2 * period</tt>, and so on.
334 * @param command the task to execute.
335 * @param initialDelay the time to delay first execution.
336 * @param period the period between successive executions.
337 * @param unit the time unit of the delay and period parameters
338 * @return a handle that can be used to cancel the task.
339 * @throws RejectedExecutionException if task cannot be scheduled
340 * for execution because the executor has been shut down.
341 */
342 public ScheduledCancellable scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
343 long triggerTime = System.nanoTime() + unit.toNanos(initialDelay);
344 ScheduledCancellableTask t = new ScheduledCancellableTask(command,
345 triggerTime,
346 unit.toNanos(period),
347 true);
348 delayedExecute(t);
349 return t;
350 }
351
352
353 /**
354 * Creates and executes a periodic action that becomes enabled first
355 * after the given initial delay, and and subsequently with the
356 * given delay between the termination of one execution and the
357 * commencement of the next.
358 * @param command the task to execute.
359 * @param initialDelay the time to delay first execution.
360 * @param delay the delay between the termination of one
361 * execution and the commencement of the next.
362 * @param unit the time unit of the delay and delay parameters
363 * @return a handle that can be used to cancel the task.
364 * @throws RejectedExecutionException if task cannot be scheduled
365 * for execution because the executor has been shut down.
366 */
367 public ScheduledCancellable scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
368 long triggerTime = System.nanoTime() + unit.toNanos(initialDelay);
369 ScheduledCancellableTask t = new ScheduledCancellableTask(command,
370 triggerTime,
371 unit.toNanos(delay),
372 false);
373 delayedExecute(t);
374 return t;
375 }
376
377 /**
378 * Creates and executes a ScheduledFuture that becomes enabled after the
379 * given delay.
380 * @param callable the function to execute.
381 * @param delay the time from now to delay execution.
382 * @param unit the time unit of the delay parameter.
383 * @return a ScheduledFuture that can be used to extract result or cancel.
384 * @throws RejectedExecutionException if task cannot be scheduled
385 * for execution because the executor has been shut down.
386 */
387 public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
388 long triggerTime = System.nanoTime() + unit.toNanos(delay);
389 ScheduledFutureTask<V> t = new ScheduledFutureTask<V>(callable, triggerTime);
390 delayedExecute(t);
391 return t;
392 }
393
394 /**
395 * Execute command with zero required delay. This has effect
396 * equivalent to <tt>schedule(command, 0, anyUnit)</tt>. Note
397 * that inspections of the queue and of the list returned by
398 * <tt>shutdownNow</tt> will access the zero-delayed
399 * <tt>ScheduledCancellable</tt>, not the <tt>command</tt> itself.
400 *
401 * @param command the task to execute
402 * @throws RejectedExecutionException at discretion of
403 * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
404 * for execution because the executor has been shut down.
405 */
406 public void execute(Runnable command) {
407 schedule(command, 0, TimeUnit.NANOSECONDS);
408 }
409
410
411 /**
412 * Set policy on whether to continue executing existing periodic
413 * tasks even when this executor has been <tt>shutdown</tt>. In
414 * this case, these tasks will only terminate upon
415 * <tt>shutdownNow</tt>, or after setting the policy to
416 * <tt>false</tt> when already shutdown. This value is by default
417 * false.
418 * @param value if true, continue after shutdown, else don't.
419 */
420 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
421 continueExistingPeriodicTasksAfterShutdown = value;
422 if (!value && isShutdown())
423 cancelUnwantedTasks();
424 }
425
426 /**
427 * Get the policy on whether to continue executing existing
428 * periodic tasks even when this executor has been
429 * <tt>shutdown</tt>. In this case, these tasks will only
430 * terminate upon <tt>shutdownNow</tt> or after setting the policy
431 * to <tt>false</tt> when already shutdown. This value is by
432 * default false.
433 * @return true if will continue after shutdown.
434 */
435 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
436 return continueExistingPeriodicTasksAfterShutdown;
437 }
438
439 /**
440 * Set policy on whether to execute existing delayed
441 * tasks even when this executor has been <tt>shutdown</tt>. In
442 * this case, these tasks will only terminate upon
443 * <tt>shutdownNow</tt>, or after setting the policy to
444 * <tt>false</tt> when already shutdown. This value is by default
445 * true.
446 * @param value if true, execute after shutdown, else don't.
447 */
448 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
449 executeExistingDelayedTasksAfterShutdown = value;
450 if (!value && isShutdown())
451 cancelUnwantedTasks();
452 }
453
454 /**
455 * Set policy on whether to execute existing delayed
456 * tasks even when this executor has been <tt>shutdown</tt>. In
457 * this case, these tasks will only terminate upon
458 * <tt>shutdownNow</tt>, or after setting the policy to
459 * <tt>false</tt> when already shutdown. This value is by default
460 * true.
461 * @return true if will execute after shutdown.
462 */
463 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
464 return executeExistingDelayedTasksAfterShutdown;
465 }
466
467 /**
468 * Cancel and clear the queue of all tasks that should not be run
469 * due to shutdown policy.
470 */
471 private void cancelUnwantedTasks() {
472 boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
473 boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
474 if (!keepDelayed && !keepPeriodic)
475 super.getQueue().clear();
476 else if (keepDelayed || keepPeriodic) {
477 Object[] entries = super.getQueue().toArray();
478 for (int i = 0; i < entries.length; ++i) {
479 ScheduledCancellableTask t = (ScheduledCancellableTask)entries[i];
480 if (t.isPeriodic()? !keepPeriodic : !keepDelayed)
481 t.cancel(false);
482 }
483 entries = null;
484 purge();
485 }
486 }
487
488 /**
489 * Initiates an orderly shutdown in which previously submitted
490 * tasks are executed, but no new tasks will be accepted. If the
491 * <tt>ExecuteExistingDelayedTasksAfterShutdownPolicy</tt> has
492 * been set <tt>false</tt>, existing delayed tasks whose delays
493 * have not yet elapsed are cancelled. And unless the
494 * <tt>ContinueExistingPeriodicTasksAfterShutdownPolicy</tt> hase
495 * been set <tt>true</tt>, future executions of existing periodic
496 * tasks will be cancelled.
497 */
498 public void shutdown() {
499 cancelUnwantedTasks();
500 super.shutdown();
501 }
502
503 /**
504 * Attempts to stop all actively executing tasks, halts the
505 * processing of waiting tasks, and returns a list of the tasks that were
506 * awaiting execution.
507 *
508 * <p>There are no guarantees beyond best-effort attempts to stop
509 * processing actively executing tasks. This implementations
510 * cancels via {@link Thread#interrupt}, so if any tasks mask or
511 * fail to respond to interrupts, they may never terminate.
512 *
513 * @return list of tasks that never commenced execution. Each
514 * element of this list is a <tt>ScheduledCancellable</tt>, including those
515 * tasks submitted using <tt>execute</tt> which are for scheduling
516 * purposes used as the basis of a zero-delay <tt>ScheduledCancellable</tt>.
517 */
518 public List shutdownNow() {
519 return super.shutdownNow();
520 }
521
522 /**
523 * Removes this task from internal queue if it is present, thus
524 * causing it not to be run if it has not already started. This
525 * method may be useful as one part of a cancellation scheme.
526 *
527 * @param task the task to remove
528 * @return true if the task was removed
529 */
530 public boolean remove(Runnable task) {
531 if (task instanceof ScheduledCancellable)
532 return super.remove(task);
533
534 // The task might actually have been wrapped as a ScheduledCancellable
535 // in execute(), in which case we need to maually traverse
536 // looking for it.
537
538 ScheduledCancellable wrap = null;
539 Object[] entries = super.getQueue().toArray();
540 for (int i = 0; i < entries.length; ++i) {
541 ScheduledCancellableTask t = (ScheduledCancellableTask)entries[i];
542 Runnable r = t.getRunnable();
543 if (task.equals(r)) {
544 wrap = t;
545 break;
546 }
547 }
548 entries = null;
549 return wrap != null && super.getQueue().remove(wrap);
550 }
551
552
553 /**
554 * Returns the task queue used by this executor. Each element of
555 * this queue is a <tt>ScheduledCancellable</tt>, including those
556 * tasks submitted using <tt>execute</tt> which are for scheduling
557 * purposes used as the basis of a zero-delay
558 * <tt>ScheduledCancellable</tt>. Iteration over this queue is
559 * </em>not</em> guaranteed to travserse tasks in the order in
560 * which they will execute.
561 *
562 * @return the task queue
563 */
564 public BlockingQueue<Runnable> getQueue() {
565 return super.getQueue();
566 }
567
568 /**
569 * Override of <tt>Executor</tt> hook method to support periodic
570 * tasks. If the executed task was periodic, causes the task for
571 * the next period to execute.
572 * @param r the task (assumed to be a ScheduledCancellable)
573 * @param t the exception
574 */
575 protected void afterExecute(Runnable r, Throwable t) {
576 super.afterExecute(r, t);
577 ScheduledCancellableTask next = ((ScheduledCancellableTask)r).nextTask();
578 if (next != null &&
579 (!isShutdown() ||
580 (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
581 !isTerminating())))
582 super.getQueue().offer(next);
583
584 // This might have been the final executed delayed task. Wake
585 // up threads to check.
586 else if (isShutdown())
587 interruptIdleWorkers();
588 }
589 }