ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledExecutor.java
Revision: 1.17
Committed: Tue Aug 12 11:12:11 2003 UTC (20 years, 10 months ago) by dl
Branch: MAIN
Changes since 1.16: +43 -38 lines
Log Message:
Fix constructor calls

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.*;
10
11 /**
12 * An <tt>Executor</tt> that can schedule commands to run after a given
13 * delay, or to execute periodically. This class is preferable to
14 * <tt>java.util.Timer</tt> when multiple worker threads are needed,
15 * or when the additional flexibility or capabilities of
16 * <tt>ThreadPoolExecutor</tt> (which this class extends) are
17 * required.
18 *
19 * <p> The <tt>schedule</tt> methods create tasks with various delays
20 * and return a task object that can be used to cancel or check
21 * execution. The <tt>scheduleAtFixedRate</tt> and
22 * <tt>scheduleWithFixedDelay</tt> methods create and execute tasks
23 * that run periodically until cancelled. Commands submitted using
24 * the <tt>execute</tt> method are scheduled with a requested delay of
25 * zero.
26 *
27 * <p> Delayed tasks execute no sooner than they are enabled, but
28 * without any real-time guarantees about when, after they are enabled
29 * they will commence. Tasks tied for the same execution time are
30 * enabled in first-in-first-out (FIFO) order of submission. An
31 * internal {@link DelayQueue} used for scheduling relies on relative
32 * delays, which may drift from absolute times (as returned by
33 * <tt>System.currentTimeMillis</tt>) over sufficiently long periods.
34 *
35 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
36 * of the inherited tuning methods are not especially useful for
37 * it. In particular, because a <tt>ScheduledExecutor</tt> always acts
38 * as a fixed-sized pool using <tt>corePoolSize</tt> threads and an
39 * unbounded queue, adjustments to <tt>maximumPoolSize</tt> have no
40 * useful effect.
41 *
42 * @since 1.5
43 * @see Executors
44 *
45 * @spec JSR-166
46 * @author Doug Lea
47 */
48 public class ScheduledExecutor extends ThreadPoolExecutor {
49
50 /**
51 * Sequence number to break scheduling ties, and in turn to
52 * guarantee FIFO order among tied entries.
53 */
54 private static final AtomicLong sequencer = new AtomicLong(0);
55
56 /**
57 * A delayed or periodic action.
58 */
59 public static class DelayedTask extends CancellableTask implements Delayed {
60 /** Sequence number to break ties FIFO */
61 private final long sequenceNumber;
62 /** The time the task is enabled to execute in nanoTime units */
63 private final long time;
64 /** The delay forllowing next time, or <= 0 if non-periodic */
65 private final long period;
66 /** true if at fixed rate; false if fixed delay */
67 private final boolean rateBased;
68
69 /**
70 * Creates a one-shot action with given nanoTime-based trigger time
71 */
72 DelayedTask(Runnable r, long ns) {
73 super(r);
74 this.time = ns;
75 this.period = 0;
76 rateBased = false;
77 this.sequenceNumber = sequencer.getAndIncrement();
78 }
79
80 /**
81 * Creates a periodic action with given nano time and period
82 */
83 DelayedTask(Runnable r, long ns, long period, boolean rateBased) {
84 super(r);
85 if (period <= 0)
86 throw new IllegalArgumentException();
87 this.time = ns;
88 this.period = period;
89 this.rateBased = rateBased;
90 this.sequenceNumber = sequencer.getAndIncrement();
91 }
92
93
94 public long getDelay(TimeUnit unit) {
95 long d = unit.convert(time - System.nanoTime(),
96 TimeUnit.NANOSECONDS);
97 return d;
98 }
99
100 public int compareTo(Object other) {
101 if (other == this)
102 return 0;
103 DelayedTask x = (DelayedTask)other;
104 long diff = time - x.time;
105 if (diff < 0)
106 return -1;
107 else if (diff > 0)
108 return 1;
109 else if (sequenceNumber < x.sequenceNumber)
110 return -1;
111 else
112 return 1;
113 }
114
115 /**
116 * Return true if this is a periodic (not a one-shot) action.
117 * @return true if periodic
118 */
119 public boolean isPeriodic() {
120 return period > 0;
121 }
122
123 /**
124 * Returns the period, or zero if non-periodic.
125 *
126 * @return the period
127 */
128 public long getPeriod(TimeUnit unit) {
129 return unit.convert(period, TimeUnit.NANOSECONDS);
130 }
131
132 /**
133 * Return a new DelayedTask that will trigger in the period
134 * subsequent to current task, or null if non-periodic
135 * or canceled.
136 */
137 DelayedTask nextTask() {
138 if (period <= 0 || isCancelled())
139 return null;
140 long nextTime = period + (rateBased ? time : System.nanoTime());
141 return new DelayedTask(getRunnable(), nextTime, period, rateBased);
142 }
143
144 }
145
146 /**
147 * A delayed result-bearing action.
148 */
149 public static class DelayedFutureTask<V> extends DelayedTask implements Future<V> {
150 /**
151 * Creates a Future that may trigger after the given delay.
152 */
153 DelayedFutureTask(Callable<V> callable, long triggerTime) {
154 // must set after super ctor call to use inner class
155 super(null, triggerTime);
156 setRunnable(new InnerCancellableFuture<V>(callable));
157 }
158
159 public V get() throws InterruptedException, ExecutionException {
160 return ((InnerCancellableFuture<V>)getRunnable()).get();
161 }
162
163 public V get(long timeout, TimeUnit unit)
164 throws InterruptedException, ExecutionException, TimeoutException {
165 return ((InnerCancellableFuture<V>)getRunnable()).get(timeout, unit);
166 }
167
168 protected void set(V v) {
169 ((InnerCancellableFuture<V>)getRunnable()).set(v);
170 }
171
172 protected void setException(Throwable t) {
173 ((InnerCancellableFuture<V>)getRunnable()).setException(t);
174 }
175 }
176
177
178 /**
179 * An annoying wrapper class to convince generics compiler to
180 * use a DelayQueue<DelayedTask> as a BlockingQueue<Runnable>
181 */
182 private static class DelayedWorkQueue extends AbstractCollection<Runnable> implements BlockingQueue<Runnable> {
183 private final DelayQueue<DelayedTask> dq = new DelayQueue<DelayedTask>();
184 public Runnable poll() { return dq.poll(); }
185 public Runnable peek() { return dq.peek(); }
186 public Runnable take() throws InterruptedException { return dq.take(); }
187 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
188 return dq.poll(timeout, unit);
189 }
190
191 public boolean add(Runnable x) { return dq.add((DelayedTask)x); }
192 public boolean offer(Runnable x) { return dq.offer((DelayedTask)x); }
193 public void put(Runnable x) throws InterruptedException {
194 dq.put((DelayedTask)x);
195 }
196 public boolean offer(Runnable x, long timeout, TimeUnit unit) throws InterruptedException {
197 return dq.offer((DelayedTask)x, timeout, unit);
198 }
199
200 public Runnable remove() { return dq.remove(); }
201 public Runnable element() { return dq.element(); }
202 public void clear() { dq.clear(); }
203
204 public int remainingCapacity() { return dq.remainingCapacity(); }
205 public boolean remove(Object x) { return dq.remove(x); }
206 public boolean contains(Object x) { return dq.contains(x); }
207 public int size() { return dq.size(); }
208 public boolean isEmpty() { return dq.isEmpty(); }
209 public Iterator<Runnable> iterator() {
210 return new Iterator<Runnable>() {
211 private Iterator<DelayedTask> it = dq.iterator();
212 public boolean hasNext() { return it.hasNext(); }
213 public Runnable next() { return it.next(); }
214 public void remove() { it.remove(); }
215 };
216 }
217 }
218
219 /**
220 * Creates a new ScheduledExecutor with the given initial parameters.
221 *
222 * @param corePoolSize the number of threads to keep in the pool,
223 * even if they are idle.
224 */
225 public ScheduledExecutor(int corePoolSize) {
226 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
227 new DelayedWorkQueue());
228 }
229
230 /**
231 * Creates a new ScheduledExecutor with the given initial parameters.
232 *
233 * @param corePoolSize the number of threads to keep in the pool,
234 * even if they are idle.
235 * @param threadFactory the factory to use when the executor
236 * creates a new thread.
237 */
238 public ScheduledExecutor(int corePoolSize,
239 ThreadFactory threadFactory) {
240 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
241 new DelayedWorkQueue(), threadFactory);
242 }
243
244 /**
245 * Creates a new ScheduledExecutor with the given initial parameters.
246 *
247 * @param corePoolSize the number of threads to keep in the pool,
248 * even if they are idle.
249 * @param handler the handler to use when execution is blocked
250 * because the thread bounds and queue capacities are reached.
251 */
252 public ScheduledExecutor(int corePoolSize,
253 RejectedExecutionHandler handler) {
254 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
255 new DelayedWorkQueue(), handler);
256 }
257
258 /**
259 * Creates a new ScheduledExecutor with the given initial parameters.
260 *
261 * @param corePoolSize the number of threads to keep in the pool,
262 * even if they are idle.
263 * @param threadFactory the factory to use when the executor
264 * creates a new thread.
265 * @param handler the handler to use when execution is blocked
266 * because the thread bounds and queue capacities are reached.
267 */
268 public ScheduledExecutor(int corePoolSize,
269 ThreadFactory threadFactory,
270 RejectedExecutionHandler handler) {
271 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
272 new DelayedWorkQueue(), threadFactory, handler);
273 }
274
275 /**
276 * Specialized variant of ThreadPoolExecutor.execute for delayed tasks.
277 */
278 void delayedExecute(Runnable command) {
279 if (isShutdown()) {
280 reject(command);
281 return;
282 }
283 // Prestart thread if necessary. We cannot prestart it running
284 // the task because the task (probably) shouldn't be run yet,
285 // so thread will just idle until delay elapses.
286 if (getPoolSize() < getCorePoolSize())
287 addIfUnderCorePoolSize(null);
288
289 getQueue().offer(command);
290 }
291
292 /**
293 * Creates and executes a one-shot action that becomes enabled after
294 * the given delay.
295 * @param command the task to execute.
296 * @param delay the time from now to delay execution.
297 * @param unit the time unit of the delay parameter.
298 * @return a handle that can be used to cancel the task.
299 * @throws RejectedExecutionException if task cannot be scheduled
300 * for execution because the executor has been shut down.
301 */
302
303 public DelayedTask schedule(Runnable command, long delay, TimeUnit unit) {
304 long triggerTime = System.nanoTime() + unit.toNanos(delay);
305 DelayedTask t = new DelayedTask(command, triggerTime);
306 delayedExecute(t);
307 return t;
308 }
309
310 /**
311 * Creates and executes a one-shot action that becomes enabled
312 * after the given date.
313 * @param command the task to execute.
314 * @param date the time to commence excution.
315 * @return a handle that can be used to cancel the task.
316 * @throws RejectedExecutionException if task cannot be scheduled
317 * for execution because the executor has been shut down.
318 */
319 public DelayedTask schedule(Runnable command, Date date) {
320 long triggerTime = System.nanoTime() +
321 TimeUnit.MILLISECONDS.toNanos(date.getTime() -
322 System.currentTimeMillis());
323 DelayedTask t = new DelayedTask(command, triggerTime);
324 delayedExecute(t);
325 return t;
326 }
327
328 /**
329 * Creates and executes a periodic action that becomes enabled first
330 * after the given initial delay, and subsequently with the given
331 * period; that is executions will commence after
332 * <tt>initialDelay</tt> then <tt>initialDelay+period</tt>, then
333 * <tt>initialDelay + 2 * period</tt>, and so on.
334 * @param command the task to execute.
335 * @param initialDelay the time to delay first execution.
336 * @param period the period between successive executions.
337 * @param unit the time unit of the delay and period parameters
338 * @return a handle that can be used to cancel the task.
339 * @throws RejectedExecutionException if task cannot be scheduled
340 * for execution because the executor has been shut down.
341 */
342 public DelayedTask scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
343 long triggerTime = System.nanoTime() + unit.toNanos(initialDelay);
344 DelayedTask t = new DelayedTask(command,
345 triggerTime,
346 unit.toNanos(period),
347 true);
348 delayedExecute(t);
349 return t;
350 }
351
352 /**
353 * Creates a periodic action that becomes enabled first after the
354 * given date, and subsequently with the given period
355 * period; that is executions will commence after
356 * <tt>initialDate</tt> then <tt>initialDate+period</tt>, then
357 * <tt>initialDate + 2 * period</tt>, and so on.
358 * @param command the task to execute.
359 * @param initialDate the time to delay first execution.
360 * @param period the period between commencement of successive
361 * executions.
362 * @param unit the time unit of the period parameter.
363 * @return a handle that can be used to cancel the task.
364 * @throws RejectedExecutionException if task cannot be scheduled
365 * for execution because the executor has been shut down.
366 */
367 public DelayedTask scheduleAtFixedRate(Runnable command, Date initialDate, long period, TimeUnit unit) {
368 long triggerTime = System.nanoTime() +
369 TimeUnit.MILLISECONDS.toNanos(initialDate.getTime() -
370 System.currentTimeMillis());
371 DelayedTask t = new DelayedTask(command,
372 triggerTime,
373 unit.toNanos(period),
374 true);
375 delayedExecute(t);
376 return t;
377 }
378
379 /**
380 * Creates and executes a periodic action that becomes enabled first
381 * after the given initial delay, and and subsequently with the
382 * given delay between the termination of one execution and the
383 * commencement of the next.
384 * @param command the task to execute.
385 * @param initialDelay the time to delay first execution.
386 * @param delay the delay between the termination of one
387 * execution and the commencement of the next.
388 * @param unit the time unit of the delay and delay parameters
389 * @return a handle that can be used to cancel the task.
390 * @throws RejectedExecutionException if task cannot be scheduled
391 * for execution because the executor has been shut down.
392 */
393 public DelayedTask scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
394 long triggerTime = System.nanoTime() + unit.toNanos(initialDelay);
395 DelayedTask t = new DelayedTask(command,
396 triggerTime,
397 unit.toNanos(delay),
398 false);
399 delayedExecute(t);
400 return t;
401 }
402
403 /**
404 * Creates a periodic action that becomes enabled first after the
405 * given date, and subsequently with the given delay between
406 * the termination of one execution and the commencement of the
407 * next.
408 * @param command the task to execute.
409 * @param initialDate the time to delay first execution.
410 * @param delay the delay between the termination of one
411 * execution and the commencement of the next.
412 * @param unit the time unit of the delay parameter.
413 * @return a handle that can be used to cancel the task.
414 * @throws RejectedExecutionException if task cannot be scheduled
415 * for execution because the executor has been shut down.
416 */
417 public DelayedTask scheduleWithFixedDelay(Runnable command, Date initialDate, long delay, TimeUnit unit) {
418 long triggerTime = System.nanoTime() +
419 TimeUnit.MILLISECONDS.toNanos(initialDate.getTime() -
420 System.currentTimeMillis());
421 DelayedTask t = new DelayedTask(command,
422 triggerTime,
423 unit.toNanos(delay),
424 false);
425 delayedExecute(t);
426 return t;
427 }
428
429
430 /**
431 * Creates and executes a Future that becomes enabled after the
432 * given delay.
433 * @param callable the function to execute.
434 * @param delay the time from now to delay execution.
435 * @param unit the time unit of the delay parameter.
436 * @return a Future that can be used to extract result or cancel.
437 * @throws RejectedExecutionException if task cannot be scheduled
438 * for execution because the executor has been shut down.
439 */
440 public <V> DelayedFutureTask<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
441 long triggerTime = System.nanoTime() + unit.toNanos(delay);
442 DelayedFutureTask<V> t = new DelayedFutureTask<V>(callable,
443 triggerTime);
444 delayedExecute(t);
445 return t;
446 }
447
448 /**
449 * Creates and executes a one-shot action that becomes enabled after
450 * the given date.
451 * @param callable the function to execute.
452 * @param date the time to commence excution.
453 * @return a Future that can be used to extract result or cancel.
454 * @throws RejectedExecutionException if task cannot be scheduled
455 * for execution because the executor has been shut down.
456 */
457 public <V> DelayedFutureTask<V> schedule(Callable<V> callable, Date date) {
458 long triggerTime = System.nanoTime() +
459 TimeUnit.MILLISECONDS.toNanos(date.getTime() -
460 System.currentTimeMillis());
461 DelayedFutureTask<V> t = new DelayedFutureTask<V>(callable,
462 triggerTime);
463 delayedExecute(t);
464 return t;
465 }
466
467 /**
468 * Execute command with zero required delay. This has effect
469 * equivalent to <tt>schedule(command, 0, anyUnit)</tt>. Note
470 * that inspections of the queue and of the list returned by
471 * <tt>shutdownNow</tt> will access the zero-delayed
472 * <tt>DelayedTask</tt>, not the <tt>command</tt> itself.
473 *
474 * @param command the task to execute
475 * @throws RejectedExecutionException at discretion of
476 * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
477 * for execution because the executor has been shut down.
478 */
479 public void execute(Runnable command) {
480 schedule(command, 0, TimeUnit.NANOSECONDS);
481 }
482
483 /**
484 * Removes this task from internal queue if it is present, thus
485 * causing it not to be run if it has not already started. This
486 * method may be useful as one part of a cancellation scheme.
487 *
488 * @param task the task to remove
489 * @return true if the task was removed
490 */
491 public boolean remove(Runnable task) {
492 if (task instanceof DelayedTask && getQueue().remove(task))
493 return true;
494
495 // The task might actually have been wrapped as a DelayedTask
496 // in execute(), in which case we need to maually traverse
497 // looking for it.
498
499 DelayedTask wrap = null;
500 Object[] entries = getQueue().toArray();
501 for (int i = 0; i < entries.length; ++i) {
502 DelayedTask t = (DelayedTask)entries[i];
503 Runnable r = t.getRunnable();
504 if (task.equals(r)) {
505 wrap = t;
506 break;
507 }
508 }
509 entries = null;
510 return wrap != null && getQueue().remove(wrap);
511 }
512
513 /**
514 * If executed task was periodic, cause the task for the next
515 * period to execute.
516 * @param r the task (assumed to be a DelayedTask)
517 * @param t the exception
518 */
519 protected void afterExecute(Runnable r, Throwable t) {
520 if (isShutdown())
521 return;
522 super.afterExecute(r, t);
523 DelayedTask d = (DelayedTask)r;
524 DelayedTask next = d.nextTask();
525 if (next == null)
526 return;
527 try {
528 delayedExecute(next);
529 } catch(RejectedExecutionException ex) {
530 // lost race to detect shutdown; ignore
531 }
532 }
533 }
534
535