ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledExecutor.java
Revision: 1.13
Committed: Fri Aug 8 17:48:44 2003 UTC (20 years, 9 months ago) by dl
Branch: MAIN
Changes since 1.12: +32 -12 lines
Log Message:
Semi-lazily prestart core threads in SE

File Contents

# User Rev Content
1 tim 1.1 /*
2 dl 1.4 * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5 tim 1.1 */
6    
7     package java.util.concurrent;
8 dl 1.4 import java.util.concurrent.atomic.*;
9 dl 1.2 import java.util.*;
10 tim 1.1
11     /**
12 dl 1.7 * An <tt>Executor</tt> that can schedule commands to run after a given
13     * delay, or to execute periodically. This class is preferable to
14 tim 1.1 * <tt>java.util.Timer</tt> when multiple worker threads are needed,
15     * or when the additional flexibility or capabilities of
16 dl 1.7 * <tt>ThreadPoolExecutor</tt> (which this class extends) are
17     * required.
18     *
19     * <p> The <tt>schedule</tt> methods create tasks with various delays
20     * and return a task object that can be used to cancel or check
21     * execution. The <tt>scheduleAtFixedRate</tt> and
22     * <tt>scheduleWithFixedDelay</tt> methods create and execute tasks
23     * that run periodically until cancelled. Commands submitted using
24     * the <tt>execute</tt> method are scheduled with a requested delay of
25     * zero.
26     *
27     * <p> Delayed tasks execute no sooner than they are enabled, but
28     * without any real-time guarantees about when, after they are enabled
29     * they will commence. Tasks tied for the same execution time are
30     * enabled in first-in-first-out (FIFO) order of submission. An
31     * internal {@link DelayQueue} used for scheduling relies on relative
32     * delays, which may drift from absolute times (as returned by
33     * <tt>System.currentTimeMillis</tt>) over sufficiently long periods.
34 tim 1.1 *
35 dl 1.13 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
36     * of the inherited tuning methods are not especially useful for
37     * it. In particular, because a <tt>ScheduledExecutor</tt> always acts
38     * as a fixed-sized pool using <tt>corePoolSize</tt> threads and an
39     * unbounded queue, adjustments to <tt>maximumPoolSize</tt> have no
40     * useful effect.
41     *
42 tim 1.1 * @since 1.5
43     * @see Executors
44     *
45     * @spec JSR-166
46 dl 1.9 * @author Doug Lea
47 tim 1.1 */
48     public class ScheduledExecutor extends ThreadPoolExecutor {
49    
50 dl 1.4 /**
51 dl 1.5 * Sequence number to break scheduling ties, and in turn to
52     * guarantee FIFO order among tied entries.
53     */
54     private static final AtomicLong sequencer = new AtomicLong(0);
55    
56     /**
57 dl 1.4 * A delayed or periodic action.
58     */
59 dl 1.8 public static class DelayedTask extends CancellableTask implements Delayed {
60 dl 1.9 /** Sequence number to break ties FIFO */
61 dl 1.4 private final long sequenceNumber;
62 dl 1.9 /** The time the task is enabled to execute in nanoTime units */
63 dl 1.4 private final long time;
64 dl 1.9 /** The delay following next time, or <= 0 if non-periodic */
65 dl 1.4 private final long period;
66 dl 1.9 /** true if at fixed rate; false if fixed delay */
67     private final boolean rateBased;
68    
69 dl 1.4 /**
70 dl 1.5 * Creates a one-shot action with given nanoTime-based trigger time
71 dl 1.4 */
72     DelayedTask(Runnable r, long ns) {
73     super(r);
74     this.time = ns;
75     this.period = 0;
76 dl 1.7 rateBased = false;
77 dl 1.4 this.sequenceNumber = sequencer.getAndIncrement();
78     }
79    
80     /**
81 dl 1.5 * Creates a periodic action with given nano time and period
82 dl 1.4 */
83 dl 1.7 DelayedTask(Runnable r, long ns, long period, boolean rateBased) {
84 dl 1.4 super(r);
85     if (period <= 0)
86     throw new IllegalArgumentException();
87     this.time = ns;
88     this.period = period;
89 dl 1.7 this.rateBased = rateBased;
90 dl 1.4 this.sequenceNumber = sequencer.getAndIncrement();
91     }
92    
93    
94     public long getDelay(TimeUnit unit) {
95 dl 1.12 long d = unit.convert(time - System.nanoTime(),
96 dl 1.7 TimeUnit.NANOSECONDS);
97 dl 1.12 return d;
98 dl 1.4 }
99    
100     public int compareTo(Object other) { // orders by trigger time, then FIFO by sequence number
101     DelayedTask x = (DelayedTask)other;
102     long diff = time - x.time; // difference of the two nanoTime-based trigger times
103     if (diff < 0)
104     return -1;
105     else if (diff > 0)
106     return 1;
107     else if (sequenceNumber < x.sequenceNumber)
108     return -1;
109     else
110     return (sequenceNumber == x.sequenceNumber) ? 0 : 1; // 0 when sequence numbers match (e.g. a task compared with itself), as Comparable requires
111     }
112    
113     /**
114     * Return true if this is a periodic (not a one-shot) action.
115 dl 1.9 * @return true if periodic
116 dl 1.4 */
117     public boolean isPeriodic() {
118     return period > 0;
119     }
120    
121     /**
122 tim 1.11 * Returns the period, or zero if non-periodic.
123     *
124 dl 1.9 * @return the period
125 dl 1.4 */
126     public long getPeriod(TimeUnit unit) {
127     return unit.convert(period, TimeUnit.NANOSECONDS);
128     }
129    
130     /**
131     * Return a new DelayedTask that will trigger in the period
132     * subsequent to current task, or null if non-periodic
133     * or canceled.
134     */
135 dl 1.5 DelayedTask nextTask() {
136 dl 1.4 if (period <= 0 || isCancelled())
137     return null;
138 dl 1.10 long nextTime = period + (rateBased ? time : System.nanoTime());
139 dl 1.7 return new DelayedTask(getRunnable(), nextTime, period, rateBased);
140 dl 1.4 }
141 dl 1.2
142 dl 1.4 }
143 dl 1.2
144 dl 1.4 /**
145     * A delayed result-bearing action.
146     */
147 dl 1.8 public static class DelayedFutureTask<V> extends DelayedTask implements Future<V> {
148 dl 1.4 /**
149     * Creates a Future that may trigger after the given delay.
150     */
151     DelayedFutureTask(Callable<V> callable, long delay, TimeUnit unit) {
152     // must set after super ctor call to use inner class
153 dl 1.10 super(null, System.nanoTime() + unit.toNanos(delay));
154 dl 1.4 setRunnable(new InnerCancellableFuture<V>(callable));
155 dl 1.2 }
156    
157 dl 1.4 /**
158     * Creates a one-shot action that may trigger after the given date.
159     */
160     DelayedFutureTask(Callable<V> callable, Date date) {
161     super(null, // runnable must be set after super ctor call to use inner class
162     System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(date.getTime() - // trigger time is nanoTime-based: now + time remaining until date
163     System.currentTimeMillis()));
164     setRunnable(new InnerCancellableFuture<V>(callable));
165     }
166    
167     public V get() throws InterruptedException, ExecutionException {
168     return ((InnerCancellableFuture<V>)getRunnable()).get();
169 dl 1.2 }
170    
171 dl 1.4 public V get(long timeout, TimeUnit unit)
172     throws InterruptedException, ExecutionException, TimeoutException {
173     return ((InnerCancellableFuture<V>)getRunnable()).get(timeout, unit);
174 dl 1.2 }
175 tim 1.1
176 dl 1.4 protected void set(V v) {
177     ((InnerCancellableFuture<V>)getRunnable()).set(v);
178 dl 1.2 }
179 tim 1.1
180 dl 1.4 protected void setException(Throwable t) {
181     ((InnerCancellableFuture<V>)getRunnable()).setException(t);
182     }
183     }
184    
185    
186     /**
187     * An annoying wrapper class to convince generics compiler to
188 dl 1.7 * use a DelayQueue<DelayedTask> as a BlockingQueue<Runnable>
189 dl 1.4 */
190 dl 1.12 private static class DelayedWorkQueue extends AbstractCollection<Runnable> implements BlockingQueue<Runnable> {
191 dl 1.4 private final DelayQueue<DelayedTask> dq = new DelayQueue<DelayedTask>();
192     public Runnable poll() { return dq.poll(); }
193     public Runnable peek() { return dq.peek(); }
194     public Runnable take() throws InterruptedException { return dq.take(); }
195     public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
196     return dq.poll(timeout, unit);
197 dl 1.2 }
198 dl 1.12
199     public boolean add(Runnable x) { return dq.add((DelayedTask)x); }
200 dl 1.4 public boolean offer(Runnable x) { return dq.offer((DelayedTask)x); }
201     public void put(Runnable x) throws InterruptedException {
202     dq.put((DelayedTask)x);
203 dl 1.2 }
204 dl 1.4 public boolean offer(Runnable x, long timeout, TimeUnit unit) throws InterruptedException {
205     return dq.offer((DelayedTask)x, timeout, unit);
206 dl 1.2 }
207 dl 1.12
208     public Runnable remove() { return dq.remove(); }
209     public Runnable element() { return dq.element(); }
210     public void clear() { dq.clear(); }
211    
212 dl 1.4 public int remainingCapacity() { return dq.remainingCapacity(); }
213     public boolean remove(Object x) { return dq.remove(x); }
214     public boolean contains(Object x) { return dq.contains(x); }
215     public int size() { return dq.size(); }
216 tim 1.11 public boolean isEmpty() { return dq.isEmpty(); }
217 dl 1.4 public Iterator<Runnable> iterator() {
218     return new Iterator<Runnable>() {
219     private Iterator<DelayedTask> it = dq.iterator();
220     public boolean hasNext() { return it.hasNext(); }
221     public Runnable next() { return it.next(); }
222     public void remove() { it.remove(); }
223     };
224 tim 1.1 }
225 dl 1.4 }
226 tim 1.1
227 dl 1.4 /**
228     * Creates a new ScheduledExecutor with the given initial parameters.
229     *
230     * @param corePoolSize the number of threads to keep in the pool,
231     * even if they are idle.
232     */
233     public ScheduledExecutor(int corePoolSize) {
234     super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
235     new DelayedWorkQueue());
236     }
237 tim 1.1
238 dl 1.4 /**
239     * Creates a new ScheduledExecutor with the given initial parameters.
240     *
241     * @param corePoolSize the number of threads to keep in the pool,
242     * even if they are idle.
243     * @param threadFactory the factory to use when the executor
244     * creates a new thread.
245     */
246     public ScheduledExecutor(int corePoolSize,
247     ThreadFactory threadFactory) {
248     super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
249     new DelayedWorkQueue(), threadFactory);
250 tim 1.1 }
251    
252 dl 1.4 /**
253     * Creates a new ScheduledExecutor with the given initial parameters.
254     *
255     * @param corePoolSize the number of threads to keep in the pool,
256     * even if they are idle.
257     * @param handler the handler to use when execution is blocked
258     * because the thread bounds and queue capacities are reached.
259     */
260     public ScheduledExecutor(int corePoolSize,
261     RejectedExecutionHandler handler) {
262     super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
263     new DelayedWorkQueue(), handler);
264     }
265 dl 1.2
266 tim 1.1 /**
267 dl 1.2 * Creates a new ScheduledExecutor with the given initial parameters.
268     *
269     * @param corePoolSize the number of threads to keep in the pool,
270     * even if they are idle.
271 dl 1.4 * @param threadFactory the factory to use when the executor
272     * creates a new thread.
273     * @param handler the handler to use when execution is blocked
274     * because the thread bounds and queue capacities are reached.
275 tim 1.1 */
276 dl 1.4 public ScheduledExecutor(int corePoolSize,
277     ThreadFactory threadFactory,
278     RejectedExecutionHandler handler) {
279 dl 1.2 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
280 dl 1.4 new DelayedWorkQueue(), threadFactory, handler);
281 dl 1.13 }
282    
283     /**
284     * Specialized variant of ThreadPoolExecutor.execute for delayed tasks.
285     */
286     void delayedExecute(Runnable command) {
287     if (isShutdown()) {
288     reject(command);
289     return;
290     }
291     // Prestart thread if necessary. We cannot prestart it running
292     // the task because the task (probably) shouldn't be run yet,
293     // so thread will just idle until delay elapses.
294     if (getPoolSize() < getCorePoolSize())
295     addIfUnderCorePoolSize(null);
296    
297     getQueue().offer(command);
298 dl 1.4 }
299    
300     /**
301 dl 1.7 * Creates and executes a one-shot action that becomes enabled after
302 dl 1.4 * the given delay.
303 dl 1.6 * @param command the task to execute.
304     * @param delay the time from now to delay execution.
305     * @param unit the time unit of the delay parameter.
306     * @return a handle that can be used to cancel the task.
307 dl 1.4 */
308    
309 dl 1.6 public DelayedTask schedule(Runnable command, long delay, TimeUnit unit) {
310 dl 1.10 DelayedTask t = new DelayedTask(command, System.nanoTime() + unit.toNanos(delay));
311 dl 1.13 delayedExecute(t);
312 dl 1.4 return t;
313     }
314    
315     /**
316 dl 1.9 * Creates and executes a one-shot action that becomes enabled
317     * after the given date.
318 dl 1.6 * @param command the task to execute.
319     * @param date the time to commence execution.
320     * @return a handle that can be used to cancel the task.
321     * @throws RejectedExecutionException if task cannot be scheduled
322     * for execution because the executor has been shut down.
323 dl 1.4 */
324 dl 1.6 public DelayedTask schedule(Runnable command, Date date) {
325 dl 1.4 DelayedTask t = new DelayedTask
326 dl 1.6 (command,
327 dl 1.4 System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(date.getTime() - // trigger time is nanoTime-based: now + time remaining until date
328     System.currentTimeMillis()));
329 dl 1.13 delayedExecute(t);
330 dl 1.4 return t;
331     }
332    
333     /**
334 dl 1.7 * Creates and executes a periodic action that becomes enabled first
335 dl 1.4 * after the given initial delay, and subsequently with the given
336 dl 1.7 * period; that is executions will commence after
337     * <tt>initialDelay</tt> then <tt>initialDelay+period</tt>, then
338     * <tt>initialDelay + 2 * period</tt>, and so on.
339 dl 1.6 * @param command the task to execute.
340     * @param initialDelay the time to delay first execution.
341     * @param period the period between successive executions.
342     * @param unit the time unit of the delay and period parameters
343     * @return a handle that can be used to cancel the task.
344     * @throws RejectedExecutionException if task cannot be scheduled
345     * for execution because the executor has been shut down.
346 dl 1.4 */
347 dl 1.7 public DelayedTask scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
348 dl 1.4 DelayedTask t = new DelayedTask
349 dl 1.10 (command, System.nanoTime() + unit.toNanos(initialDelay),
350 dl 1.7 unit.toNanos(period), true);
351 dl 1.13 delayedExecute(t);
352 dl 1.4 return t;
353     }
354    
355     /**
356 dl 1.7 * Creates a periodic action that becomes enabled first after the
357     * given date, and subsequently with the given
358     * period; that is, executions will commence after
359     * <tt>initialDate</tt> then <tt>initialDate+period</tt>, then
360     * <tt>initialDate + 2 * period</tt>, and so on.
361 dl 1.6 * @param command the task to execute.
362     * @param initialDate the time to delay first execution.
363 dl 1.7 * @param period the period between commencement of successive
364     * executions.
365 dl 1.6 * @param unit the time unit of the period parameter.
366     * @return a handle that can be used to cancel the task.
367     * @throws RejectedExecutionException if task cannot be scheduled
368     * for execution because the executor has been shut down.
369 dl 1.4 */
370 dl 1.7 public DelayedTask scheduleAtFixedRate(Runnable command, Date initialDate, long period, TimeUnit unit) {
371     DelayedTask t = new DelayedTask
372     (command,
373     System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(initialDate.getTime() - // trigger time is nanoTime-based: now + time remaining until initialDate
374     System.currentTimeMillis()),
375     unit.toNanos(period), true);
376 dl 1.13 delayedExecute(t);
377 dl 1.7 return t;
378     }
379    
380     /**
381     * Creates and executes a periodic action that becomes enabled first
382     * after the given initial delay, and subsequently with the
383     * given delay between the termination of one execution and the
384     * commencement of the next.
385     * @param command the task to execute.
386     * @param initialDelay the time to delay first execution.
387     * @param delay the delay between the termination of one
388     * execution and the commencement of the next.
389     * @param unit the time unit of the initialDelay and delay parameters
390     * @return a handle that can be used to cancel the task.
391     * @throws RejectedExecutionException if task cannot be scheduled
392     * for execution because the executor has been shut down.
393     */
394     public DelayedTask scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
395     DelayedTask t = new DelayedTask
396 dl 1.10 (command, System.nanoTime() + unit.toNanos(initialDelay),
397 dl 1.7 unit.toNanos(delay), false);
398 dl 1.13 delayedExecute(t);
399 dl 1.7 return t;
400     }
401    
402     /**
403     * Creates a periodic action that becomes enabled first after the
404     * given date, and subsequently with the given delay between
405     * the termination of one execution and the commencement of the
406     * next.
407     * @param command the task to execute.
408     * @param initialDate the time to delay first execution.
409     * @param delay the delay between the termination of one
410     * execution and the commencement of the next.
411     * @param unit the time unit of the delay parameter.
412     * @return a handle that can be used to cancel the task.
413     * @throws RejectedExecutionException if task cannot be scheduled
414     * for execution because the executor has been shut down.
415     */
416     public DelayedTask scheduleWithFixedDelay(Runnable command, Date initialDate, long delay, TimeUnit unit) {
417 dl 1.4 DelayedTask t = new DelayedTask
418 dl 1.6 (command,
419     System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(initialDate.getTime() - // trigger time is nanoTime-based: now + time remaining until initialDate
420 dl 1.4 System.currentTimeMillis()),
421 dl 1.7 unit.toNanos(delay), false);
422 dl 1.13 delayedExecute(t);
423 dl 1.4 return t;
424 tim 1.1 }
425    
426    
427 dl 1.4 /**
428 dl 1.7 * Creates and executes a Future that becomes enabled after the
429     * given delay.
430 dl 1.6 * @param callable the function to execute.
431     * @param delay the time from now to delay execution.
432     * @param unit the time unit of the delay parameter.
433     * @return a Future that can be used to extract result or cancel.
434     * @throws RejectedExecutionException if task cannot be scheduled
435     * for execution because the executor has been shut down.
436 dl 1.4 */
437     public <V> DelayedFutureTask<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
438     DelayedFutureTask<V> t = new DelayedFutureTask<V>
439     (callable, delay, unit);
440 dl 1.13 delayedExecute(t);
441 dl 1.4 return t;
442     }
443    
444     /**
445 dl 1.7 * Creates and executes a one-shot action that becomes enabled after
446 dl 1.4 * the given date.
447 dl 1.6 * @param callable the function to execute.
448     * @param date the time to commence execution.
449     * @return a Future that can be used to extract result or cancel.
450     * @throws RejectedExecutionException if task cannot be scheduled
451     * for execution because the executor has been shut down.
452 dl 1.4 */
453     public <V> DelayedFutureTask<V> schedule(Callable<V> callable, Date date) {
454     DelayedFutureTask<V> t = new DelayedFutureTask<V>
455     (callable, date);
456 dl 1.13 delayedExecute(t);
457 dl 1.4 return t;
458 tim 1.1 }
459    
460 dl 1.4 /**
461 tim 1.11 * Execute command with zero required delay.
462     *
463 dl 1.6 * @param command the task to execute
464     * @throws RejectedExecutionException at discretion of
465     * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
466     * for execution because the executor has been shut down.
467 dl 1.4 */
468     public void execute(Runnable command) {
469     schedule(command, 0, TimeUnit.NANOSECONDS);
470 tim 1.1 }
471    
472 dl 1.4 /**
473     * If executed task was periodic, cause the task for the next
474     * period to execute.
475 dl 1.9 * @param r the task (assumed to be a DelayedTask)
476     * @param t the exception
477 dl 1.4 */
478     protected void afterExecute(Runnable r, Throwable t) {
479     if (isShutdown())
480     return;
481     super.afterExecute(r, t);
482     DelayedTask d = (DelayedTask)r;
483     DelayedTask next = d.nextTask(); // null if non-periodic or cancelled
484 dl 1.6 if (next == null)
485     return;
486     try {
487 dl 1.12 delayedExecute(next); // enqueue as-is; execute() would re-wrap it in a zero-delay one-shot task, losing its trigger time and period
488 dl 1.6 }
489     catch(RejectedExecutionException ex) {
490     // lost race to detect shutdown; ignore
491     }
492 dl 1.4 }
493 tim 1.1 }
494 dl 1.4
495