ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledExecutor.java
Revision: 1.8
Committed: Wed Jun 11 13:17:21 2003 UTC (21 years ago) by dl
Branch: MAIN
Changes since 1.7: +2 -8 lines
Log Message:
Removed automatic queue removal on cancel; Added TPE purge; Fixed RL typo

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.*;
10
11 /**
12 * An <tt>Executor</tt> that can schedule commands to run after a given
13 * delay, or to execute periodically. This class is preferable to
14 * <tt>java.util.Timer</tt> when multiple worker threads are needed,
15 * or when the additional flexibility or capabilities of
16 * <tt>ThreadPoolExecutor</tt> (which this class extends) are
17 * required.
18 *
19 * <p> The <tt>schedule</tt> methods create tasks with various delays
20 * and return a task object that can be used to cancel or check
21 * execution. The <tt>scheduleAtFixedRate</tt> and
22 * <tt>scheduleWithFixedDelay</tt> methods create and execute tasks
23 * that run periodically until cancelled. Commands submitted using
24 * the <tt>execute</tt> method are scheduled with a requested delay of
25 * zero.
26 *
27 * <p> Delayed tasks execute no sooner than they are enabled, but
28 * without any real-time guarantees about when, after they are enabled,
29 * they will commence. Tasks tied for the same execution time are
30 * enabled in first-in-first-out (FIFO) order of submission. An
31 * internal {@link DelayQueue} used for scheduling relies on relative
32 * delays, which may drift from absolute times (as returned by
33 * <tt>System.currentTimeMillis</tt>) over sufficiently long periods.
34 *
35 * @since 1.5
36 * @see Executors
37 *
38 * @spec JSR-166
39 */
40 public class ScheduledExecutor extends ThreadPoolExecutor {
41
42 /**
43 * Sequence number to break scheduling ties, and in turn to
44 * guarantee FIFO order among tied entries.
45 */
46 private static final AtomicLong sequencer = new AtomicLong(0);
47
48 /**
49 * A delayed or periodic action.
50 */
51 public static class DelayedTask extends CancellableTask implements Delayed {
52 private final long sequenceNumber;
53 private final long time;
54 private final long period;
55 private final boolean rateBased; // true if at fixed rate;
56 // false if fixed delay
57 /**
58 * Creates a one-shot action with given nanoTime-based trigger time
59 */
60 DelayedTask(Runnable r, long ns) {
61 super(r);
62 this.time = ns;
63 this.period = 0;
64 rateBased = false;
65 this.sequenceNumber = sequencer.getAndIncrement();
66 }
67
68 /**
69 * Creates a periodic action with given nano time and period
70 */
71 DelayedTask(Runnable r, long ns, long period, boolean rateBased) {
72 super(r);
73 if (period <= 0)
74 throw new IllegalArgumentException();
75 this.time = ns;
76 this.period = period;
77 this.rateBased = rateBased;
78 this.sequenceNumber = sequencer.getAndIncrement();
79 }
80
81
82 public long getDelay(TimeUnit unit) {
83 return unit.convert(time - TimeUnit.nanoTime(),
84 TimeUnit.NANOSECONDS);
85 }
86
87 public int compareTo(Object other) {
88 DelayedTask x = (DelayedTask)other;
89 long diff = time - x.time;
90 if (diff < 0)
91 return -1;
92 else if (diff > 0)
93 return 1;
94 else if (sequenceNumber < x.sequenceNumber)
95 return -1;
96 else
97 return 1;
98 }
99
100 /**
101 * Return true if this is a periodic (not a one-shot) action.
102 */
103 public boolean isPeriodic() {
104 return period > 0;
105 }
106
107 /**
108 * Returns the period, or zero if non-periodic
109 */
110 public long getPeriod(TimeUnit unit) {
111 return unit.convert(period, TimeUnit.NANOSECONDS);
112 }
113
114 /**
115 * Return a new DelayedTask that will trigger in the period
116 * subsequent to current task, or null if non-periodic
117 * or canceled.
118 */
119 DelayedTask nextTask() {
120 if (period <= 0 || isCancelled())
121 return null;
122 long nextTime = period + (rateBased ? time : TimeUnit.nanoTime());
123 return new DelayedTask(getRunnable(), nextTime, period, rateBased);
124 }
125
126 }
127
128 /**
129 * A delayed result-bearing action.
130 */
131 public static class DelayedFutureTask<V> extends DelayedTask implements Future<V> {
132 /**
133 * Creates a Future that may trigger after the given delay.
134 */
135 DelayedFutureTask(Callable<V> callable, long delay, TimeUnit unit) {
136 // must set after super ctor call to use inner class
137 super(null, TimeUnit.nanoTime() + unit.toNanos(delay));
138 setRunnable(new InnerCancellableFuture<V>(callable));
139 }
140
141 /**
142 * Creates a one-shot action that may trigger after the given date.
143 */
144 DelayedFutureTask(Callable<V> callable, Date date) {
145 super(null,
146 TimeUnit.MILLISECONDS.toNanos(date.getTime() -
147 System.currentTimeMillis()));
148 setRunnable(new InnerCancellableFuture<V>(callable));
149 }
150
151 public V get() throws InterruptedException, ExecutionException {
152 return ((InnerCancellableFuture<V>)getRunnable()).get();
153 }
154
155 public V get(long timeout, TimeUnit unit)
156 throws InterruptedException, ExecutionException, TimeoutException {
157 return ((InnerCancellableFuture<V>)getRunnable()).get(timeout, unit);
158 }
159
160 protected void set(V v) {
161 ((InnerCancellableFuture<V>)getRunnable()).set(v);
162 }
163
164 protected void setException(Throwable t) {
165 ((InnerCancellableFuture<V>)getRunnable()).setException(t);
166 }
167 }
168
169
170 /**
171 * An annoying wrapper class to convince generics compiler to
172 * use a DelayQueue<DelayedTask> as a BlockingQueue<Runnable>
173 */
174 private static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
175 private final DelayQueue<DelayedTask> dq = new DelayQueue<DelayedTask>();
176 public Runnable poll() { return dq.poll(); }
177 public Runnable peek() { return dq.peek(); }
178 public Runnable take() throws InterruptedException { return dq.take(); }
179 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
180 return dq.poll(timeout, unit);
181 }
182 public boolean offer(Runnable x) { return dq.offer((DelayedTask)x); }
183 public void put(Runnable x) throws InterruptedException {
184 dq.put((DelayedTask)x);
185 }
186 public boolean offer(Runnable x, long timeout, TimeUnit unit) throws InterruptedException {
187 return dq.offer((DelayedTask)x, timeout, unit);
188 }
189 public int remainingCapacity() { return dq.remainingCapacity(); }
190 public boolean remove(Object x) { return dq.remove(x); }
191 public boolean contains(Object x) { return dq.contains(x); }
192 public int size() { return dq.size(); }
193 public boolean isEmpty() { return dq.isEmpty(); }
194 public Iterator<Runnable> iterator() {
195 return new Iterator<Runnable>() {
196 private Iterator<DelayedTask> it = dq.iterator();
197 public boolean hasNext() { return it.hasNext(); }
198 public Runnable next() { return it.next(); }
199 public void remove() { it.remove(); }
200 };
201 }
202 }
203
/**
 * Creates a new ScheduledExecutor with the given core pool size.
 * The remaining superclass parameters are fixed: a maximum pool size
 * of <tt>Integer.MAX_VALUE</tt>, a keep-alive time of zero, and a
 * fresh delay-based work queue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle.
 */
public ScheduledExecutor(int corePoolSize) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue());
}
214
/**
 * Creates a new ScheduledExecutor with the given core pool size and
 * thread factory. The remaining superclass parameters are fixed: a
 * maximum pool size of <tt>Integer.MAX_VALUE</tt>, a keep-alive time
 * of zero, and a fresh delay-based work queue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle.
 * @param threadFactory the factory to use when the executor
 * creates a new thread.
 */
public ScheduledExecutor(int corePoolSize,
                         ThreadFactory threadFactory) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          threadFactory);
}
228
/**
 * Creates a new ScheduledExecutor with the given core pool size and
 * rejection handler. The remaining superclass parameters are fixed: a
 * maximum pool size of <tt>Integer.MAX_VALUE</tt>, a keep-alive time
 * of zero, and a fresh delay-based work queue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle.
 * @param handler the handler to use when execution is blocked
 * because the thread bounds and queue capacities are reached.
 */
public ScheduledExecutor(int corePoolSize,
                         RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          handler);
}
242
/**
 * Creates a new ScheduledExecutor with the given core pool size,
 * thread factory and rejection handler. The remaining superclass
 * parameters are fixed: a maximum pool size of
 * <tt>Integer.MAX_VALUE</tt>, a keep-alive time of zero, and a fresh
 * delay-based work queue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle.
 * @param threadFactory the factory to use when the executor
 * creates a new thread.
 * @param handler the handler to use when execution is blocked
 * because the thread bounds and queue capacities are reached.
 */
public ScheduledExecutor(int corePoolSize,
                         ThreadFactory threadFactory,
                         RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          threadFactory,
          handler);
}
259
260 /**
261 * Creates and executes a one-shot action that becomes enabled after
262 * the given delay.
263 * @param command the task to execute.
264 * @param delay the time from now to delay execution.
265 * @param unit the time unit of the delay parameter.
266 * @return a handle that can be used to cancel the task.
267 */
268
269 public DelayedTask schedule(Runnable command, long delay, TimeUnit unit) {
270 DelayedTask t = new DelayedTask(command, TimeUnit.nanoTime() + unit.toNanos(delay));
271 super.execute(t);
272 return t;
273 }
274
275 /**
276 * Creates and executes a one-shot action that becomes enabled after the given date.
277 * @param command the task to execute.
278 * @param date the time to commence excution.
279 * @return a handle that can be used to cancel the task.
280 * @throws RejectedExecutionException if task cannot be scheduled
281 * for execution because the executor has been shut down.
282 */
283 public DelayedTask schedule(Runnable command, Date date) {
284 DelayedTask t = new DelayedTask
285 (command,
286 TimeUnit.MILLISECONDS.toNanos(date.getTime() -
287 System.currentTimeMillis()));
288 super.execute(t);
289 return t;
290 }
291
292 /**
293 * Creates and executes a periodic action that becomes enabled first
294 * after the given initial delay, and subsequently with the given
295 * period; that is executions will commence after
296 * <tt>initialDelay</tt> then <tt>initialDelay+period</tt>, then
297 * <tt>initialDelay + 2 * period</tt>, and so on.
298 * @param command the task to execute.
299 * @param initialDelay the time to delay first execution.
300 * @param period the period between successive executions.
301 * @param unit the time unit of the delay and period parameters
302 * @return a handle that can be used to cancel the task.
303 * @throws RejectedExecutionException if task cannot be scheduled
304 * for execution because the executor has been shut down.
305 */
306 public DelayedTask scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
307 DelayedTask t = new DelayedTask
308 (command, TimeUnit.nanoTime() + unit.toNanos(initialDelay),
309 unit.toNanos(period), true);
310 super.execute(t);
311 return t;
312 }
313
314 /**
315 * Creates a periodic action that becomes enabled first after the
316 * given date, and subsequently with the given period
317 * period; that is executions will commence after
318 * <tt>initialDate</tt> then <tt>initialDate+period</tt>, then
319 * <tt>initialDate + 2 * period</tt>, and so on.
320 * @param command the task to execute.
321 * @param initialDate the time to delay first execution.
322 * @param period the period between commencement of successive
323 * executions.
324 * @param unit the time unit of the period parameter.
325 * @return a handle that can be used to cancel the task.
326 * @throws RejectedExecutionException if task cannot be scheduled
327 * for execution because the executor has been shut down.
328 */
329 public DelayedTask scheduleAtFixedRate(Runnable command, Date initialDate, long period, TimeUnit unit) {
330 DelayedTask t = new DelayedTask
331 (command,
332 TimeUnit.MILLISECONDS.toNanos(initialDate.getTime() -
333 System.currentTimeMillis()),
334 unit.toNanos(period), true);
335 super.execute(t);
336 return t;
337 }
338
339 /**
340 * Creates and executes a periodic action that becomes enabled first
341 * after the given initial delay, and and subsequently with the
342 * given delay between the termination of one execution and the
343 * commencement of the next.
344 * @param command the task to execute.
345 * @param initialDelay the time to delay first execution.
346 * @param delay the delay between the termination of one
347 * execution and the commencement of the next.
348 * @param unit the time unit of the delay and delay parameters
349 * @return a handle that can be used to cancel the task.
350 * @throws RejectedExecutionException if task cannot be scheduled
351 * for execution because the executor has been shut down.
352 */
353 public DelayedTask scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
354 DelayedTask t = new DelayedTask
355 (command, TimeUnit.nanoTime() + unit.toNanos(initialDelay),
356 unit.toNanos(delay), false);
357 super.execute(t);
358 return t;
359 }
360
361 /**
362 * Creates a periodic action that becomes enabled first after the
363 * given date, and subsequently with the given delay between
364 * the termination of one execution and the commencement of the
365 * next.
366 * @param command the task to execute.
367 * @param initialDate the time to delay first execution.
368 * @param delay the delay between the termination of one
369 * execution and the commencement of the next.
370 * @param unit the time unit of the delay parameter.
371 * @return a handle that can be used to cancel the task.
372 * @throws RejectedExecutionException if task cannot be scheduled
373 * for execution because the executor has been shut down.
374 */
375 public DelayedTask scheduleWithFixedDelay(Runnable command, Date initialDate, long delay, TimeUnit unit) {
376 DelayedTask t = new DelayedTask
377 (command,
378 TimeUnit.MILLISECONDS.toNanos(initialDate.getTime() -
379 System.currentTimeMillis()),
380 unit.toNanos(delay), false);
381 super.execute(t);
382 return t;
383 }
384
385
386 /**
387 * Creates and executes a Future that becomes enabled after the
388 * given delay.
389 * @param callable the function to execute.
390 * @param delay the time from now to delay execution.
391 * @param unit the time unit of the delay parameter.
392 * @return a Future that can be used to extract result or cancel.
393 * @throws RejectedExecutionException if task cannot be scheduled
394 * for execution because the executor has been shut down.
395 */
396 public <V> DelayedFutureTask<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
397 DelayedFutureTask<V> t = new DelayedFutureTask<V>
398 (callable, delay, unit);
399 super.execute(t);
400 return t;
401 }
402
403 /**
404 * Creates and executes a one-shot action that becomes enabled after
405 * the given date.
406 * @param callable the function to execute.
407 * @param date the time to commence excution.
408 * @return a Future that can be used to extract result or cancel.
409 * @throws RejectedExecutionException if task cannot be scheduled
410 * for execution because the executor has been shut down.
411 */
412 public <V> DelayedFutureTask<V> schedule(Callable<V> callable, Date date) {
413 DelayedFutureTask<V> t = new DelayedFutureTask<V>
414 (callable, date);
415 super.execute(t);
416 return t;
417 }
418
419 /**
420 * Execute command with zero required delay
421 * @param command the task to execute
422 * @throws RejectedExecutionException at discretion of
423 * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
424 * for execution because the executor has been shut down.
425 */
426 public void execute(Runnable command) {
427 schedule(command, 0, TimeUnit.NANOSECONDS);
428 }
429
430 /**
431 * If executed task was periodic, cause the task for the next
432 * period to execute.
433 */
434 protected void afterExecute(Runnable r, Throwable t) {
435 if (isShutdown())
436 return;
437 super.afterExecute(r, t);
438 DelayedTask d = (DelayedTask)r;
439 DelayedTask next = d.nextTask();
440 if (next == null)
441 return;
442 try {
443 super.execute(next);
444 }
445 catch(RejectedExecutionException ex) {
446 // lost race to detect shutdown; ignore
447 }
448 }
449 }
450
451