ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledExecutor.java
Revision: 1.8
Committed: Wed Jun 11 13:17:21 2003 UTC (21 years ago) by dl
Branch: MAIN
Changes since 1.7: +2 -8 lines
Log Message:
Removed automatic queue removal on cancel; Added TPE purge; Fixed RL typo

File Contents

# User Rev Content
1 tim 1.1 /*
2 dl 1.4 * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5 tim 1.1 */
6    
7     package java.util.concurrent;
8 dl 1.4 import java.util.concurrent.atomic.*;
9 dl 1.2 import java.util.*;
10 tim 1.1
11     /**
12 dl 1.7 * An <tt>Executor</tt> that can schedule commands to run after a given
13     * delay, or to execute periodically. This class is preferable to
14 tim 1.1 * <tt>java.util.Timer</tt> when multiple worker threads are needed,
15     * or when the additional flexibility or capabilities of
16 dl 1.7 * <tt>ThreadPoolExecutor</tt> (which this class extends) are
17     * required.
18     *
19     * <p> The <tt>schedule</tt> methods create tasks with various delays
20     * and return a task object that can be used to cancel or check
21     * execution. The <tt>scheduleAtFixedRate</tt> and
22     * <tt>scheduleWithFixedDelay</tt> methods create and execute tasks
23     * that run periodically until cancelled. Commands submitted using
24     * the <tt>execute</tt> method are scheduled with a requested delay of
25     * zero.
26     *
27     * <p> Delayed tasks execute no sooner than they are enabled, but
28     * without any real-time guarantees about when, after they are enabled,
29     * they will commence. Tasks tied for the same execution time are
30     * enabled in first-in-first-out (FIFO) order of submission. An
31     * internal {@link DelayQueue} used for scheduling relies on relative
32     * delays, which may drift from absolute times (as returned by
33     * <tt>System.currentTimeMillis</tt>) over sufficiently long periods.
34 tim 1.1 *
35     * @since 1.5
36     * @see Executors
37     *
38     * @spec JSR-166
39     */
40     public class ScheduledExecutor extends ThreadPoolExecutor {
41    
42 dl 1.4 /**
43 dl 1.5 * Sequence number to break scheduling ties, and in turn to
44     * guarantee FIFO order among tied entries.
45     */
46     private static final AtomicLong sequencer = new AtomicLong(0);
47    
48     /**
49 dl 1.4 * A delayed or periodic action.
50     */
51 dl 1.8 public static class DelayedTask extends CancellableTask implements Delayed {
52 dl 1.4 private final long sequenceNumber;
53     private final long time;
54     private final long period;
55 dl 1.7 private final boolean rateBased; // true if at fixed rate;
56     // false if fixed delay
57 dl 1.4 /**
58 dl 1.5 * Creates a one-shot action with given nanoTime-based trigger time
59 dl 1.4 */
60     DelayedTask(Runnable r, long ns) {
61     super(r);
62     this.time = ns;
63     this.period = 0;
64 dl 1.7 rateBased = false;
65 dl 1.4 this.sequenceNumber = sequencer.getAndIncrement();
66     }
67    
68     /**
69 dl 1.5 * Creates a periodic action with given nano time and period
70 dl 1.4 */
71 dl 1.7 DelayedTask(Runnable r, long ns, long period, boolean rateBased) {
72 dl 1.4 super(r);
73     if (period <= 0)
74     throw new IllegalArgumentException();
75     this.time = ns;
76     this.period = period;
77 dl 1.7 this.rateBased = rateBased;
78 dl 1.4 this.sequenceNumber = sequencer.getAndIncrement();
79     }
80    
81    
82     public long getDelay(TimeUnit unit) {
83 dl 1.7 return unit.convert(time - TimeUnit.nanoTime(),
84     TimeUnit.NANOSECONDS);
85 dl 1.4 }
86    
87     public int compareTo(Object other) {
88     DelayedTask x = (DelayedTask)other;
89     long diff = time - x.time;
90     if (diff < 0)
91     return -1;
92     else if (diff > 0)
93     return 1;
94     else if (sequenceNumber < x.sequenceNumber)
95     return -1;
96     else
97     return 1;
98     }
99    
100     /**
101     * Return true if this is a periodic (not a one-shot) action.
102     */
103     public boolean isPeriodic() {
104     return period > 0;
105     }
106    
107     /**
108     * Returns the period, or zero if non-periodic
109     */
110     public long getPeriod(TimeUnit unit) {
111     return unit.convert(period, TimeUnit.NANOSECONDS);
112     }
113    
114     /**
115     * Return a new DelayedTask that will trigger in the period
116     * subsequent to current task, or null if non-periodic
117     * or canceled.
118     */
119 dl 1.5 DelayedTask nextTask() {
120 dl 1.4 if (period <= 0 || isCancelled())
121     return null;
122 dl 1.7 long nextTime = period + (rateBased ? time : TimeUnit.nanoTime());
123     return new DelayedTask(getRunnable(), nextTime, period, rateBased);
124 dl 1.4 }
125 dl 1.2
126 dl 1.4 }
127 dl 1.2
128 dl 1.4 /**
129     * A delayed result-bearing action.
130     */
131 dl 1.8 public static class DelayedFutureTask<V> extends DelayedTask implements Future<V> {
132 dl 1.4 /**
133     * Creates a Future that may trigger after the given delay.
134     */
135     DelayedFutureTask(Callable<V> callable, long delay, TimeUnit unit) {
136     // must set after super ctor call to use inner class
137     super(null, TimeUnit.nanoTime() + unit.toNanos(delay));
138     setRunnable(new InnerCancellableFuture<V>(callable));
139 dl 1.2 }
140    
141 dl 1.4 /**
142     * Creates a one-shot action that may trigger after the given date.
143     */
144     DelayedFutureTask(Callable<V> callable, Date date) {
145     super(null,
146     TimeUnit.MILLISECONDS.toNanos(date.getTime() -
147     System.currentTimeMillis()));
148     setRunnable(new InnerCancellableFuture<V>(callable));
149     }
150    
151     public V get() throws InterruptedException, ExecutionException {
152     return ((InnerCancellableFuture<V>)getRunnable()).get();
153 dl 1.2 }
154    
155 dl 1.4 public V get(long timeout, TimeUnit unit)
156     throws InterruptedException, ExecutionException, TimeoutException {
157     return ((InnerCancellableFuture<V>)getRunnable()).get(timeout, unit);
158 dl 1.2 }
159 tim 1.1
160 dl 1.4 protected void set(V v) {
161     ((InnerCancellableFuture<V>)getRunnable()).set(v);
162 dl 1.2 }
163 tim 1.1
164 dl 1.4 protected void setException(Throwable t) {
165     ((InnerCancellableFuture<V>)getRunnable()).setException(t);
166     }
167     }
168    
169    
170     /**
171     * An annoying wrapper class to convince generics compiler to
172 dl 1.7 * use a DelayQueue<DelayedTask> as a BlockingQueue<Runnable>
173 dl 1.4 */
174     private static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
175     private final DelayQueue<DelayedTask> dq = new DelayQueue<DelayedTask>();
176     public Runnable poll() { return dq.poll(); }
177     public Runnable peek() { return dq.peek(); }
178     public Runnable take() throws InterruptedException { return dq.take(); }
179     public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
180     return dq.poll(timeout, unit);
181 dl 1.2 }
182 dl 1.4 public boolean offer(Runnable x) { return dq.offer((DelayedTask)x); }
183     public void put(Runnable x) throws InterruptedException {
184     dq.put((DelayedTask)x);
185 dl 1.2 }
186 dl 1.4 public boolean offer(Runnable x, long timeout, TimeUnit unit) throws InterruptedException {
187     return dq.offer((DelayedTask)x, timeout, unit);
188 dl 1.2 }
189 dl 1.4 public int remainingCapacity() { return dq.remainingCapacity(); }
190     public boolean remove(Object x) { return dq.remove(x); }
191     public boolean contains(Object x) { return dq.contains(x); }
192     public int size() { return dq.size(); }
193     public boolean isEmpty() { return dq.isEmpty(); }
194     public Iterator<Runnable> iterator() {
195     return new Iterator<Runnable>() {
196     private Iterator<DelayedTask> it = dq.iterator();
197     public boolean hasNext() { return it.hasNext(); }
198     public Runnable next() { return it.next(); }
199     public void remove() { it.remove(); }
200     };
201 tim 1.1 }
202 dl 1.4 }
203 tim 1.1
/**
 * Creates a new ScheduledExecutor with the given core pool size.
 * The superclass is configured with a maximum pool size of
 * <tt>Integer.MAX_VALUE</tt>, a keep-alive time of zero nanoseconds
 * and a fresh <tt>DelayedWorkQueue</tt> as its work queue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle.
 */
public ScheduledExecutor(int corePoolSize) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue());
}
214 tim 1.1
/**
 * Creates a new ScheduledExecutor with the given core pool size and
 * thread factory. The superclass is configured with a maximum pool
 * size of <tt>Integer.MAX_VALUE</tt>, a keep-alive time of zero
 * nanoseconds and a fresh <tt>DelayedWorkQueue</tt> as its work queue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle.
 * @param threadFactory the factory to use when the executor
 * creates a new thread.
 */
public ScheduledExecutor(int corePoolSize,
                         ThreadFactory threadFactory) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          threadFactory);
}
228    
/**
 * Creates a new ScheduledExecutor with the given core pool size and
 * rejection handler. The superclass is configured with a maximum pool
 * size of <tt>Integer.MAX_VALUE</tt>, a keep-alive time of zero
 * nanoseconds and a fresh <tt>DelayedWorkQueue</tt> as its work queue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle.
 * @param handler the handler to use when execution is blocked
 * because the thread bounds and queue capacities are reached.
 */
public ScheduledExecutor(int corePoolSize,
                         RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          handler);
}
242 dl 1.2
/**
 * Creates a new ScheduledExecutor with the given core pool size,
 * thread factory and rejection handler. The superclass is configured
 * with a maximum pool size of <tt>Integer.MAX_VALUE</tt>, a keep-alive
 * time of zero nanoseconds and a fresh <tt>DelayedWorkQueue</tt> as
 * its work queue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle.
 * @param threadFactory the factory to use when the executor
 * creates a new thread.
 * @param handler the handler to use when execution is blocked
 * because the thread bounds and queue capacities are reached.
 */
public ScheduledExecutor(int corePoolSize,
                         ThreadFactory threadFactory,
                         RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          threadFactory,
          handler);
}
259    
260     /**
261 dl 1.7 * Creates and executes a one-shot action that becomes enabled after
262 dl 1.4 * the given delay.
263 dl 1.6 * @param command the task to execute.
264     * @param delay the time from now to delay execution.
265     * @param unit the time unit of the delay parameter.
266     * @return a handle that can be used to cancel the task.
267 dl 1.4 */
268    
269 dl 1.6 public DelayedTask schedule(Runnable command, long delay, TimeUnit unit) {
270     DelayedTask t = new DelayedTask(command, TimeUnit.nanoTime() + unit.toNanos(delay));
271 dl 1.4 super.execute(t);
272     return t;
273     }
274    
275     /**
276 dl 1.7 * Creates and executes a one-shot action that becomes enabled after the given date.
277 dl 1.6 * @param command the task to execute.
278     * @param date the time to commence excution.
279     * @return a handle that can be used to cancel the task.
280     * @throws RejectedExecutionException if task cannot be scheduled
281     * for execution because the executor has been shut down.
282 dl 1.4 */
283 dl 1.6 public DelayedTask schedule(Runnable command, Date date) {
284 dl 1.4 DelayedTask t = new DelayedTask
285 dl 1.6 (command,
286 dl 1.4 TimeUnit.MILLISECONDS.toNanos(date.getTime() -
287     System.currentTimeMillis()));
288     super.execute(t);
289     return t;
290     }
291    
292     /**
293 dl 1.7 * Creates and executes a periodic action that becomes enabled first
294 dl 1.4 * after the given initial delay, and subsequently with the given
295 dl 1.7 * period; that is executions will commence after
296     * <tt>initialDelay</tt> then <tt>initialDelay+period</tt>, then
297     * <tt>initialDelay + 2 * period</tt>, and so on.
298 dl 1.6 * @param command the task to execute.
299     * @param initialDelay the time to delay first execution.
300     * @param period the period between successive executions.
301     * @param unit the time unit of the delay and period parameters
302     * @return a handle that can be used to cancel the task.
303     * @throws RejectedExecutionException if task cannot be scheduled
304     * for execution because the executor has been shut down.
305 dl 1.4 */
306 dl 1.7 public DelayedTask scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
307 dl 1.4 DelayedTask t = new DelayedTask
308 dl 1.6 (command, TimeUnit.nanoTime() + unit.toNanos(initialDelay),
309 dl 1.7 unit.toNanos(period), true);
310 dl 1.4 super.execute(t);
311     return t;
312     }
313    
314     /**
315 dl 1.7 * Creates a periodic action that becomes enabled first after the
316     * given date, and subsequently with the given period
317     * period; that is executions will commence after
318     * <tt>initialDate</tt> then <tt>initialDate+period</tt>, then
319     * <tt>initialDate + 2 * period</tt>, and so on.
320 dl 1.6 * @param command the task to execute.
321     * @param initialDate the time to delay first execution.
322 dl 1.7 * @param period the period between commencement of successive
323     * executions.
324 dl 1.6 * @param unit the time unit of the period parameter.
325     * @return a handle that can be used to cancel the task.
326     * @throws RejectedExecutionException if task cannot be scheduled
327     * for execution because the executor has been shut down.
328 dl 1.4 */
329 dl 1.7 public DelayedTask scheduleAtFixedRate(Runnable command, Date initialDate, long period, TimeUnit unit) {
330     DelayedTask t = new DelayedTask
331     (command,
332     TimeUnit.MILLISECONDS.toNanos(initialDate.getTime() -
333     System.currentTimeMillis()),
334     unit.toNanos(period), true);
335     super.execute(t);
336     return t;
337     }
338    
339     /**
340     * Creates and executes a periodic action that becomes enabled first
341     * after the given initial delay, and and subsequently with the
342     * given delay between the termination of one execution and the
343     * commencement of the next.
344     * @param command the task to execute.
345     * @param initialDelay the time to delay first execution.
346     * @param delay the delay between the termination of one
347     * execution and the commencement of the next.
348     * @param unit the time unit of the delay and delay parameters
349     * @return a handle that can be used to cancel the task.
350     * @throws RejectedExecutionException if task cannot be scheduled
351     * for execution because the executor has been shut down.
352     */
353     public DelayedTask scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
354     DelayedTask t = new DelayedTask
355     (command, TimeUnit.nanoTime() + unit.toNanos(initialDelay),
356     unit.toNanos(delay), false);
357     super.execute(t);
358     return t;
359     }
360    
361     /**
362     * Creates a periodic action that becomes enabled first after the
363     * given date, and subsequently with the given delay between
364     * the termination of one execution and the commencement of the
365     * next.
366     * @param command the task to execute.
367     * @param initialDate the time to delay first execution.
368     * @param delay the delay between the termination of one
369     * execution and the commencement of the next.
370     * @param unit the time unit of the delay parameter.
371     * @return a handle that can be used to cancel the task.
372     * @throws RejectedExecutionException if task cannot be scheduled
373     * for execution because the executor has been shut down.
374     */
375     public DelayedTask scheduleWithFixedDelay(Runnable command, Date initialDate, long delay, TimeUnit unit) {
376 dl 1.4 DelayedTask t = new DelayedTask
377 dl 1.6 (command,
378     TimeUnit.MILLISECONDS.toNanos(initialDate.getTime() -
379 dl 1.4 System.currentTimeMillis()),
380 dl 1.7 unit.toNanos(delay), false);
381 dl 1.4 super.execute(t);
382     return t;
383 tim 1.1 }
384    
385    
386 dl 1.4 /**
387 dl 1.7 * Creates and executes a Future that becomes enabled after the
388     * given delay.
389 dl 1.6 * @param callable the function to execute.
390     * @param delay the time from now to delay execution.
391     * @param unit the time unit of the delay parameter.
392     * @return a Future that can be used to extract result or cancel.
393     * @throws RejectedExecutionException if task cannot be scheduled
394     * for execution because the executor has been shut down.
395 dl 1.4 */
396     public <V> DelayedFutureTask<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
397     DelayedFutureTask<V> t = new DelayedFutureTask<V>
398     (callable, delay, unit);
399     super.execute(t);
400     return t;
401     }
402    
403     /**
404 dl 1.7 * Creates and executes a one-shot action that becomes enabled after
405 dl 1.4 * the given date.
406 dl 1.6 * @param callable the function to execute.
407     * @param date the time to commence excution.
408     * @return a Future that can be used to extract result or cancel.
409     * @throws RejectedExecutionException if task cannot be scheduled
410     * for execution because the executor has been shut down.
411 dl 1.4 */
412     public <V> DelayedFutureTask<V> schedule(Callable<V> callable, Date date) {
413     DelayedFutureTask<V> t = new DelayedFutureTask<V>
414     (callable, date);
415     super.execute(t);
416     return t;
417 tim 1.1 }
418    
419 dl 1.4 /**
420     * Execute command with zero required delay
421 dl 1.6 * @param command the task to execute
422     * @throws RejectedExecutionException at discretion of
423     * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
424     * for execution because the executor has been shut down.
425 dl 1.4 */
426     public void execute(Runnable command) {
427     schedule(command, 0, TimeUnit.NANOSECONDS);
428 tim 1.1 }
429    
430 dl 1.4 /**
431     * If executed task was periodic, cause the task for the next
432     * period to execute.
433     */
434     protected void afterExecute(Runnable r, Throwable t) {
435     if (isShutdown())
436     return;
437     super.afterExecute(r, t);
438     DelayedTask d = (DelayedTask)r;
439     DelayedTask next = d.nextTask();
440 dl 1.6 if (next == null)
441     return;
442     try {
443 dl 1.4 super.execute(next);
444 dl 1.6 }
445     catch(RejectedExecutionException ex) {
446     // lost race to detect shutdown; ignore
447     }
448 dl 1.4 }
449 tim 1.1 }
450 dl 1.4
451