ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.34
Committed: Tue Dec 13 00:13:36 2005 UTC (18 years, 6 months ago) by dl
Branch: MAIN
Changes since 1.33: +15 -10 lines
Log Message:
Make compareTo cope with arbitrary Delayed's as arguments

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.*; // for javadoc (till 6280605 is fixed)
9 import java.util.concurrent.atomic.*;
10 import java.util.*;
11
12 /**
13 * A {@link ThreadPoolExecutor} that can additionally schedule
14 * commands to run after a given delay, or to execute
15 * periodically. This class is preferable to {@link java.util.Timer}
16 * when multiple worker threads are needed, or when the additional
17 * flexibility or capabilities of {@link ThreadPoolExecutor} (which
18 * this class extends) are required.
19 *
20 * <p> Delayed tasks execute no sooner than they are enabled, but
21 * without any real-time guarantees about when, after they are
22 * enabled, they will commence. Tasks scheduled for exactly the same
23 * execution time are enabled in first-in-first-out (FIFO) order of
24 * submission.
25 *
26 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
27 * of the inherited tuning methods are not useful for it. In
28 * particular, because it acts as a fixed-sized pool using
29 * <tt>corePoolSize</tt> threads and an unbounded queue, adjustments
30 * to <tt>maximumPoolSize</tt> have no useful effect.
31 *
32 * <p><b>Extension notes:</b> This class overrides {@link
33 * AbstractExecutorService} <tt>submit</tt> methods to generate
34 * internal objects to control per-task delays and scheduling. To
35 * preserve functionality, any further overrides of these methods in
36 * subclasses must invoke superclass versions, which effectively
37 * disables additional task customization. However, this class
38 * provides an alternative protected extension method
39 * <tt>decorateTask</tt> (one version each for <tt>Runnable</tt> and
40 * <tt>Callable</tt>) that can be used to customize the concrete task
41 * types used to execute commands entered via <tt>execute</tt>,
42 * <tt>submit</tt>, <tt>schedule</tt>, <tt>scheduleAtFixedRate</tt>,
43 * and <tt>scheduleWithFixedDelay</tt>. By default, a
44 * <tt>ScheduledThreadPoolExecutor</tt> uses a task type extending
45 * {@link FutureTask}. However, this may be modified or replaced using
46 * subclasses of the form:
47 *
48 * <pre>
49 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
50 *
51 * static class CustomTask&lt;V&gt; implements RunnableScheduledFuture&lt;V&gt; { ... }
52 *
53 * protected &lt;V&gt; RunnableScheduledFuture&lt;V&gt; decorateTask(
54 * Runnable r, RunnableScheduledFuture&lt;V&gt; task) {
55 * return new CustomTask&lt;V&gt;(r, task);
56 * }
57 *
58 * protected &lt;V&gt; RunnableScheduledFuture&lt;V&gt; decorateTask(
59 * Callable&lt;V&gt; c, RunnableScheduledFuture&lt;V&gt; task) {
60 * return new CustomTask&lt;V&gt;(c, task);
61 * }
62 * // ... add constructors, etc.
63 * }
64 * </pre>
65 * @since 1.5
66 * @author Doug Lea
67 */
68 public class ScheduledThreadPoolExecutor
69 extends ThreadPoolExecutor
70 implements ScheduledExecutorService {
71
72 /**
73 * False if periodic tasks should be cancelled/suppressed on shutdown.
74 */
75 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
76
77 /**
78 * False if non-periodic tasks should be cancelled on shutdown.
79 */
80 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
81
82 /**
83 * Sequence number to break scheduling ties, and in turn to
84 * guarantee FIFO order among tied entries.
85 */
86 private static final AtomicLong sequencer = new AtomicLong(0);
87
88 /** Base of nanosecond timings, to avoid wrapping */
89 private static final long NANO_ORIGIN = System.nanoTime();
90
91 /**
92 * Returns nanosecond time offset by origin
93 */
94 final long now() {
95 return System.nanoTime() - NANO_ORIGIN;
96 }
97
98 private class ScheduledFutureTask<V>
99 extends FutureTask<V> implements RunnableScheduledFuture<V> {
100
101 /** Sequence number to break ties FIFO */
102 private final long sequenceNumber;
103 /** The time the task is enabled to execute in nanoTime units */
104 private long time;
105 /**
106 * Period in nanoseconds for repeating tasks. A positive
107 * value indicates fixed-rate execution. A negative value
108 * indicates fixed-delay execution. A value of 0 indicates a
109 * non-repeating task.
110 */
111 private final long period;
112
113 /**
114 * Creates a one-shot action with given nanoTime-based trigger time.
115 */
116 ScheduledFutureTask(Runnable r, V result, long ns) {
117 super(r, result);
118 this.time = ns;
119 this.period = 0;
120 this.sequenceNumber = sequencer.getAndIncrement();
121 }
122
123 /**
124 * Creates a periodic action with given nano time and period.
125 */
126 ScheduledFutureTask(Runnable r, V result, long ns, long period) {
127 super(r, result);
128 this.time = ns;
129 this.period = period;
130 this.sequenceNumber = sequencer.getAndIncrement();
131 }
132
133 /**
134 * Creates a one-shot action with given nanoTime-based trigger.
135 */
136 ScheduledFutureTask(Callable<V> callable, long ns) {
137 super(callable);
138 this.time = ns;
139 this.period = 0;
140 this.sequenceNumber = sequencer.getAndIncrement();
141 }
142
143 public long getDelay(TimeUnit unit) {
144 long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);
145 return d;
146 }
147
148 public int compareTo(Delayed other) {
149 if (other == this) // compare zero ONLY if same object
150 return 0;
151 if (other instanceof ScheduledFutureTask) {
152 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
153 long diff = time - x.time;
154 if (diff < 0)
155 return -1;
156 else if (diff > 0)
157 return 1;
158 else if (sequenceNumber < x.sequenceNumber)
159 return -1;
160 else
161 return 1;
162 }
163 long d = (getDelay(TimeUnit.NANOSECONDS) -
164 other.getDelay(TimeUnit.NANOSECONDS));
165 return (d == 0)? 0 : ((d < 0)? -1 : 1);
166 }
167
168 /**
169 * Returns true if this is a periodic (not a one-shot) action.
170 *
171 * @return true if periodic
172 */
173 public boolean isPeriodic() {
174 return period != 0;
175 }
176
177 /**
178 * Runs a periodic task.
179 */
180 private void runPeriodic() {
181 boolean ok = ScheduledFutureTask.super.runAndReset();
182 boolean down = isShutdown();
183 // Reschedule if not cancelled and not shutdown or policy allows
184 if (ok && (!down ||
185 (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
186 !isTerminating()))) {
187 long p = period;
188 if (p > 0)
189 time += p;
190 else
191 time = now() - p;
192 ScheduledThreadPoolExecutor.super.getQueue().add(this);
193 }
194 // This might have been the final executed delayed
195 // task. Wake up threads to check.
196 else if (down)
197 interruptIdleWorkers();
198 }
199
200 /**
201 * Overrides FutureTask version so as to reset/requeue if periodic.
202 */
203 public void run() {
204 if (isPeriodic())
205 runPeriodic();
206 else
207 ScheduledFutureTask.super.run();
208 }
209 }
210
211 /**
212 * Specialized variant of ThreadPoolExecutor.execute for delayed tasks.
213 */
214 private void delayedExecute(Runnable command) {
215 if (isShutdown()) {
216 reject(command);
217 return;
218 }
219 // Prestart a thread if necessary. We cannot prestart it
220 // running the task because the task (probably) shouldn't be
221 // run yet, so thread will just idle until delay elapses.
222 if (getPoolSize() < getCorePoolSize())
223 prestartCoreThread();
224
225 super.getQueue().add(command);
226 }
227
228 /**
229 * Cancels and clears the queue of all tasks that should not be run
230 * due to shutdown policy.
231 */
232 private void cancelUnwantedTasks() {
233 boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
234 boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
235 if (!keepDelayed && !keepPeriodic)
236 super.getQueue().clear();
237 else if (keepDelayed || keepPeriodic) {
238 Object[] entries = super.getQueue().toArray();
239 for (int i = 0; i < entries.length; ++i) {
240 Object e = entries[i];
241 if (e instanceof RunnableScheduledFuture) {
242 RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
243 if (t.isPeriodic()? !keepPeriodic : !keepDelayed)
244 t.cancel(false);
245 }
246 }
247 entries = null;
248 purge();
249 }
250 }
251
252 public boolean remove(Runnable task) {
253 if (!(task instanceof RunnableScheduledFuture))
254 return false;
255 return getQueue().remove(task);
256 }
257
258 /**
259 * Modifies or replaces the task used to execute a runnable.
260 * This method can be used to override the concrete
261 * class used for managing internal tasks.
262 * The default implementation simply returns the given task.
263 *
264 * @param runnable the submitted Runnable
265 * @param task the task created to execute the runnable
266 * @return a task that can execute the runnable
267 * @since 1.6
268 */
269 protected <V> RunnableScheduledFuture<V> decorateTask(
270 Runnable runnable, RunnableScheduledFuture<V> task) {
271 return task;
272 }
273
274 /**
275 * Modifies or replaces the task used to execute a callable.
276 * This method can be used to override the concrete
277 * class used for managing internal tasks.
278 * The default implementation simply returns the given task.
279 *
280 * @param callable the submitted Callable
281 * @param task the task created to execute the callable
282 * @return a task that can execute the callable
283 * @since 1.6
284 */
285 protected <V> RunnableScheduledFuture<V> decorateTask(
286 Callable<V> callable, RunnableScheduledFuture<V> task) {
287 return task;
288 }
289
/**
 * Builds a ScheduledThreadPoolExecutor with the given core pool
 * size, backed by a fresh delayed work queue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle
 * @throws IllegalArgumentException if <tt>corePoolSize</tt> is
 * rejected by the {@link ThreadPoolExecutor} constructor
 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue());
}
302
/**
 * Builds a ScheduledThreadPoolExecutor with the given core pool
 * size and thread factory, backed by a fresh delayed work queue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle
 * @param threadFactory the factory to use when the executor
 * creates a new thread
 * @throws IllegalArgumentException if <tt>corePoolSize</tt> is
 * rejected by the {@link ThreadPoolExecutor} constructor
 * @throws NullPointerException if threadFactory is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          threadFactory);
}
319
/**
 * Builds a ScheduledThreadPoolExecutor with the given core pool
 * size and rejection handler, backed by a fresh delayed work queue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle
 * @param handler the handler to use when execution is blocked
 * because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if <tt>corePoolSize</tt> is
 * rejected by the {@link ThreadPoolExecutor} constructor
 * @throws NullPointerException if handler is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          handler);
}
336
/**
 * Builds a ScheduledThreadPoolExecutor with the given core pool
 * size, thread factory and rejection handler, backed by a fresh
 * delayed work queue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle
 * @param threadFactory the factory to use when the executor
 * creates a new thread
 * @param handler the handler to use when execution is blocked
 * because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if <tt>corePoolSize</tt> is
 * rejected by the {@link ThreadPoolExecutor} constructor
 * @throws NullPointerException if threadFactory or handler is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          threadFactory,
          handler);
}
356
357 public ScheduledFuture<?> schedule(Runnable command,
358 long delay,
359 TimeUnit unit) {
360 if (command == null || unit == null)
361 throw new NullPointerException();
362 long triggerTime = now() + unit.toNanos(delay);
363 RunnableScheduledFuture<?> t = decorateTask(command,
364 new ScheduledFutureTask<Boolean>(command, null, triggerTime));
365 delayedExecute(t);
366 return t;
367 }
368
369 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
370 long delay,
371 TimeUnit unit) {
372 if (callable == null || unit == null)
373 throw new NullPointerException();
374 if (delay < 0) delay = 0;
375 long triggerTime = now() + unit.toNanos(delay);
376 RunnableScheduledFuture<V> t = decorateTask(callable,
377 new ScheduledFutureTask<V>(callable, triggerTime));
378 delayedExecute(t);
379 return t;
380 }
381
382 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
383 long initialDelay,
384 long period,
385 TimeUnit unit) {
386 if (command == null || unit == null)
387 throw new NullPointerException();
388 if (period <= 0)
389 throw new IllegalArgumentException();
390 if (initialDelay < 0) initialDelay = 0;
391 long triggerTime = now() + unit.toNanos(initialDelay);
392 RunnableScheduledFuture<?> t = decorateTask(command,
393 new ScheduledFutureTask<Object>(command,
394 null,
395 triggerTime,
396 unit.toNanos(period)));
397 delayedExecute(t);
398 return t;
399 }
400
401 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
402 long initialDelay,
403 long delay,
404 TimeUnit unit) {
405 if (command == null || unit == null)
406 throw new NullPointerException();
407 if (delay <= 0)
408 throw new IllegalArgumentException();
409 if (initialDelay < 0) initialDelay = 0;
410 long triggerTime = now() + unit.toNanos(initialDelay);
411 RunnableScheduledFuture<?> t = decorateTask(command,
412 new ScheduledFutureTask<Boolean>(command,
413 null,
414 triggerTime,
415 unit.toNanos(-delay)));
416 delayedExecute(t);
417 return t;
418 }
419
420
421 /**
422 * Executes command with zero required delay. This has effect
423 * equivalent to <tt>schedule(command, 0, anyUnit)</tt>. Note
424 * that inspections of the queue and of the list returned by
425 * <tt>shutdownNow</tt> will access the zero-delayed
426 * {@link ScheduledFuture}, not the <tt>command</tt> itself.
427 *
428 * @param command the task to execute
429 * @throws RejectedExecutionException at discretion of
430 * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
431 * for execution because the executor has been shut down.
432 * @throws NullPointerException if command is null
433 */
434 public void execute(Runnable command) {
435 if (command == null)
436 throw new NullPointerException();
437 schedule(command, 0, TimeUnit.NANOSECONDS);
438 }
439
440 // Override AbstractExecutorService methods
441
442 public Future<?> submit(Runnable task) {
443 return schedule(task, 0, TimeUnit.NANOSECONDS);
444 }
445
446 public <T> Future<T> submit(Runnable task, T result) {
447 return schedule(Executors.callable(task, result),
448 0, TimeUnit.NANOSECONDS);
449 }
450
451 public <T> Future<T> submit(Callable<T> task) {
452 return schedule(task, 0, TimeUnit.NANOSECONDS);
453 }
454
455 /**
456 * Sets the policy on whether to continue executing existing periodic
457 * tasks even when this executor has been <tt>shutdown</tt>. In
458 * this case, these tasks will only terminate upon
459 * <tt>shutdownNow</tt>, or after setting the policy to
460 * <tt>false</tt> when already shutdown. This value is by default
461 * false.
462 *
463 * @param value if true, continue after shutdown, else don't.
464 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
465 */
466 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
467 continueExistingPeriodicTasksAfterShutdown = value;
468 if (!value && isShutdown())
469 cancelUnwantedTasks();
470 }
471
472 /**
473 * Gets the policy on whether to continue executing existing
474 * periodic tasks even when this executor has been
475 * <tt>shutdown</tt>. In this case, these tasks will only
476 * terminate upon <tt>shutdownNow</tt> or after setting the policy
477 * to <tt>false</tt> when already shutdown. This value is by
478 * default false.
479 *
480 * @return true if will continue after shutdown
481 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
482 */
483 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
484 return continueExistingPeriodicTasksAfterShutdown;
485 }
486
487 /**
488 * Sets the policy on whether to execute existing delayed
489 * tasks even when this executor has been <tt>shutdown</tt>. In
490 * this case, these tasks will only terminate upon
491 * <tt>shutdownNow</tt>, or after setting the policy to
492 * <tt>false</tt> when already shutdown. This value is by default
493 * true.
494 *
495 * @param value if true, execute after shutdown, else don't.
496 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
497 */
498 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
499 executeExistingDelayedTasksAfterShutdown = value;
500 if (!value && isShutdown())
501 cancelUnwantedTasks();
502 }
503
504 /**
505 * Gets the policy on whether to execute existing delayed
506 * tasks even when this executor has been <tt>shutdown</tt>. In
507 * this case, these tasks will only terminate upon
508 * <tt>shutdownNow</tt>, or after setting the policy to
509 * <tt>false</tt> when already shutdown. This value is by default
510 * true.
511 *
512 * @return true if will execute after shutdown
513 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
514 */
515 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
516 return executeExistingDelayedTasksAfterShutdown;
517 }
518
519
520 /**
521 * Initiates an orderly shutdown in which previously submitted
522 * tasks are executed, but no new tasks will be accepted. If the
523 * <tt>ExecuteExistingDelayedTasksAfterShutdownPolicy</tt> has
524 * been set <tt>false</tt>, existing delayed tasks whose delays
525 * have not yet elapsed are cancelled. And unless the
526 * <tt>ContinueExistingPeriodicTasksAfterShutdownPolicy</tt> has
527 * been set <tt>true</tt>, future executions of existing periodic
528 * tasks will be cancelled.
529 */
530 public void shutdown() {
531 cancelUnwantedTasks();
532 super.shutdown();
533 }
534
535 /**
536 * Attempts to stop all actively executing tasks, halts the
537 * processing of waiting tasks, and returns a list of the tasks
538 * that were awaiting execution.
539 *
540 * <p>There are no guarantees beyond best-effort attempts to stop
541 * processing actively executing tasks. This implementation
542 * cancels tasks via {@link Thread#interrupt}, so any task that
543 * fails to respond to interrupts may never terminate.
544 *
545 * @return list of tasks that never commenced execution. Each
546 * element of this list is a {@link ScheduledFuture},
547 * including those tasks submitted using <tt>execute</tt>, which
548 * are for scheduling purposes used as the basis of a zero-delay
549 * <tt>ScheduledFuture</tt>.
550 * @throws SecurityException {@inheritDoc}
551 */
552 public List<Runnable> shutdownNow() {
553 return super.shutdownNow();
554 }
555
556 /**
557 * Returns the task queue used by this executor. Each element of
558 * this queue is a {@link ScheduledFuture}, including those
559 * tasks submitted using <tt>execute</tt> which are for scheduling
560 * purposes used as the basis of a zero-delay
561 * <tt>ScheduledFuture</tt>. Iteration over this queue is
562 * <em>not</em> guaranteed to traverse tasks in the order in
563 * which they will execute.
564 *
565 * @return the task queue
566 */
567 public BlockingQueue<Runnable> getQueue() {
568 return super.getQueue();
569 }
570
571 /**
572 * An annoying wrapper class to convince javac to use a
573 * DelayQueue<RunnableScheduledFuture> as a BlockingQueue<Runnable>
574 */
575 private static class DelayedWorkQueue
576 extends AbstractCollection<Runnable>
577 implements BlockingQueue<Runnable> {
578
579 private final DelayQueue<RunnableScheduledFuture> dq = new DelayQueue<RunnableScheduledFuture>();
580 public Runnable poll() { return dq.poll(); }
581 public Runnable peek() { return dq.peek(); }
582 public Runnable take() throws InterruptedException { return dq.take(); }
583 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
584 return dq.poll(timeout, unit);
585 }
586
587 public boolean add(Runnable x) {
588 return dq.add((RunnableScheduledFuture)x);
589 }
590 public boolean offer(Runnable x) {
591 return dq.offer((RunnableScheduledFuture)x);
592 }
593 public void put(Runnable x) {
594 dq.put((RunnableScheduledFuture)x);
595 }
596 public boolean offer(Runnable x, long timeout, TimeUnit unit) {
597 return dq.offer((RunnableScheduledFuture)x, timeout, unit);
598 }
599
600 public Runnable remove() { return dq.remove(); }
601 public Runnable element() { return dq.element(); }
602 public void clear() { dq.clear(); }
603 public int drainTo(Collection<? super Runnable> c) { return dq.drainTo(c); }
604 public int drainTo(Collection<? super Runnable> c, int maxElements) {
605 return dq.drainTo(c, maxElements);
606 }
607
608 public int remainingCapacity() { return dq.remainingCapacity(); }
609 public boolean remove(Object x) { return dq.remove(x); }
610 public boolean contains(Object x) { return dq.contains(x); }
611 public int size() { return dq.size(); }
612 public boolean isEmpty() { return dq.isEmpty(); }
613 public Object[] toArray() { return dq.toArray(); }
614 public <T> T[] toArray(T[] array) { return dq.toArray(array); }
615 public Iterator<Runnable> iterator() {
616 return new Iterator<Runnable>() {
617 private Iterator<RunnableScheduledFuture> it = dq.iterator();
618 public boolean hasNext() { return it.hasNext(); }
619 public Runnable next() { return it.next(); }
620 public void remove() { it.remove(); }
621 };
622 }
623 }
624 }