ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledExecutor.java
Revision: 1.14
Committed: Fri Aug 8 20:05:07 2003 UTC (20 years, 9 months ago) by tim
Branch: MAIN
Changes since 1.13: +1 -2 lines
Log Message:
Scrunched catch, finally, else clauses.

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.*;
10
11 /**
12 * An <tt>Executor</tt> that can schedule commands to run after a given
13 * delay, or to execute periodically. This class is preferable to
14 * <tt>java.util.Timer</tt> when multiple worker threads are needed,
15 * or when the additional flexibility or capabilities of
16 * <tt>ThreadPoolExecutor</tt> (which this class extends) are
17 * required.
18 *
19 * <p> The <tt>schedule</tt> methods create tasks with various delays
20 * and return a task object that can be used to cancel or check
21 * execution. The <tt>scheduleAtFixedRate</tt> and
22 * <tt>scheduleWithFixedDelay</tt> methods create and execute tasks
23 * that run periodically until cancelled. Commands submitted using
24 * the <tt>execute</tt> method are scheduled with a requested delay of
25 * zero.
26 *
27 * <p> Delayed tasks execute no sooner than they are enabled, but
28 * without any real-time guarantees about when, after they are enabled,
29 * they will commence. Tasks tied for the same execution time are
30 * enabled in first-in-first-out (FIFO) order of submission. An
31 * internal {@link DelayQueue} used for scheduling relies on relative
32 * delays, which may drift from absolute times (as returned by
33 * <tt>System.currentTimeMillis</tt>) over sufficiently long periods.
34 *
35 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
36 * of the inherited tuning methods are not especially useful for
37 * it. In particular, because a <tt>ScheduledExecutor</tt> always acts
38 * as a fixed-sized pool using <tt>corePoolSize</tt> threads and an
39 * unbounded queue, adjustments to <tt>maximumPoolSize</tt> have no
40 * useful effect.
41 *
42 * @since 1.5
43 * @see Executors
44 *
45 * @spec JSR-166
46 * @author Doug Lea
47 */
48 public class ScheduledExecutor extends ThreadPoolExecutor {
49
50 /**
51 * Sequence number to break scheduling ties, and in turn to
52 * guarantee FIFO order among tied entries.
53 */
54 private static final AtomicLong sequencer = new AtomicLong(0);
55
56 /**
57 * A delayed or periodic action.
58 */
59 public static class DelayedTask extends CancellableTask implements Delayed {
60 /** Sequence number to break ties FIFO */
61 private final long sequenceNumber;
62 /** The time the task is enabled to execute in nanoTime units */
63 private final long time;
64 /** The delay forllowing next time, or <= 0 if non-periodic */
65 private final long period;
66 /** true if at fixed rate; false if fixed delay */
67 private final boolean rateBased;
68
69 /**
70 * Creates a one-shot action with given nanoTime-based trigger time
71 */
72 DelayedTask(Runnable r, long ns) {
73 super(r);
74 this.time = ns;
75 this.period = 0;
76 rateBased = false;
77 this.sequenceNumber = sequencer.getAndIncrement();
78 }
79
80 /**
81 * Creates a periodic action with given nano time and period
82 */
83 DelayedTask(Runnable r, long ns, long period, boolean rateBased) {
84 super(r);
85 if (period <= 0)
86 throw new IllegalArgumentException();
87 this.time = ns;
88 this.period = period;
89 this.rateBased = rateBased;
90 this.sequenceNumber = sequencer.getAndIncrement();
91 }
92
93
94 public long getDelay(TimeUnit unit) {
95 long d = unit.convert(time - System.nanoTime(),
96 TimeUnit.NANOSECONDS);
97 return d;
98 }
99
100 public int compareTo(Object other) {
101 DelayedTask x = (DelayedTask)other;
102 long diff = time - x.time;
103 if (diff < 0)
104 return -1;
105 else if (diff > 0)
106 return 1;
107 else if (sequenceNumber < x.sequenceNumber)
108 return -1;
109 else
110 return 1;
111 }
112
113 /**
114 * Return true if this is a periodic (not a one-shot) action.
115 * @return true if periodic
116 */
117 public boolean isPeriodic() {
118 return period > 0;
119 }
120
121 /**
122 * Returns the period, or zero if non-periodic.
123 *
124 * @return the period
125 */
126 public long getPeriod(TimeUnit unit) {
127 return unit.convert(period, TimeUnit.NANOSECONDS);
128 }
129
130 /**
131 * Return a new DelayedTask that will trigger in the period
132 * subsequent to current task, or null if non-periodic
133 * or canceled.
134 */
135 DelayedTask nextTask() {
136 if (period <= 0 || isCancelled())
137 return null;
138 long nextTime = period + (rateBased ? time : System.nanoTime());
139 return new DelayedTask(getRunnable(), nextTime, period, rateBased);
140 }
141
142 }
143
144 /**
145 * A delayed result-bearing action.
146 */
147 public static class DelayedFutureTask<V> extends DelayedTask implements Future<V> {
148 /**
149 * Creates a Future that may trigger after the given delay.
150 */
151 DelayedFutureTask(Callable<V> callable, long delay, TimeUnit unit) {
152 // must set after super ctor call to use inner class
153 super(null, System.nanoTime() + unit.toNanos(delay));
154 setRunnable(new InnerCancellableFuture<V>(callable));
155 }
156
157 /**
158 * Creates a one-shot action that may trigger after the given date.
159 */
160 DelayedFutureTask(Callable<V> callable, Date date) {
161 super(null,
162 TimeUnit.MILLISECONDS.toNanos(date.getTime() -
163 System.currentTimeMillis()));
164 setRunnable(new InnerCancellableFuture<V>(callable));
165 }
166
167 public V get() throws InterruptedException, ExecutionException {
168 return ((InnerCancellableFuture<V>)getRunnable()).get();
169 }
170
171 public V get(long timeout, TimeUnit unit)
172 throws InterruptedException, ExecutionException, TimeoutException {
173 return ((InnerCancellableFuture<V>)getRunnable()).get(timeout, unit);
174 }
175
176 protected void set(V v) {
177 ((InnerCancellableFuture<V>)getRunnable()).set(v);
178 }
179
180 protected void setException(Throwable t) {
181 ((InnerCancellableFuture<V>)getRunnable()).setException(t);
182 }
183 }
184
185
186 /**
187 * An annoying wrapper class to convince generics compiler to
188 * use a DelayQueue<DelayedTask> as a BlockingQueue<Runnable>
189 */
190 private static class DelayedWorkQueue extends AbstractCollection<Runnable> implements BlockingQueue<Runnable> {
191 private final DelayQueue<DelayedTask> dq = new DelayQueue<DelayedTask>();
192 public Runnable poll() { return dq.poll(); }
193 public Runnable peek() { return dq.peek(); }
194 public Runnable take() throws InterruptedException { return dq.take(); }
195 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
196 return dq.poll(timeout, unit);
197 }
198
199 public boolean add(Runnable x) { return dq.add((DelayedTask)x); }
200 public boolean offer(Runnable x) { return dq.offer((DelayedTask)x); }
201 public void put(Runnable x) throws InterruptedException {
202 dq.put((DelayedTask)x);
203 }
204 public boolean offer(Runnable x, long timeout, TimeUnit unit) throws InterruptedException {
205 return dq.offer((DelayedTask)x, timeout, unit);
206 }
207
208 public Runnable remove() { return dq.remove(); }
209 public Runnable element() { return dq.element(); }
210 public void clear() { dq.clear(); }
211
212 public int remainingCapacity() { return dq.remainingCapacity(); }
213 public boolean remove(Object x) { return dq.remove(x); }
214 public boolean contains(Object x) { return dq.contains(x); }
215 public int size() { return dq.size(); }
216 public boolean isEmpty() { return dq.isEmpty(); }
217 public Iterator<Runnable> iterator() {
218 return new Iterator<Runnable>() {
219 private Iterator<DelayedTask> it = dq.iterator();
220 public boolean hasNext() { return it.hasNext(); }
221 public Runnable next() { return it.next(); }
222 public void remove() { it.remove(); }
223 };
224 }
225 }
226
/**
 * Constructs a ScheduledExecutor with the given core pool size,
 * backed by a fresh DelayedWorkQueue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle.
 */
public ScheduledExecutor(int corePoolSize) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue());
}
237
/**
 * Constructs a ScheduledExecutor with the given core pool size and
 * thread factory, backed by a fresh DelayedWorkQueue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle.
 * @param threadFactory the factory to use when the executor
 * creates a new thread.
 */
public ScheduledExecutor(int corePoolSize,
                         ThreadFactory threadFactory) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          threadFactory);
}
251
/**
 * Constructs a ScheduledExecutor with the given core pool size and
 * rejection handler, backed by a fresh DelayedWorkQueue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle.
 * @param handler the handler to use when a submitted task is
 * rejected.
 */
public ScheduledExecutor(int corePoolSize,
                         RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          handler);
}
265
/**
 * Constructs a ScheduledExecutor with the given core pool size,
 * thread factory and rejection handler, backed by a fresh
 * DelayedWorkQueue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle.
 * @param threadFactory the factory to use when the executor
 * creates a new thread.
 * @param handler the handler to use when a submitted task is
 * rejected.
 */
public ScheduledExecutor(int corePoolSize,
                         ThreadFactory threadFactory,
                         RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          threadFactory,
          handler);
}
282
283 /**
284 * Specialized variant of ThreadPoolExecutor.execute for delayed tasks.
285 */
286 void delayedExecute(Runnable command) {
287 if (isShutdown()) {
288 reject(command);
289 return;
290 }
291 // Prestart thread if necessary. We cannot prestart it running
292 // the task because the task (probably) shouldn't be run yet,
293 // so thread will just idle until delay elapses.
294 if (getPoolSize() < getCorePoolSize())
295 addIfUnderCorePoolSize(null);
296
297 getQueue().offer(command);
298 }
299
300 /**
301 * Creates and executes a one-shot action that becomes enabled after
302 * the given delay.
303 * @param command the task to execute.
304 * @param delay the time from now to delay execution.
305 * @param unit the time unit of the delay parameter.
306 * @return a handle that can be used to cancel the task.
307 */
308
309 public DelayedTask schedule(Runnable command, long delay, TimeUnit unit) {
310 DelayedTask t = new DelayedTask(command, System.nanoTime() + unit.toNanos(delay));
311 delayedExecute(t);
312 return t;
313 }
314
315 /**
316 * Creates and executes a one-shot action that becomes enabled
317 * after the given date.
318 * @param command the task to execute.
319 * @param date the time to commence excution.
320 * @return a handle that can be used to cancel the task.
321 * @throws RejectedExecutionException if task cannot be scheduled
322 * for execution because the executor has been shut down.
323 */
324 public DelayedTask schedule(Runnable command, Date date) {
325 DelayedTask t = new DelayedTask
326 (command,
327 TimeUnit.MILLISECONDS.toNanos(date.getTime() -
328 System.currentTimeMillis()));
329 delayedExecute(t);
330 return t;
331 }
332
333 /**
334 * Creates and executes a periodic action that becomes enabled first
335 * after the given initial delay, and subsequently with the given
336 * period; that is executions will commence after
337 * <tt>initialDelay</tt> then <tt>initialDelay+period</tt>, then
338 * <tt>initialDelay + 2 * period</tt>, and so on.
339 * @param command the task to execute.
340 * @param initialDelay the time to delay first execution.
341 * @param period the period between successive executions.
342 * @param unit the time unit of the delay and period parameters
343 * @return a handle that can be used to cancel the task.
344 * @throws RejectedExecutionException if task cannot be scheduled
345 * for execution because the executor has been shut down.
346 */
347 public DelayedTask scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
348 DelayedTask t = new DelayedTask
349 (command, System.nanoTime() + unit.toNanos(initialDelay),
350 unit.toNanos(period), true);
351 delayedExecute(t);
352 return t;
353 }
354
355 /**
356 * Creates a periodic action that becomes enabled first after the
357 * given date, and subsequently with the given period
358 * period; that is executions will commence after
359 * <tt>initialDate</tt> then <tt>initialDate+period</tt>, then
360 * <tt>initialDate + 2 * period</tt>, and so on.
361 * @param command the task to execute.
362 * @param initialDate the time to delay first execution.
363 * @param period the period between commencement of successive
364 * executions.
365 * @param unit the time unit of the period parameter.
366 * @return a handle that can be used to cancel the task.
367 * @throws RejectedExecutionException if task cannot be scheduled
368 * for execution because the executor has been shut down.
369 */
370 public DelayedTask scheduleAtFixedRate(Runnable command, Date initialDate, long period, TimeUnit unit) {
371 DelayedTask t = new DelayedTask
372 (command,
373 TimeUnit.MILLISECONDS.toNanos(initialDate.getTime() -
374 System.currentTimeMillis()),
375 unit.toNanos(period), true);
376 delayedExecute(t);
377 return t;
378 }
379
380 /**
381 * Creates and executes a periodic action that becomes enabled first
382 * after the given initial delay, and and subsequently with the
383 * given delay between the termination of one execution and the
384 * commencement of the next.
385 * @param command the task to execute.
386 * @param initialDelay the time to delay first execution.
387 * @param delay the delay between the termination of one
388 * execution and the commencement of the next.
389 * @param unit the time unit of the delay and delay parameters
390 * @return a handle that can be used to cancel the task.
391 * @throws RejectedExecutionException if task cannot be scheduled
392 * for execution because the executor has been shut down.
393 */
394 public DelayedTask scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
395 DelayedTask t = new DelayedTask
396 (command, System.nanoTime() + unit.toNanos(initialDelay),
397 unit.toNanos(delay), false);
398 delayedExecute(t);
399 return t;
400 }
401
402 /**
403 * Creates a periodic action that becomes enabled first after the
404 * given date, and subsequently with the given delay between
405 * the termination of one execution and the commencement of the
406 * next.
407 * @param command the task to execute.
408 * @param initialDate the time to delay first execution.
409 * @param delay the delay between the termination of one
410 * execution and the commencement of the next.
411 * @param unit the time unit of the delay parameter.
412 * @return a handle that can be used to cancel the task.
413 * @throws RejectedExecutionException if task cannot be scheduled
414 * for execution because the executor has been shut down.
415 */
416 public DelayedTask scheduleWithFixedDelay(Runnable command, Date initialDate, long delay, TimeUnit unit) {
417 DelayedTask t = new DelayedTask
418 (command,
419 TimeUnit.MILLISECONDS.toNanos(initialDate.getTime() -
420 System.currentTimeMillis()),
421 unit.toNanos(delay), false);
422 delayedExecute(t);
423 return t;
424 }
425
426
427 /**
428 * Creates and executes a Future that becomes enabled after the
429 * given delay.
430 * @param callable the function to execute.
431 * @param delay the time from now to delay execution.
432 * @param unit the time unit of the delay parameter.
433 * @return a Future that can be used to extract result or cancel.
434 * @throws RejectedExecutionException if task cannot be scheduled
435 * for execution because the executor has been shut down.
436 */
437 public <V> DelayedFutureTask<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
438 DelayedFutureTask<V> t = new DelayedFutureTask<V>
439 (callable, delay, unit);
440 delayedExecute(t);
441 return t;
442 }
443
444 /**
445 * Creates and executes a one-shot action that becomes enabled after
446 * the given date.
447 * @param callable the function to execute.
448 * @param date the time to commence excution.
449 * @return a Future that can be used to extract result or cancel.
450 * @throws RejectedExecutionException if task cannot be scheduled
451 * for execution because the executor has been shut down.
452 */
453 public <V> DelayedFutureTask<V> schedule(Callable<V> callable, Date date) {
454 DelayedFutureTask<V> t = new DelayedFutureTask<V>
455 (callable, date);
456 delayedExecute(t);
457 return t;
458 }
459
460 /**
461 * Execute command with zero required delay.
462 *
463 * @param command the task to execute
464 * @throws RejectedExecutionException at discretion of
465 * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
466 * for execution because the executor has been shut down.
467 */
468 public void execute(Runnable command) {
469 schedule(command, 0, TimeUnit.NANOSECONDS);
470 }
471
472 /**
473 * If executed task was periodic, cause the task for the next
474 * period to execute.
475 * @param r the task (assumed to be a DelayedTask)
476 * @param t the exception
477 */
478 protected void afterExecute(Runnable r, Throwable t) {
479 if (isShutdown())
480 return;
481 super.afterExecute(r, t);
482 DelayedTask d = (DelayedTask)r;
483 DelayedTask next = d.nextTask();
484 if (next == null)
485 return;
486 try {
487 execute(next);
488 } catch(RejectedExecutionException ex) {
489 // lost race to detect shutdown; ignore
490 }
491 }
492 }
493
494