ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledExecutor.java
Revision: 1.12
Committed: Thu Aug 7 16:00:28 2003 UTC (20 years, 10 months ago) by dl
Branch: MAIN
Changes since 1.11: +15 -3 lines
Log Message:
ScheduledExecutor must prestart core threads

File Contents

# User Rev Content
1 tim 1.1 /*
2 dl 1.4 * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5 tim 1.1 */
6    
7     package java.util.concurrent;
8 dl 1.4 import java.util.concurrent.atomic.*;
9 dl 1.2 import java.util.*;
10 tim 1.1
11     /**
12 dl 1.7 * An <tt>Executor</tt> that can schedule command to run after a given
13     * delay, or to execute periodically. This class is preferable to
14 tim 1.1 * <tt>java.util.Timer</tt> when multiple worker threads are needed,
15     * or when the additional flexibility or capabilities of
16 dl 1.7 * <tt>ThreadPoolExecutor</tt> (which this class extends) are
17     * required.
18     *
19     * <p> The <tt>schedule</tt> methods create tasks with various delays
20     * and return a task object that can be used to cancel or check
21     * execution. The <tt>scheduleAtFixedRate</tt> and
22     * <tt>scheduleWithFixedDelay</tt> methods create and execute tasks
23     * that run periodically until cancelled. Commands submitted using
24     * the <tt>execute</tt> method are scheduled with a requested delay of
25     * zero.
26     *
27     * <p> Delayed tasks execute no sooner than they are enabled, but
28     * without any real-time guarantees about when, after they are enabled
29     * they will commence. Tasks tied for the same execution time are
30     * enabled in first-in-first-out (FIFO) order of submission. An
31     * internal {@link DelayQueue} used for scheduling relies on relative
32     * delays, which may drift from absolute times (as returned by
33     * <tt>System.currentTimeMillis</tt>) over sufficiently long periods.
34 tim 1.1 *
35     * @since 1.5
36     * @see Executors
37     *
38     * @spec JSR-166
39 dl 1.9 * @author Doug Lea
40 tim 1.1 */
41     public class ScheduledExecutor extends ThreadPoolExecutor {
42    
43 dl 1.4 /**
44 dl 1.5 * Sequence number to break scheduling ties, and in turn to
45     * guarantee FIFO order among tied entries.
46     */
47     private static final AtomicLong sequencer = new AtomicLong(0);
48    
49     /**
50 dl 1.4 * A delayed or periodic action.
51     */
52 dl 1.8 public static class DelayedTask extends CancellableTask implements Delayed {
53 dl 1.9 /** Sequence number to break ties FIFO */
54 dl 1.4 private final long sequenceNumber;
55 dl 1.9 /** The time the task is enabled to execute in nanoTime units */
56 dl 1.4 private final long time;
57 dl 1.9 /** The delay forllowing next time, or <= 0 if non-periodic */
58 dl 1.4 private final long period;
59 dl 1.9 /** true if at fixed rate; false if fixed delay */
60     private final boolean rateBased;
61    
62 dl 1.4 /**
63 dl 1.5 * Creates a one-shot action with given nanoTime-based trigger time
64 dl 1.4 */
65     DelayedTask(Runnable r, long ns) {
66     super(r);
67     this.time = ns;
68     this.period = 0;
69 dl 1.7 rateBased = false;
70 dl 1.4 this.sequenceNumber = sequencer.getAndIncrement();
71     }
72    
73     /**
74 dl 1.5 * Creates a periodic action with given nano time and period
75 dl 1.4 */
76 dl 1.7 DelayedTask(Runnable r, long ns, long period, boolean rateBased) {
77 dl 1.4 super(r);
78     if (period <= 0)
79     throw new IllegalArgumentException();
80     this.time = ns;
81     this.period = period;
82 dl 1.7 this.rateBased = rateBased;
83 dl 1.4 this.sequenceNumber = sequencer.getAndIncrement();
84     }
85    
86    
87     public long getDelay(TimeUnit unit) {
88 dl 1.12 long d = unit.convert(time - System.nanoTime(),
89 dl 1.7 TimeUnit.NANOSECONDS);
90 dl 1.12 return d;
91 dl 1.4 }
92    
93     public int compareTo(Object other) {
94     DelayedTask x = (DelayedTask)other;
95     long diff = time - x.time;
96     if (diff < 0)
97     return -1;
98     else if (diff > 0)
99     return 1;
100     else if (sequenceNumber < x.sequenceNumber)
101     return -1;
102     else
103     return 1;
104     }
105    
106     /**
107     * Return true if this is a periodic (not a one-shot) action.
108 dl 1.9 * @return true if periodic
109 dl 1.4 */
110     public boolean isPeriodic() {
111     return period > 0;
112     }
113    
114     /**
115 tim 1.11 * Returns the period, or zero if non-periodic.
116     *
117 dl 1.9 * @return the period
118 dl 1.4 */
119     public long getPeriod(TimeUnit unit) {
120     return unit.convert(period, TimeUnit.NANOSECONDS);
121     }
122    
123     /**
124     * Return a new DelayedTask that will trigger in the period
125     * subsequent to current task, or null if non-periodic
126     * or canceled.
127     */
128 dl 1.5 DelayedTask nextTask() {
129 dl 1.4 if (period <= 0 || isCancelled())
130     return null;
131 dl 1.10 long nextTime = period + (rateBased ? time : System.nanoTime());
132 dl 1.7 return new DelayedTask(getRunnable(), nextTime, period, rateBased);
133 dl 1.4 }
134 dl 1.2
135 dl 1.4 }
136 dl 1.2
137 dl 1.4 /**
138     * A delayed result-bearing action.
139     */
140 dl 1.8 public static class DelayedFutureTask<V> extends DelayedTask implements Future<V> {
141 dl 1.4 /**
142     * Creates a Future that may trigger after the given delay.
143     */
144     DelayedFutureTask(Callable<V> callable, long delay, TimeUnit unit) {
145     // must set after super ctor call to use inner class
146 dl 1.10 super(null, System.nanoTime() + unit.toNanos(delay));
147 dl 1.4 setRunnable(new InnerCancellableFuture<V>(callable));
148 dl 1.2 }
149    
150 dl 1.4 /**
151     * Creates a one-shot action that may trigger after the given date.
152     */
153     DelayedFutureTask(Callable<V> callable, Date date) {
154     super(null,
155     TimeUnit.MILLISECONDS.toNanos(date.getTime() -
156     System.currentTimeMillis()));
157     setRunnable(new InnerCancellableFuture<V>(callable));
158     }
159    
160     public V get() throws InterruptedException, ExecutionException {
161     return ((InnerCancellableFuture<V>)getRunnable()).get();
162 dl 1.2 }
163    
164 dl 1.4 public V get(long timeout, TimeUnit unit)
165     throws InterruptedException, ExecutionException, TimeoutException {
166     return ((InnerCancellableFuture<V>)getRunnable()).get(timeout, unit);
167 dl 1.2 }
168 tim 1.1
169 dl 1.4 protected void set(V v) {
170     ((InnerCancellableFuture<V>)getRunnable()).set(v);
171 dl 1.2 }
172 tim 1.1
173 dl 1.4 protected void setException(Throwable t) {
174     ((InnerCancellableFuture<V>)getRunnable()).setException(t);
175     }
176     }
177    
178    
179     /**
180     * An annoying wrapper class to convince generics compiler to
181 dl 1.7 * use a DelayQueue<DelayedTask> as a BlockingQueue<Runnable>
182 dl 1.4 */
183 dl 1.12 private static class DelayedWorkQueue extends AbstractCollection<Runnable> implements BlockingQueue<Runnable> {
184 dl 1.4 private final DelayQueue<DelayedTask> dq = new DelayQueue<DelayedTask>();
185     public Runnable poll() { return dq.poll(); }
186     public Runnable peek() { return dq.peek(); }
187     public Runnable take() throws InterruptedException { return dq.take(); }
188     public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
189     return dq.poll(timeout, unit);
190 dl 1.2 }
191 dl 1.12
192     public boolean add(Runnable x) { return dq.add((DelayedTask)x); }
193 dl 1.4 public boolean offer(Runnable x) { return dq.offer((DelayedTask)x); }
194     public void put(Runnable x) throws InterruptedException {
195     dq.put((DelayedTask)x);
196 dl 1.2 }
197 dl 1.4 public boolean offer(Runnable x, long timeout, TimeUnit unit) throws InterruptedException {
198     return dq.offer((DelayedTask)x, timeout, unit);
199 dl 1.2 }
200 dl 1.12
201     public Runnable remove() { return dq.remove(); }
202     public Runnable element() { return dq.element(); }
203     public void clear() { dq.clear(); }
204    
205 dl 1.4 public int remainingCapacity() { return dq.remainingCapacity(); }
206     public boolean remove(Object x) { return dq.remove(x); }
207     public boolean contains(Object x) { return dq.contains(x); }
208     public int size() { return dq.size(); }
209 tim 1.11 public boolean isEmpty() { return dq.isEmpty(); }
210 dl 1.4 public Iterator<Runnable> iterator() {
211     return new Iterator<Runnable>() {
212     private Iterator<DelayedTask> it = dq.iterator();
213     public boolean hasNext() { return it.hasNext(); }
214     public Runnable next() { return it.next(); }
215     public void remove() { it.remove(); }
216     };
217 tim 1.1 }
218 dl 1.4 }
219 tim 1.1
220 dl 1.4 /**
221     * Creates a new ScheduledExecutor with the given initial parameters.
222     *
223     * @param corePoolSize the number of threads to keep in the pool,
224     * even if they are idle.
225     */
226     public ScheduledExecutor(int corePoolSize) {
227     super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
228     new DelayedWorkQueue());
229 dl 1.12 prestartCoreThreads();
230 dl 1.4 }
231 tim 1.1
232 dl 1.4 /**
233     * Creates a new ScheduledExecutor with the given initial parameters.
234     *
235     * @param corePoolSize the number of threads to keep in the pool,
236     * even if they are idle.
237     * @param threadFactory the factory to use when the executor
238     * creates a new thread.
239     */
240     public ScheduledExecutor(int corePoolSize,
241     ThreadFactory threadFactory) {
242     super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
243     new DelayedWorkQueue(), threadFactory);
244 dl 1.12 prestartCoreThreads();
245 tim 1.1 }
246    
247 dl 1.4 /**
248     * Creates a new ScheduledExecutor with the given initial parameters.
249     *
250     * @param corePoolSize the number of threads to keep in the pool,
251     * even if they are idle.
252     * @param handler the handler to use when execution is blocked
253     * because the thread bounds and queue capacities are reached.
254     */
255     public ScheduledExecutor(int corePoolSize,
256     RejectedExecutionHandler handler) {
257     super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
258     new DelayedWorkQueue(), handler);
259 dl 1.12 prestartCoreThreads();
260 dl 1.4 }
261 dl 1.2
262 tim 1.1 /**
263 dl 1.2 * Creates a new ScheduledExecutor with the given initial parameters.
264     *
265     * @param corePoolSize the number of threads to keep in the pool,
266     * even if they are idle.
267 dl 1.4 * @param threadFactory the factory to use when the executor
268     * creates a new thread.
269     * @param handler the handler to use when execution is blocked
270     * because the thread bounds and queue capacities are reached.
271 tim 1.1 */
272 dl 1.4 public ScheduledExecutor(int corePoolSize,
273     ThreadFactory threadFactory,
274     RejectedExecutionHandler handler) {
275 dl 1.2 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
276 dl 1.4 new DelayedWorkQueue(), threadFactory, handler);
277 dl 1.12 prestartCoreThreads();
278 dl 1.4 }
279    
280     /**
281 dl 1.7 * Creates and executes a one-shot action that becomes enabled after
282 dl 1.4 * the given delay.
283 dl 1.6 * @param command the task to execute.
284     * @param delay the time from now to delay execution.
285     * @param unit the time unit of the delay parameter.
286     * @return a handle that can be used to cancel the task.
287 dl 1.4 */
288    
289 dl 1.6 public DelayedTask schedule(Runnable command, long delay, TimeUnit unit) {
290 dl 1.10 DelayedTask t = new DelayedTask(command, System.nanoTime() + unit.toNanos(delay));
291 dl 1.4 super.execute(t);
292     return t;
293     }
294    
295     /**
296 dl 1.9 * Creates and executes a one-shot action that becomes enabled
297     * after the given date.
298 dl 1.6 * @param command the task to execute.
299     * @param date the time to commence excution.
300     * @return a handle that can be used to cancel the task.
301     * @throws RejectedExecutionException if task cannot be scheduled
302     * for execution because the executor has been shut down.
303 dl 1.4 */
304 dl 1.6 public DelayedTask schedule(Runnable command, Date date) {
305 dl 1.4 DelayedTask t = new DelayedTask
306 dl 1.6 (command,
307 dl 1.4 TimeUnit.MILLISECONDS.toNanos(date.getTime() -
308     System.currentTimeMillis()));
309     super.execute(t);
310     return t;
311     }
312    
313     /**
314 dl 1.7 * Creates and executes a periodic action that becomes enabled first
315 dl 1.4 * after the given initial delay, and subsequently with the given
316 dl 1.7 * period; that is executions will commence after
317     * <tt>initialDelay</tt> then <tt>initialDelay+period</tt>, then
318     * <tt>initialDelay + 2 * period</tt>, and so on.
319 dl 1.6 * @param command the task to execute.
320     * @param initialDelay the time to delay first execution.
321     * @param period the period between successive executions.
322     * @param unit the time unit of the delay and period parameters
323     * @return a handle that can be used to cancel the task.
324     * @throws RejectedExecutionException if task cannot be scheduled
325     * for execution because the executor has been shut down.
326 dl 1.4 */
327 dl 1.7 public DelayedTask scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
328 dl 1.4 DelayedTask t = new DelayedTask
329 dl 1.10 (command, System.nanoTime() + unit.toNanos(initialDelay),
330 dl 1.7 unit.toNanos(period), true);
331 dl 1.4 super.execute(t);
332     return t;
333     }
334    
335     /**
336 dl 1.7 * Creates a periodic action that becomes enabled first after the
337     * given date, and subsequently with the given period
338     * period; that is executions will commence after
339     * <tt>initialDate</tt> then <tt>initialDate+period</tt>, then
340     * <tt>initialDate + 2 * period</tt>, and so on.
341 dl 1.6 * @param command the task to execute.
342     * @param initialDate the time to delay first execution.
343 dl 1.7 * @param period the period between commencement of successive
344     * executions.
345 dl 1.6 * @param unit the time unit of the period parameter.
346     * @return a handle that can be used to cancel the task.
347     * @throws RejectedExecutionException if task cannot be scheduled
348     * for execution because the executor has been shut down.
349 dl 1.4 */
350 dl 1.7 public DelayedTask scheduleAtFixedRate(Runnable command, Date initialDate, long period, TimeUnit unit) {
351     DelayedTask t = new DelayedTask
352     (command,
353     TimeUnit.MILLISECONDS.toNanos(initialDate.getTime() -
354     System.currentTimeMillis()),
355     unit.toNanos(period), true);
356     super.execute(t);
357     return t;
358     }
359    
360     /**
361     * Creates and executes a periodic action that becomes enabled first
362     * after the given initial delay, and and subsequently with the
363     * given delay between the termination of one execution and the
364     * commencement of the next.
365     * @param command the task to execute.
366     * @param initialDelay the time to delay first execution.
367     * @param delay the delay between the termination of one
368     * execution and the commencement of the next.
369     * @param unit the time unit of the delay and delay parameters
370     * @return a handle that can be used to cancel the task.
371     * @throws RejectedExecutionException if task cannot be scheduled
372     * for execution because the executor has been shut down.
373     */
374     public DelayedTask scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
375     DelayedTask t = new DelayedTask
376 dl 1.10 (command, System.nanoTime() + unit.toNanos(initialDelay),
377 dl 1.7 unit.toNanos(delay), false);
378     super.execute(t);
379     return t;
380     }
381    
382     /**
383     * Creates a periodic action that becomes enabled first after the
384     * given date, and subsequently with the given delay between
385     * the termination of one execution and the commencement of the
386     * next.
387     * @param command the task to execute.
388     * @param initialDate the time to delay first execution.
389     * @param delay the delay between the termination of one
390     * execution and the commencement of the next.
391     * @param unit the time unit of the delay parameter.
392     * @return a handle that can be used to cancel the task.
393     * @throws RejectedExecutionException if task cannot be scheduled
394     * for execution because the executor has been shut down.
395     */
396     public DelayedTask scheduleWithFixedDelay(Runnable command, Date initialDate, long delay, TimeUnit unit) {
397 dl 1.4 DelayedTask t = new DelayedTask
398 dl 1.6 (command,
399     TimeUnit.MILLISECONDS.toNanos(initialDate.getTime() -
400 dl 1.4 System.currentTimeMillis()),
401 dl 1.7 unit.toNanos(delay), false);
402 dl 1.4 super.execute(t);
403     return t;
404 tim 1.1 }
405    
406    
407 dl 1.4 /**
408 dl 1.7 * Creates and executes a Future that becomes enabled after the
409     * given delay.
410 dl 1.6 * @param callable the function to execute.
411     * @param delay the time from now to delay execution.
412     * @param unit the time unit of the delay parameter.
413     * @return a Future that can be used to extract result or cancel.
414     * @throws RejectedExecutionException if task cannot be scheduled
415     * for execution because the executor has been shut down.
416 dl 1.4 */
417     public <V> DelayedFutureTask<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
418     DelayedFutureTask<V> t = new DelayedFutureTask<V>
419     (callable, delay, unit);
420     super.execute(t);
421     return t;
422     }
423    
424     /**
425 dl 1.7 * Creates and executes a one-shot action that becomes enabled after
426 dl 1.4 * the given date.
427 dl 1.6 * @param callable the function to execute.
428     * @param date the time to commence excution.
429     * @return a Future that can be used to extract result or cancel.
430     * @throws RejectedExecutionException if task cannot be scheduled
431     * for execution because the executor has been shut down.
432 dl 1.4 */
433     public <V> DelayedFutureTask<V> schedule(Callable<V> callable, Date date) {
434     DelayedFutureTask<V> t = new DelayedFutureTask<V>
435     (callable, date);
436     super.execute(t);
437     return t;
438 tim 1.1 }
439    
440 dl 1.4 /**
441 tim 1.11 * Execute command with zero required delay.
442     *
443 dl 1.6 * @param command the task to execute
444     * @throws RejectedExecutionException at discretion of
445     * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
446     * for execution because the executor has been shut down.
447 dl 1.4 */
448     public void execute(Runnable command) {
449     schedule(command, 0, TimeUnit.NANOSECONDS);
450 tim 1.1 }
451    
452 dl 1.4 /**
453     * If executed task was periodic, cause the task for the next
454     * period to execute.
455 dl 1.9 * @param r the task (assumed to be a DelayedTask)
456     * @param t the exception
457 dl 1.4 */
458     protected void afterExecute(Runnable r, Throwable t) {
459     if (isShutdown())
460     return;
461     super.afterExecute(r, t);
462     DelayedTask d = (DelayedTask)r;
463     DelayedTask next = d.nextTask();
464 dl 1.6 if (next == null)
465     return;
466     try {
467 dl 1.12 execute(next);
468 dl 1.6 }
469     catch(RejectedExecutionException ex) {
470     // lost race to detect shutdown; ignore
471     }
472 dl 1.4 }
473 tim 1.1 }
474 dl 1.4
475