ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledExecutor.java
Revision: 1.17
Committed: Tue Aug 12 11:12:11 2003 UTC (20 years, 9 months ago) by dl
Branch: MAIN
Changes since 1.16: +43 -38 lines
Log Message:
Fix constructor calls

File Contents

# User Rev Content
1 tim 1.1 /*
2 dl 1.4 * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5 tim 1.1 */
6    
7     package java.util.concurrent;
8 dl 1.4 import java.util.concurrent.atomic.*;
9 dl 1.2 import java.util.*;
10 tim 1.1
11     /**
12 dl 1.17 * An <tt>Executor</tt> that can schedule commands to run after a given
13 dl 1.7 * delay, or to execute periodically. This class is preferable to
14 tim 1.1 * <tt>java.util.Timer</tt> when multiple worker threads are needed,
15     * or when the additional flexibility or capabilities of
16 dl 1.7 * <tt>ThreadPoolExecutor</tt> (which this class extends) are
17     * required.
18     *
19     * <p> The <tt>schedule</tt> methods create tasks with various delays
20     * and return a task object that can be used to cancel or check
21     * execution. The <tt>scheduleAtFixedRate</tt> and
22     * <tt>scheduleWithFixedDelay</tt> methods create and execute tasks
23     * that run periodically until cancelled. Commands submitted using
24     * the <tt>execute</tt> method are scheduled with a requested delay of
25     * zero.
26     *
27     * <p> Delayed tasks execute no sooner than they are enabled, but
28     * without any real-time guarantees about when, after they are enabled,
29     * they will commence. Tasks tied for the same execution time are
30     * enabled in first-in-first-out (FIFO) order of submission. An
31     * internal {@link DelayQueue} used for scheduling relies on relative
32     * delays, which may drift from absolute times (as returned by
33     * <tt>System.currentTimeMillis</tt>) over sufficiently long periods.
34 tim 1.1 *
35 dl 1.13 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
36     * of the inherited tuning methods are not especially useful for
37     * it. In particular, because a <tt>ScheduledExecutor</tt> always acts
38     * as a fixed-sized pool using <tt>corePoolSize</tt> threads and an
39     * unbounded queue, adjustments to <tt>maximumPoolSize</tt> have no
40     * useful effect.
41     *
42 tim 1.1 * @since 1.5
43     * @see Executors
44     *
45     * @spec JSR-166
46 dl 1.9 * @author Doug Lea
47 tim 1.1 */
48     public class ScheduledExecutor extends ThreadPoolExecutor {
49    
50 dl 1.4 /**
51 dl 1.5 * Sequence number to break scheduling ties, and in turn to
52     * guarantee FIFO order among tied entries.
53     */
54     private static final AtomicLong sequencer = new AtomicLong(0);
55    
56     /**
57 dl 1.4 * A delayed or periodic action.
58     */
59 dl 1.8 public static class DelayedTask extends CancellableTask implements Delayed {
60 dl 1.9 /** Sequence number to break ties FIFO */
61 dl 1.4 private final long sequenceNumber;
62 dl 1.9 /** The time the task is enabled to execute in nanoTime units */
63 dl 1.4 private final long time;
64 dl 1.9 /** The delay forllowing next time, or <= 0 if non-periodic */
65 dl 1.4 private final long period;
66 dl 1.9 /** true if at fixed rate; false if fixed delay */
67     private final boolean rateBased;
68    
69 dl 1.4 /**
70 dl 1.5 * Creates a one-shot action with given nanoTime-based trigger time
71 dl 1.4 */
72     DelayedTask(Runnable r, long ns) {
73     super(r);
74     this.time = ns;
75     this.period = 0;
76 dl 1.7 rateBased = false;
77 dl 1.4 this.sequenceNumber = sequencer.getAndIncrement();
78     }
79    
80     /**
81 dl 1.5 * Creates a periodic action with given nano time and period
82 dl 1.4 */
83 dl 1.7 DelayedTask(Runnable r, long ns, long period, boolean rateBased) {
84 dl 1.4 super(r);
85     if (period <= 0)
86     throw new IllegalArgumentException();
87     this.time = ns;
88     this.period = period;
89 dl 1.7 this.rateBased = rateBased;
90 dl 1.4 this.sequenceNumber = sequencer.getAndIncrement();
91     }
92    
93    
94     public long getDelay(TimeUnit unit) {
95 dl 1.12 long d = unit.convert(time - System.nanoTime(),
96 dl 1.7 TimeUnit.NANOSECONDS);
97 dl 1.12 return d;
98 dl 1.4 }
99    
100     public int compareTo(Object other) {
101 dl 1.15 if (other == this)
102     return 0;
103 dl 1.4 DelayedTask x = (DelayedTask)other;
104     long diff = time - x.time;
105     if (diff < 0)
106     return -1;
107     else if (diff > 0)
108     return 1;
109     else if (sequenceNumber < x.sequenceNumber)
110     return -1;
111     else
112     return 1;
113     }
114    
115     /**
116     * Return true if this is a periodic (not a one-shot) action.
117 dl 1.9 * @return true if periodic
118 dl 1.4 */
119     public boolean isPeriodic() {
120     return period > 0;
121     }
122    
123     /**
124 tim 1.11 * Returns the period, or zero if non-periodic.
125     *
126 dl 1.9 * @return the period
127 dl 1.4 */
128     public long getPeriod(TimeUnit unit) {
129     return unit.convert(period, TimeUnit.NANOSECONDS);
130     }
131    
132     /**
133     * Return a new DelayedTask that will trigger in the period
134     * subsequent to current task, or null if non-periodic
135     * or canceled.
136     */
137 dl 1.5 DelayedTask nextTask() {
138 dl 1.4 if (period <= 0 || isCancelled())
139     return null;
140 dl 1.10 long nextTime = period + (rateBased ? time : System.nanoTime());
141 dl 1.7 return new DelayedTask(getRunnable(), nextTime, period, rateBased);
142 dl 1.4 }
143 dl 1.2
144 dl 1.4 }
145 dl 1.2
146 dl 1.4 /**
147     * A delayed result-bearing action.
148     */
149 dl 1.8 public static class DelayedFutureTask<V> extends DelayedTask implements Future<V> {
150 dl 1.4 /**
151     * Creates a Future that may trigger after the given delay.
152     */
153 dl 1.17 DelayedFutureTask(Callable<V> callable, long triggerTime) {
154 dl 1.4 // must set after super ctor call to use inner class
155 dl 1.17 super(null, triggerTime);
156 dl 1.4 setRunnable(new InnerCancellableFuture<V>(callable));
157 dl 1.2 }
158    
159 dl 1.4 public V get() throws InterruptedException, ExecutionException {
160     return ((InnerCancellableFuture<V>)getRunnable()).get();
161 dl 1.2 }
162    
163 dl 1.4 public V get(long timeout, TimeUnit unit)
164     throws InterruptedException, ExecutionException, TimeoutException {
165     return ((InnerCancellableFuture<V>)getRunnable()).get(timeout, unit);
166 dl 1.2 }
167 tim 1.1
168 dl 1.4 protected void set(V v) {
169     ((InnerCancellableFuture<V>)getRunnable()).set(v);
170 dl 1.2 }
171 tim 1.1
172 dl 1.4 protected void setException(Throwable t) {
173     ((InnerCancellableFuture<V>)getRunnable()).setException(t);
174     }
175     }
176    
177    
178     /**
179     * An annoying wrapper class to convince generics compiler to
180 dl 1.7 * use a DelayQueue<DelayedTask> as a BlockingQueue<Runnable>
181 dl 1.4 */
182 dl 1.12 private static class DelayedWorkQueue extends AbstractCollection<Runnable> implements BlockingQueue<Runnable> {
183 dl 1.4 private final DelayQueue<DelayedTask> dq = new DelayQueue<DelayedTask>();
184     public Runnable poll() { return dq.poll(); }
185     public Runnable peek() { return dq.peek(); }
186     public Runnable take() throws InterruptedException { return dq.take(); }
187     public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
188     return dq.poll(timeout, unit);
189 dl 1.2 }
190 dl 1.12
191     public boolean add(Runnable x) { return dq.add((DelayedTask)x); }
192 dl 1.4 public boolean offer(Runnable x) { return dq.offer((DelayedTask)x); }
193     public void put(Runnable x) throws InterruptedException {
194     dq.put((DelayedTask)x);
195 dl 1.2 }
196 dl 1.4 public boolean offer(Runnable x, long timeout, TimeUnit unit) throws InterruptedException {
197     return dq.offer((DelayedTask)x, timeout, unit);
198 dl 1.2 }
199 dl 1.12
200     public Runnable remove() { return dq.remove(); }
201     public Runnable element() { return dq.element(); }
202     public void clear() { dq.clear(); }
203    
204 dl 1.4 public int remainingCapacity() { return dq.remainingCapacity(); }
205     public boolean remove(Object x) { return dq.remove(x); }
206     public boolean contains(Object x) { return dq.contains(x); }
207     public int size() { return dq.size(); }
208 tim 1.11 public boolean isEmpty() { return dq.isEmpty(); }
209 dl 1.4 public Iterator<Runnable> iterator() {
210     return new Iterator<Runnable>() {
211     private Iterator<DelayedTask> it = dq.iterator();
212     public boolean hasNext() { return it.hasNext(); }
213     public Runnable next() { return it.next(); }
214     public void remove() { it.remove(); }
215     };
216 tim 1.1 }
217 dl 1.4 }
218 tim 1.1
219 dl 1.4 /**
220     * Creates a new ScheduledExecutor with the given initial parameters.
221     *
222     * @param corePoolSize the number of threads to keep in the pool,
223     * even if they are idle.
224     */
public ScheduledExecutor(int corePoolSize) {
    // The pool acts as a fixed-size pool of corePoolSize threads over
    // an unbounded queue, so the maximum pool size has no useful effect.
    super(corePoolSize,
          Integer.MAX_VALUE,
          0,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue());
}
229 tim 1.1
230 dl 1.4 /**
231     * Creates a new ScheduledExecutor with the given initial parameters.
232     *
233     * @param corePoolSize the number of threads to keep in the pool,
234     * even if they are idle.
235     * @param threadFactory the factory to use when the executor
236     * creates a new thread.
237     */
public ScheduledExecutor(int corePoolSize, ThreadFactory threadFactory) {
    // The pool acts as a fixed-size pool of corePoolSize threads over
    // an unbounded queue, so the maximum pool size has no useful effect.
    super(corePoolSize,
          Integer.MAX_VALUE,
          0,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          threadFactory);
}
243    
244 dl 1.4 /**
245     * Creates a new ScheduledExecutor with the given initial parameters.
246     *
247     * @param corePoolSize the number of threads to keep in the pool,
248     * even if they are idle.
249     * @param handler the handler to use when execution is blocked
250     * because the thread bounds and queue capacities are reached.
251     */
public ScheduledExecutor(int corePoolSize, RejectedExecutionHandler handler) {
    // The pool acts as a fixed-size pool of corePoolSize threads over
    // an unbounded queue, so the maximum pool size has no useful effect.
    super(corePoolSize,
          Integer.MAX_VALUE,
          0,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          handler);
}
257 dl 1.2
258 tim 1.1 /**
259 dl 1.2 * Creates a new ScheduledExecutor with the given initial parameters.
260     *
261     * @param corePoolSize the number of threads to keep in the pool,
262     * even if they are idle.
263 dl 1.4 * @param threadFactory the factory to use when the executor
264     * creates a new thread.
265     * @param handler the handler to use when execution is blocked
266     * because the thread bounds and queue capacities are reached.
267 tim 1.1 */
public ScheduledExecutor(int corePoolSize,
                         ThreadFactory threadFactory,
                         RejectedExecutionHandler handler) {
    // The pool acts as a fixed-size pool of corePoolSize threads over
    // an unbounded queue, so the maximum pool size has no useful effect.
    super(corePoolSize,
          Integer.MAX_VALUE,
          0,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          threadFactory,
          handler);
}
274    
275     /**
276     * Specialized variant of ThreadPoolExecutor.execute for delayed tasks.
277     */
278     void delayedExecute(Runnable command) {
279     if (isShutdown()) {
280     reject(command);
281     return;
282     }
283     // Prestart thread if necessary. We cannot prestart it running
284     // the task because the task (probably) shouldn't be run yet,
285     // so thread will just idle until delay elapses.
286     if (getPoolSize() < getCorePoolSize())
287     addIfUnderCorePoolSize(null);
288    
289     getQueue().offer(command);
290 dl 1.4 }
291    
292     /**
293 dl 1.7 * Creates and executes a one-shot action that becomes enabled after
294 dl 1.4 * the given delay.
295 dl 1.6 * @param command the task to execute.
296     * @param delay the time from now to delay execution.
297     * @param unit the time unit of the delay parameter.
298     * @return a handle that can be used to cancel the task.
299 dl 1.17 * @throws RejectedExecutionException if task cannot be scheduled
300     * for execution because the executor has been shut down.
301 dl 1.4 */
302    
303 dl 1.6 public DelayedTask schedule(Runnable command, long delay, TimeUnit unit) {
304 dl 1.17 long triggerTime = System.nanoTime() + unit.toNanos(delay);
305     DelayedTask t = new DelayedTask(command, triggerTime);
306 dl 1.13 delayedExecute(t);
307 dl 1.4 return t;
308     }
309    
310     /**
311 dl 1.9 * Creates and executes a one-shot action that becomes enabled
312     * after the given date.
313 dl 1.6 * @param command the task to execute.
314     * @param date the time to commence execution.
315     * @return a handle that can be used to cancel the task.
316     * @throws RejectedExecutionException if task cannot be scheduled
317     * for execution because the executor has been shut down.
318 dl 1.4 */
319 dl 1.6 public DelayedTask schedule(Runnable command, Date date) {
320 dl 1.17 long triggerTime = System.nanoTime() +
321     TimeUnit.MILLISECONDS.toNanos(date.getTime() -
322     System.currentTimeMillis());
323     DelayedTask t = new DelayedTask(command, triggerTime);
324 dl 1.13 delayedExecute(t);
325 dl 1.4 return t;
326     }
327    
328     /**
329 dl 1.7 * Creates and executes a periodic action that becomes enabled first
330 dl 1.4 * after the given initial delay, and subsequently with the given
331 dl 1.7 * period; that is executions will commence after
332     * <tt>initialDelay</tt> then <tt>initialDelay+period</tt>, then
333     * <tt>initialDelay + 2 * period</tt>, and so on.
334 dl 1.6 * @param command the task to execute.
335     * @param initialDelay the time to delay first execution.
336     * @param period the period between successive executions.
337     * @param unit the time unit of the delay and period parameters
338     * @return a handle that can be used to cancel the task.
339     * @throws RejectedExecutionException if task cannot be scheduled
340     * for execution because the executor has been shut down.
341 dl 1.4 */
342 dl 1.7 public DelayedTask scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
343 dl 1.17 long triggerTime = System.nanoTime() + unit.toNanos(initialDelay);
344     DelayedTask t = new DelayedTask(command,
345     triggerTime,
346     unit.toNanos(period),
347     true);
348 dl 1.13 delayedExecute(t);
349 dl 1.4 return t;
350     }
351    
352     /**
353 dl 1.7 * Creates a periodic action that becomes enabled first after the
354     * given date, and subsequently with the given
355     * period; that is executions will commence after
356     * <tt>initialDate</tt> then <tt>initialDate+period</tt>, then
357     * <tt>initialDate + 2 * period</tt>, and so on.
358 dl 1.6 * @param command the task to execute.
359     * @param initialDate the time to delay first execution.
360 dl 1.7 * @param period the period between commencement of successive
361     * executions.
362 dl 1.6 * @param unit the time unit of the period parameter.
363     * @return a handle that can be used to cancel the task.
364     * @throws RejectedExecutionException if task cannot be scheduled
365     * for execution because the executor has been shut down.
366 dl 1.4 */
367 dl 1.7 public DelayedTask scheduleAtFixedRate(Runnable command, Date initialDate, long period, TimeUnit unit) {
368 dl 1.17 long triggerTime = System.nanoTime() +
369     TimeUnit.MILLISECONDS.toNanos(initialDate.getTime() -
370     System.currentTimeMillis());
371     DelayedTask t = new DelayedTask(command,
372     triggerTime,
373     unit.toNanos(period),
374     true);
375 dl 1.13 delayedExecute(t);
376 dl 1.7 return t;
377     }
378    
379     /**
380     * Creates and executes a periodic action that becomes enabled first
381     * after the given initial delay, and subsequently with the
382     * given delay between the termination of one execution and the
383     * commencement of the next.
384     * @param command the task to execute.
385     * @param initialDelay the time to delay first execution.
386     * @param delay the delay between the termination of one
387     * execution and the commencement of the next.
388     * @param unit the time unit of the initialDelay and delay parameters
389     * @return a handle that can be used to cancel the task.
390     * @throws RejectedExecutionException if task cannot be scheduled
391     * for execution because the executor has been shut down.
392     */
393     public DelayedTask scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
394 dl 1.17 long triggerTime = System.nanoTime() + unit.toNanos(initialDelay);
395     DelayedTask t = new DelayedTask(command,
396     triggerTime,
397     unit.toNanos(delay),
398     false);
399 dl 1.13 delayedExecute(t);
400 dl 1.7 return t;
401     }
402    
403     /**
404     * Creates a periodic action that becomes enabled first after the
405     * given date, and subsequently with the given delay between
406     * the termination of one execution and the commencement of the
407     * next.
408     * @param command the task to execute.
409     * @param initialDate the time to delay first execution.
410     * @param delay the delay between the termination of one
411     * execution and the commencement of the next.
412     * @param unit the time unit of the delay parameter.
413     * @return a handle that can be used to cancel the task.
414     * @throws RejectedExecutionException if task cannot be scheduled
415     * for execution because the executor has been shut down.
416     */
417     public DelayedTask scheduleWithFixedDelay(Runnable command, Date initialDate, long delay, TimeUnit unit) {
418 dl 1.17 long triggerTime = System.nanoTime() +
419     TimeUnit.MILLISECONDS.toNanos(initialDate.getTime() -
420     System.currentTimeMillis());
421     DelayedTask t = new DelayedTask(command,
422     triggerTime,
423     unit.toNanos(delay),
424     false);
425 dl 1.13 delayedExecute(t);
426 dl 1.4 return t;
427 tim 1.1 }
428    
429    
430 dl 1.4 /**
431 dl 1.7 * Creates and executes a Future that becomes enabled after the
432     * given delay.
433 dl 1.6 * @param callable the function to execute.
434     * @param delay the time from now to delay execution.
435     * @param unit the time unit of the delay parameter.
436     * @return a Future that can be used to extract result or cancel.
437     * @throws RejectedExecutionException if task cannot be scheduled
438     * for execution because the executor has been shut down.
439 dl 1.4 */
440     public <V> DelayedFutureTask<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
441 dl 1.17 long triggerTime = System.nanoTime() + unit.toNanos(delay);
442     DelayedFutureTask<V> t = new DelayedFutureTask<V>(callable,
443     triggerTime);
444 dl 1.13 delayedExecute(t);
445 dl 1.4 return t;
446     }
447    
448     /**
449 dl 1.7 * Creates and executes a one-shot action that becomes enabled after
450 dl 1.4 * the given date.
451 dl 1.6 * @param callable the function to execute.
452     * @param date the time to commence execution.
453     * @return a Future that can be used to extract result or cancel.
454     * @throws RejectedExecutionException if task cannot be scheduled
455     * for execution because the executor has been shut down.
456 dl 1.4 */
457     public <V> DelayedFutureTask<V> schedule(Callable<V> callable, Date date) {
458 dl 1.17 long triggerTime = System.nanoTime() +
459     TimeUnit.MILLISECONDS.toNanos(date.getTime() -
460     System.currentTimeMillis());
461     DelayedFutureTask<V> t = new DelayedFutureTask<V>(callable,
462     triggerTime);
463 dl 1.13 delayedExecute(t);
464 dl 1.4 return t;
465 tim 1.1 }
466    
467 dl 1.4 /**
468 dl 1.16 * Execute command with zero required delay. This has effect
469     * equivalent to <tt>schedule(command, 0, anyUnit)</tt>. Note
470     * that inspections of the queue and of the list returned by
471     * <tt>shutdownNow</tt> will access the zero-delayed
472     * <tt>DelayedTask</tt>, not the <tt>command</tt> itself.
473 tim 1.11 *
474 dl 1.6 * @param command the task to execute
475     * @throws RejectedExecutionException at discretion of
476     * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
477     * for execution because the executor has been shut down.
478 dl 1.4 */
479     public void execute(Runnable command) {
480     schedule(command, 0, TimeUnit.NANOSECONDS);
481 tim 1.1 }
482    
483 dl 1.4 /**
484 dl 1.15 * Removes this task from internal queue if it is present, thus
485     * causing it not to be run if it has not already started. This
486     * method may be useful as one part of a cancellation scheme.
487     *
488     * @param task the task to remove
489     * @return true if the task was removed
490     */
491     public boolean remove(Runnable task) {
492     if (task instanceof DelayedTask && getQueue().remove(task))
493     return true;
494    
495     // The task might actually have been wrapped as a DelayedTask
496     // in execute(), in which case we need to maually traverse
497     // looking for it.
498    
499     DelayedTask wrap = null;
500     Object[] entries = getQueue().toArray();
501     for (int i = 0; i < entries.length; ++i) {
502     DelayedTask t = (DelayedTask)entries[i];
503     Runnable r = t.getRunnable();
504     if (task.equals(r)) {
505     wrap = t;
506     break;
507     }
508     }
509     entries = null;
510     return wrap != null && getQueue().remove(wrap);
511     }
512    
513     /**
514 dl 1.4 * If executed task was periodic, cause the task for the next
515     * period to execute.
516 dl 1.9 * @param r the task (assumed to be a DelayedTask)
517     * @param t the exception
518 dl 1.4 */
519     protected void afterExecute(Runnable r, Throwable t) {
520     if (isShutdown())
521     return;
522     super.afterExecute(r, t);
523     DelayedTask d = (DelayedTask)r;
524     DelayedTask next = d.nextTask();
525 dl 1.6 if (next == null)
526     return;
527     try {
528 dl 1.15 delayedExecute(next);
529 tim 1.14 } catch(RejectedExecutionException ex) {
530 dl 1.6 // lost race to detect shutdown; ignore
531     }
532 dl 1.4 }
533 tim 1.1 }
534 dl 1.4
535