ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.31
Committed: Sat Aug 27 22:44:54 2005 UTC (18 years, 9 months ago) by jsr166
Branch: MAIN
Changes since 1.30: +3 -2 lines
Log Message:
un-masking docstrings

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.*; // for javadoc (till 6280605 is fixed)
9 import java.util.concurrent.atomic.*;
10 import java.util.*;
11
12 /**
13 * A {@link ThreadPoolExecutor} that can additionally schedule
14 * commands to run after a given delay, or to execute
15 * periodically. This class is preferable to {@link java.util.Timer}
16 * when multiple worker threads are needed, or when the additional
17 * flexibility or capabilities of {@link ThreadPoolExecutor} (which
18 * this class extends) are required.
19 *
20 * <p> Delayed tasks execute no sooner than they are enabled, but
21 * without any real-time guarantees about when, after they are
22 * enabled, they will commence. Tasks scheduled for exactly the same
23 * execution time are enabled in first-in-first-out (FIFO) order of
24 * submission.
25 *
26 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
27 * of the inherited tuning methods are not useful for it. In
28 * particular, because it acts as a fixed-sized pool using
29 * <tt>corePoolSize</tt> threads and an unbounded queue, adjustments
30 * to <tt>maximumPoolSize</tt> have no useful effect.
31 *
32 * <p>This class supports protected extension method
33 * <tt>decorateTask</tt> (one version each for <tt>Runnable</tt> and
34 * <tt>Callable</tt>) that can be used to customize the concrete task
35 * types used to execute commands. By default,
36 * a <tt>ScheduledThreadPoolExecutor</tt> uses
37 * a task type extending {@link FutureTask}. However, this may
38 * be modified or replaced using subclasses of the form:
39 * <pre>
40 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
41 *
42 * static class CustomTask&lt;V&gt; implements RunnableScheduledFuture&lt;V&gt; { ... }
43 *
44 * protected &lt;V&gt; RunnableScheduledFuture&lt;V&gt; decorateTask(
45 * Runnable r, RunnableScheduledFuture&lt;V&gt; task) {
46 * return new CustomTask&lt;V&gt;(r, task);
47 * }
48 *
49 * protected &lt;V&gt; RunnableScheduledFuture&lt;V&gt; decorateTask(
50 * Callable&lt;V&gt; c, RunnableScheduledFuture&lt;V&gt; task) {
51 * return new CustomTask&lt;V&gt;(c, task);
52 * }
53 * // ... add constructors, etc.
54 * }
55 * </pre>
56 * @since 1.5
57 * @author Doug Lea
58 */
59 public class ScheduledThreadPoolExecutor
60 extends ThreadPoolExecutor
61 implements ScheduledExecutorService {
62
63 /**
64 * False if should cancel/suppress periodic tasks on shutdown.
65 */
66 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
67
68 /**
69 * False if should cancel non-periodic tasks on shutdown.
70 */
71 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
72
73 /**
74 * Sequence number to break scheduling ties, and in turn to
75 * guarantee FIFO order among tied entries.
76 */
77 private static final AtomicLong sequencer = new AtomicLong(0);
78
79 /** Base of nanosecond timings, to avoid wrapping */
80 private static final long NANO_ORIGIN = System.nanoTime();
81
82 /**
83 * Returns nanosecond time offset by origin
84 */
85 final long now() {
86 return System.nanoTime() - NANO_ORIGIN;
87 }
88
89 private class ScheduledFutureTask<V>
90 extends FutureTask<V> implements RunnableScheduledFuture<V> {
91
92 /** Sequence number to break ties FIFO */
93 private final long sequenceNumber;
94 /** The time the task is enabled to execute in nanoTime units */
95 private long time;
96 /**
97 * Period in nanoseconds for repeating tasks. A positive
98 * value indicates fixed-rate execution. A negative value
99 * indicates fixed-delay execution. A value of 0 indicates a
100 * non-repeating task.
101 */
102 private final long period;
103
104 /**
105 * Creates a one-shot action with given nanoTime-based trigger time.
106 */
107 ScheduledFutureTask(Runnable r, V result, long ns) {
108 super(r, result);
109 this.time = ns;
110 this.period = 0;
111 this.sequenceNumber = sequencer.getAndIncrement();
112 }
113
114 /**
115 * Creates a periodic action with given nano time and period.
116 */
117 ScheduledFutureTask(Runnable r, V result, long ns, long period) {
118 super(r, result);
119 this.time = ns;
120 this.period = period;
121 this.sequenceNumber = sequencer.getAndIncrement();
122 }
123
124 /**
125 * Creates a one-shot action with given nanoTime-based trigger.
126 */
127 ScheduledFutureTask(Callable<V> callable, long ns) {
128 super(callable);
129 this.time = ns;
130 this.period = 0;
131 this.sequenceNumber = sequencer.getAndIncrement();
132 }
133
134 public long getDelay(TimeUnit unit) {
135 long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);
136 return d;
137 }
138
139 public int compareTo(Delayed other) {
140 if (other == this) // compare zero ONLY if same object
141 return 0;
142 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
143 long diff = time - x.time;
144 if (diff < 0)
145 return -1;
146 else if (diff > 0)
147 return 1;
148 else if (sequenceNumber < x.sequenceNumber)
149 return -1;
150 else
151 return 1;
152 }
153
154 /**
155 * Returns true if this is a periodic (not a one-shot) action.
156 *
157 * @return true if periodic
158 */
159 public boolean isPeriodic() {
160 return period != 0;
161 }
162
163 /**
164 * Runs a periodic task.
165 */
166 private void runPeriodic() {
167 boolean ok = ScheduledFutureTask.super.runAndReset();
168 boolean down = isShutdown();
169 // Reschedule if not cancelled and not shutdown or policy allows
170 if (ok && (!down ||
171 (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
172 !isTerminating()))) {
173 long p = period;
174 if (p > 0)
175 time += p;
176 else
177 time = now() - p;
178 ScheduledThreadPoolExecutor.super.getQueue().add(this);
179 }
180 // This might have been the final executed delayed
181 // task. Wake up threads to check.
182 else if (down)
183 interruptIdleWorkers();
184 }
185
186 /**
187 * Overrides FutureTask version so as to reset/requeue if periodic.
188 */
189 public void run() {
190 if (isPeriodic())
191 runPeriodic();
192 else
193 ScheduledFutureTask.super.run();
194 }
195 }
196
197 /**
198 * Specialized variant of ThreadPoolExecutor.execute for delayed tasks.
199 */
200 private void delayedExecute(Runnable command) {
201 if (isShutdown()) {
202 reject(command);
203 return;
204 }
205 // Prestart a thread if necessary. We cannot prestart it
206 // running the task because the task (probably) shouldn't be
207 // run yet, so thread will just idle until delay elapses.
208 if (getPoolSize() < getCorePoolSize())
209 prestartCoreThread();
210
211 super.getQueue().add(command);
212 }
213
214 /**
215 * Cancels and clears the queue of all tasks that should not be run
216 * due to shutdown policy.
217 */
218 private void cancelUnwantedTasks() {
219 boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
220 boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
221 if (!keepDelayed && !keepPeriodic)
222 super.getQueue().clear();
223 else if (keepDelayed || keepPeriodic) {
224 Object[] entries = super.getQueue().toArray();
225 for (int i = 0; i < entries.length; ++i) {
226 Object e = entries[i];
227 if (e instanceof RunnableScheduledFuture) {
228 RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
229 if (t.isPeriodic()? !keepPeriodic : !keepDelayed)
230 t.cancel(false);
231 }
232 }
233 entries = null;
234 purge();
235 }
236 }
237
238 public boolean remove(Runnable task) {
239 if (!(task instanceof RunnableScheduledFuture))
240 return false;
241 return getQueue().remove(task);
242 }
243
244 /**
245 * Modifies or replaces the task used to execute a runnable.
246 * This method can be used to override the concrete
247 * class used for managing internal tasks.
248 * The default implementation simply returns the given task.
249 *
250 * @param runnable the submitted Runnable
251 * @param task the task created to execute the runnable
252 * @return a task that can execute the runnable
253 * @since 1.6
254 */
255 protected <V> RunnableScheduledFuture<V> decorateTask(
256 Runnable runnable, RunnableScheduledFuture<V> task) {
257 return task;
258 }
259
260 /**
261 * Modifies or replaces the task used to execute a callable.
262 * This method can be used to override the concrete
263 * class used for managing internal tasks.
264 * The default implementation simply returns the given task.
265 *
266 * @param callable the submitted Callable
267 * @param task the task created to execute the callable
268 * @return a task that can execute the callable
269 * @since 1.6
270 */
271 protected <V> RunnableScheduledFuture<V> decorateTask(
272 Callable<V> callable, RunnableScheduledFuture<V> task) {
273 return task;
274 }
275
/**
 * Constructs an executor with the given core pool size, backed by
 * a fresh DelayedWorkQueue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle
 * @throws IllegalArgumentException if <tt>corePoolSize &lt;= 0</tt>
 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue());
}
288
/**
 * Constructs an executor with the given core pool size and thread
 * factory, backed by a fresh DelayedWorkQueue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle
 * @param threadFactory the factory to use when the executor
 * creates a new thread
 * @throws IllegalArgumentException if <tt>corePoolSize &lt;= 0</tt>
 * @throws NullPointerException if threadFactory is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          threadFactory);
}
305
/**
 * Constructs an executor with the given core pool size and
 * rejection handler, backed by a fresh DelayedWorkQueue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle
 * @param handler the handler to use when execution is blocked
 * because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if <tt>corePoolSize &lt;= 0</tt>
 * @throws NullPointerException if handler is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          handler);
}
322
/**
 * Constructs an executor with the given core pool size, thread
 * factory and rejection handler, backed by a fresh DelayedWorkQueue.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle
 * @param threadFactory the factory to use when the executor
 * creates a new thread
 * @param handler the handler to use when execution is blocked
 * because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if <tt>corePoolSize &lt;= 0</tt>
 * @throws NullPointerException if threadFactory or handler is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          threadFactory,
          handler);
}
342
343 public ScheduledFuture<?> schedule(Runnable command,
344 long delay,
345 TimeUnit unit) {
346 if (command == null || unit == null)
347 throw new NullPointerException();
348 long triggerTime = now() + unit.toNanos(delay);
349 RunnableScheduledFuture<?> t = decorateTask(command,
350 new ScheduledFutureTask<Boolean>(command, null, triggerTime));
351 delayedExecute(t);
352 return t;
353 }
354
355 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
356 long delay,
357 TimeUnit unit) {
358 if (callable == null || unit == null)
359 throw new NullPointerException();
360 if (delay < 0) delay = 0;
361 long triggerTime = now() + unit.toNanos(delay);
362 RunnableScheduledFuture<V> t = decorateTask(callable,
363 new ScheduledFutureTask<V>(callable, triggerTime));
364 delayedExecute(t);
365 return t;
366 }
367
368 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
369 long initialDelay,
370 long period,
371 TimeUnit unit) {
372 if (command == null || unit == null)
373 throw new NullPointerException();
374 if (period <= 0)
375 throw new IllegalArgumentException();
376 if (initialDelay < 0) initialDelay = 0;
377 long triggerTime = now() + unit.toNanos(initialDelay);
378 RunnableScheduledFuture<?> t = decorateTask(command,
379 new ScheduledFutureTask<Object>(command,
380 null,
381 triggerTime,
382 unit.toNanos(period)));
383 delayedExecute(t);
384 return t;
385 }
386
387 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
388 long initialDelay,
389 long delay,
390 TimeUnit unit) {
391 if (command == null || unit == null)
392 throw new NullPointerException();
393 if (delay <= 0)
394 throw new IllegalArgumentException();
395 if (initialDelay < 0) initialDelay = 0;
396 long triggerTime = now() + unit.toNanos(initialDelay);
397 RunnableScheduledFuture<?> t = decorateTask(command,
398 new ScheduledFutureTask<Boolean>(command,
399 null,
400 triggerTime,
401 unit.toNanos(-delay)));
402 delayedExecute(t);
403 return t;
404 }
405
406
407 /**
408 * Executes command with zero required delay. This has effect
409 * equivalent to <tt>schedule(command, 0, anyUnit)</tt>. Note
410 * that inspections of the queue and of the list returned by
411 * <tt>shutdownNow</tt> will access the zero-delayed
412 * {@link ScheduledFuture}, not the <tt>command</tt> itself.
413 *
414 * @param command the task to execute
415 * @throws RejectedExecutionException at discretion of
416 * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
417 * for execution because the executor has been shut down.
418 * @throws NullPointerException if command is null
419 */
420 public void execute(Runnable command) {
421 if (command == null)
422 throw new NullPointerException();
423 schedule(command, 0, TimeUnit.NANOSECONDS);
424 }
425
426 // Override AbstractExecutorService methods
427
428 public Future<?> submit(Runnable task) {
429 return schedule(task, 0, TimeUnit.NANOSECONDS);
430 }
431
432 public <T> Future<T> submit(Runnable task, T result) {
433 return schedule(Executors.callable(task, result),
434 0, TimeUnit.NANOSECONDS);
435 }
436
437 public <T> Future<T> submit(Callable<T> task) {
438 return schedule(task, 0, TimeUnit.NANOSECONDS);
439 }
440
441 /**
442 * Sets the policy on whether to continue executing existing periodic
443 * tasks even when this executor has been <tt>shutdown</tt>. In
444 * this case, these tasks will only terminate upon
445 * <tt>shutdownNow</tt>, or after setting the policy to
446 * <tt>false</tt> when already shutdown. This value is by default
447 * false.
448 *
449 * @param value if true, continue after shutdown, else don't.
450 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
451 */
452 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
453 continueExistingPeriodicTasksAfterShutdown = value;
454 if (!value && isShutdown())
455 cancelUnwantedTasks();
456 }
457
458 /**
459 * Gets the policy on whether to continue executing existing
460 * periodic tasks even when this executor has been
461 * <tt>shutdown</tt>. In this case, these tasks will only
462 * terminate upon <tt>shutdownNow</tt> or after setting the policy
463 * to <tt>false</tt> when already shutdown. This value is by
464 * default false.
465 *
466 * @return true if will continue after shutdown
467 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
468 */
469 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
470 return continueExistingPeriodicTasksAfterShutdown;
471 }
472
473 /**
474 * Sets the policy on whether to execute existing delayed
475 * tasks even when this executor has been <tt>shutdown</tt>. In
476 * this case, these tasks will only terminate upon
477 * <tt>shutdownNow</tt>, or after setting the policy to
478 * <tt>false</tt> when already shutdown. This value is by default
479 * true.
480 *
481 * @param value if true, execute after shutdown, else don't.
482 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
483 */
484 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
485 executeExistingDelayedTasksAfterShutdown = value;
486 if (!value && isShutdown())
487 cancelUnwantedTasks();
488 }
489
490 /**
491 * Gets the policy on whether to execute existing delayed
492 * tasks even when this executor has been <tt>shutdown</tt>. In
493 * this case, these tasks will only terminate upon
494 * <tt>shutdownNow</tt>, or after setting the policy to
495 * <tt>false</tt> when already shutdown. This value is by default
496 * true.
497 *
498 * @return true if will execute after shutdown
499 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
500 */
501 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
502 return executeExistingDelayedTasksAfterShutdown;
503 }
504
505
506 /**
507 * Initiates an orderly shutdown in which previously submitted
508 * tasks are executed, but no new tasks will be accepted. If the
509 * <tt>ExecuteExistingDelayedTasksAfterShutdownPolicy</tt> has
510 * been set <tt>false</tt>, existing delayed tasks whose delays
511 * have not yet elapsed are cancelled. And unless the
512 * <tt>ContinueExistingPeriodicTasksAfterShutdownPolicy</tt> has
513 * been set <tt>true</tt>, future executions of existing periodic
514 * tasks will be cancelled.
515 */
516 public void shutdown() {
517 cancelUnwantedTasks();
518 super.shutdown();
519 }
520
521 /**
522 * Attempts to stop all actively executing tasks, halts the
523 * processing of waiting tasks, and returns a list of the tasks
524 * that were awaiting execution.
525 *
526 * <p>There are no guarantees beyond best-effort attempts to stop
527 * processing actively executing tasks. This implementation
528 * cancels tasks via {@link Thread#interrupt}, so any task that
529 * fails to respond to interrupts may never terminate.
530 *
531 * @return list of tasks that never commenced execution. Each
532 * element of this list is a {@link ScheduledFuture},
533 * including those tasks submitted using <tt>execute</tt>, which
534 * are for scheduling purposes used as the basis of a zero-delay
535 * <tt>ScheduledFuture</tt>.
536 * @throws SecurityException {@inheritDoc}
537 */
538 public List<Runnable> shutdownNow() {
539 return super.shutdownNow();
540 }
541
542 /**
543 * Returns the task queue used by this executor. Each element of
544 * this queue is a {@link ScheduledFuture}, including those
545 * tasks submitted using <tt>execute</tt> which are for scheduling
546 * purposes used as the basis of a zero-delay
547 * <tt>ScheduledFuture</tt>. Iteration over this queue is
548 * <em>not</em> guaranteed to traverse tasks in the order in
549 * which they will execute.
550 *
551 * @return the task queue
552 */
553 public BlockingQueue<Runnable> getQueue() {
554 return super.getQueue();
555 }
556
557 /**
558 * An annoying wrapper class to convince javac to use a
559 * DelayQueue<RunnableScheduledFuture> as a BlockingQueue<Runnable>
560 */
561 private static class DelayedWorkQueue
562 extends AbstractCollection<Runnable>
563 implements BlockingQueue<Runnable> {
564
565 private final DelayQueue<RunnableScheduledFuture> dq = new DelayQueue<RunnableScheduledFuture>();
566 public Runnable poll() { return dq.poll(); }
567 public Runnable peek() { return dq.peek(); }
568 public Runnable take() throws InterruptedException { return dq.take(); }
569 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
570 return dq.poll(timeout, unit);
571 }
572
573 public boolean add(Runnable x) {
574 return dq.add((RunnableScheduledFuture)x);
575 }
576 public boolean offer(Runnable x) {
577 return dq.offer((RunnableScheduledFuture)x);
578 }
579 public void put(Runnable x) {
580 dq.put((RunnableScheduledFuture)x);
581 }
582 public boolean offer(Runnable x, long timeout, TimeUnit unit) {
583 return dq.offer((RunnableScheduledFuture)x, timeout, unit);
584 }
585
586 public Runnable remove() { return dq.remove(); }
587 public Runnable element() { return dq.element(); }
588 public void clear() { dq.clear(); }
589 public int drainTo(Collection<? super Runnable> c) { return dq.drainTo(c); }
590 public int drainTo(Collection<? super Runnable> c, int maxElements) {
591 return dq.drainTo(c, maxElements);
592 }
593
594 public int remainingCapacity() { return dq.remainingCapacity(); }
595 public boolean remove(Object x) { return dq.remove(x); }
596 public boolean contains(Object x) { return dq.contains(x); }
597 public int size() { return dq.size(); }
598 public boolean isEmpty() { return dq.isEmpty(); }
599 public Object[] toArray() { return dq.toArray(); }
600 public <T> T[] toArray(T[] array) { return dq.toArray(array); }
601 public Iterator<Runnable> iterator() {
602 return new Iterator<Runnable>() {
603 private Iterator<RunnableScheduledFuture> it = dq.iterator();
604 public boolean hasNext() { return it.hasNext(); }
605 public Runnable next() { return it.next(); }
606 public void remove() { it.remove(); }
607 };
608 }
609 }
610 }