ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.35
Committed: Tue Feb 7 20:54:24 2006 UTC (18 years, 3 months ago) by jsr166
Branch: MAIN
Changes since 1.34: +0 -1 lines
Log Message:
6378729: Remove workaround for 6280605

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.*;
10
11 /**
12 * A {@link ThreadPoolExecutor} that can additionally schedule
13 * commands to run after a given delay, or to execute
14 * periodically. This class is preferable to {@link java.util.Timer}
15 * when multiple worker threads are needed, or when the additional
16 * flexibility or capabilities of {@link ThreadPoolExecutor} (which
17 * this class extends) are required.
18 *
19 * <p> Delayed tasks execute no sooner than they are enabled, but
20 * without any real-time guarantees about when, after they are
21 * enabled, they will commence. Tasks scheduled for exactly the same
22 * execution time are enabled in first-in-first-out (FIFO) order of
23 * submission.
24 *
25 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
26 * of the inherited tuning methods are not useful for it. In
27 * particular, because it acts as a fixed-sized pool using
28 * <tt>corePoolSize</tt> threads and an unbounded queue, adjustments
29 * to <tt>maximumPoolSize</tt> have no useful effect.
30 *
31 * <p><b>Extension notes:</b> This class overrides {@link
32 * AbstractExecutorService} <tt>submit</tt> methods to generate
33 * internal objects to control per-task delays and scheduling. To
34 * preserve functionality, any further overrides of these methods in
35 * subclasses must invoke superclass versions, which effectively
36 * disables additional task customization. However, this class
37 * provides an alternative protected extension method
38 * <tt>decorateTask</tt> (one version each for <tt>Runnable</tt> and
39 * <tt>Callable</tt>) that can be used to customize the concrete task
40 * types used to execute commands entered via <tt>execute</tt>,
41 * <tt>submit</tt>, <tt>schedule</tt>, <tt>scheduleAtFixedRate</tt>,
42 * and <tt>scheduleWithFixedDelay</tt>. By default, a
43 * <tt>ScheduledThreadPoolExecutor</tt> uses a task type extending
44 * {@link FutureTask}. However, this may be modified or replaced using
45 * subclasses of the form:
46 *
47 * <pre>
48 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
49 *
50 * static class CustomTask&lt;V&gt; implements RunnableScheduledFuture&lt;V&gt; { ... }
51 *
52 * protected &lt;V&gt; RunnableScheduledFuture&lt;V&gt; decorateTask(
53 * Runnable r, RunnableScheduledFuture&lt;V&gt; task) {
54 * return new CustomTask&lt;V&gt;(r, task);
55 * }
56 *
57 * protected &lt;V&gt; RunnableScheduledFuture&lt;V&gt; decorateTask(
58 * Callable&lt;V&gt; c, RunnableScheduledFuture&lt;V&gt; task) {
59 * return new CustomTask&lt;V&gt;(c, task);
60 * }
61 * // ... add constructors, etc.
62 * }
63 * </pre>
64 * @since 1.5
65 * @author Doug Lea
66 */
67 public class ScheduledThreadPoolExecutor
68 extends ThreadPoolExecutor
69 implements ScheduledExecutorService {
70
71 /**
72 * False if should cancel/suppress periodic tasks on shutdown.
73 */
74 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
75
76 /**
77 * False if should cancel non-periodic tasks on shutdown.
78 */
79 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
80
81 /**
82 * Sequence number to break scheduling ties, and in turn to
83 * guarantee FIFO order among tied entries.
84 */
85 private static final AtomicLong sequencer = new AtomicLong(0);
86
87 /** Base of nanosecond timings, to avoid wrapping */
88 private static final long NANO_ORIGIN = System.nanoTime();
89
90 /**
91 * Returns nanosecond time offset by origin
92 */
93 final long now() {
94 return System.nanoTime() - NANO_ORIGIN;
95 }
96
97 private class ScheduledFutureTask<V>
98 extends FutureTask<V> implements RunnableScheduledFuture<V> {
99
100 /** Sequence number to break ties FIFO */
101 private final long sequenceNumber;
102 /** The time the task is enabled to execute in nanoTime units */
103 private long time;
104 /**
105 * Period in nanoseconds for repeating tasks. A positive
106 * value indicates fixed-rate execution. A negative value
107 * indicates fixed-delay execution. A value of 0 indicates a
108 * non-repeating task.
109 */
110 private final long period;
111
112 /**
113 * Creates a one-shot action with given nanoTime-based trigger time.
114 */
115 ScheduledFutureTask(Runnable r, V result, long ns) {
116 super(r, result);
117 this.time = ns;
118 this.period = 0;
119 this.sequenceNumber = sequencer.getAndIncrement();
120 }
121
122 /**
123 * Creates a periodic action with given nano time and period.
124 */
125 ScheduledFutureTask(Runnable r, V result, long ns, long period) {
126 super(r, result);
127 this.time = ns;
128 this.period = period;
129 this.sequenceNumber = sequencer.getAndIncrement();
130 }
131
132 /**
133 * Creates a one-shot action with given nanoTime-based trigger.
134 */
135 ScheduledFutureTask(Callable<V> callable, long ns) {
136 super(callable);
137 this.time = ns;
138 this.period = 0;
139 this.sequenceNumber = sequencer.getAndIncrement();
140 }
141
142 public long getDelay(TimeUnit unit) {
143 long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);
144 return d;
145 }
146
147 public int compareTo(Delayed other) {
148 if (other == this) // compare zero ONLY if same object
149 return 0;
150 if (other instanceof ScheduledFutureTask) {
151 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
152 long diff = time - x.time;
153 if (diff < 0)
154 return -1;
155 else if (diff > 0)
156 return 1;
157 else if (sequenceNumber < x.sequenceNumber)
158 return -1;
159 else
160 return 1;
161 }
162 long d = (getDelay(TimeUnit.NANOSECONDS) -
163 other.getDelay(TimeUnit.NANOSECONDS));
164 return (d == 0)? 0 : ((d < 0)? -1 : 1);
165 }
166
167 /**
168 * Returns true if this is a periodic (not a one-shot) action.
169 *
170 * @return true if periodic
171 */
172 public boolean isPeriodic() {
173 return period != 0;
174 }
175
176 /**
177 * Runs a periodic task.
178 */
179 private void runPeriodic() {
180 boolean ok = ScheduledFutureTask.super.runAndReset();
181 boolean down = isShutdown();
182 // Reschedule if not cancelled and not shutdown or policy allows
183 if (ok && (!down ||
184 (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
185 !isTerminating()))) {
186 long p = period;
187 if (p > 0)
188 time += p;
189 else
190 time = now() - p;
191 ScheduledThreadPoolExecutor.super.getQueue().add(this);
192 }
193 // This might have been the final executed delayed
194 // task. Wake up threads to check.
195 else if (down)
196 interruptIdleWorkers();
197 }
198
199 /**
200 * Overrides FutureTask version so as to reset/requeue if periodic.
201 */
202 public void run() {
203 if (isPeriodic())
204 runPeriodic();
205 else
206 ScheduledFutureTask.super.run();
207 }
208 }
209
210 /**
211 * Specialized variant of ThreadPoolExecutor.execute for delayed tasks.
212 */
213 private void delayedExecute(Runnable command) {
214 if (isShutdown()) {
215 reject(command);
216 return;
217 }
218 // Prestart a thread if necessary. We cannot prestart it
219 // running the task because the task (probably) shouldn't be
220 // run yet, so thread will just idle until delay elapses.
221 if (getPoolSize() < getCorePoolSize())
222 prestartCoreThread();
223
224 super.getQueue().add(command);
225 }
226
227 /**
228 * Cancels and clears the queue of all tasks that should not be run
229 * due to shutdown policy.
230 */
231 private void cancelUnwantedTasks() {
232 boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
233 boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
234 if (!keepDelayed && !keepPeriodic)
235 super.getQueue().clear();
236 else if (keepDelayed || keepPeriodic) {
237 Object[] entries = super.getQueue().toArray();
238 for (int i = 0; i < entries.length; ++i) {
239 Object e = entries[i];
240 if (e instanceof RunnableScheduledFuture) {
241 RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
242 if (t.isPeriodic()? !keepPeriodic : !keepDelayed)
243 t.cancel(false);
244 }
245 }
246 entries = null;
247 purge();
248 }
249 }
250
251 public boolean remove(Runnable task) {
252 if (!(task instanceof RunnableScheduledFuture))
253 return false;
254 return getQueue().remove(task);
255 }
256
257 /**
258 * Modifies or replaces the task used to execute a runnable.
259 * This method can be used to override the concrete
260 * class used for managing internal tasks.
261 * The default implementation simply returns the given task.
262 *
263 * @param runnable the submitted Runnable
264 * @param task the task created to execute the runnable
265 * @return a task that can execute the runnable
266 * @since 1.6
267 */
268 protected <V> RunnableScheduledFuture<V> decorateTask(
269 Runnable runnable, RunnableScheduledFuture<V> task) {
270 return task;
271 }
272
273 /**
274 * Modifies or replaces the task used to execute a callable.
275 * This method can be used to override the concrete
276 * class used for managing internal tasks.
277 * The default implementation simply returns the given task.
278 *
279 * @param callable the submitted Callable
280 * @param task the task created to execute the callable
281 * @return a task that can execute the callable
282 * @since 1.6
283 */
284 protected <V> RunnableScheduledFuture<V> decorateTask(
285 Callable<V> callable, RunnableScheduledFuture<V> task) {
286 return task;
287 }
288
/**
 * Creates a new ScheduledThreadPoolExecutor with the given core
 * pool size. The pool is backed by a new <tt>DelayedWorkQueue</tt>.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle
 * @throws IllegalArgumentException if <tt>corePoolSize &lt; 0</tt>
 * (the argument is validated by the {@link ThreadPoolExecutor}
 * constructor, which rejects only negative values)
 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
          new DelayedWorkQueue());
}
301
/**
 * Creates a new ScheduledThreadPoolExecutor with the given core
 * pool size and thread factory. The pool is backed by a new
 * <tt>DelayedWorkQueue</tt>.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle
 * @param threadFactory the factory to use when the executor
 * creates a new thread
 * @throws IllegalArgumentException if <tt>corePoolSize &lt; 0</tt>
 * (the argument is validated by the {@link ThreadPoolExecutor}
 * constructor, which rejects only negative values)
 * @throws NullPointerException if threadFactory is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
}
318
/**
 * Creates a new ScheduledThreadPoolExecutor with the given core
 * pool size and rejection handler. The pool is backed by a new
 * <tt>DelayedWorkQueue</tt>.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle
 * @param handler the handler to use when execution is blocked
 * because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if <tt>corePoolSize &lt; 0</tt>
 * (the argument is validated by the {@link ThreadPoolExecutor}
 * constructor, which rejects only negative values)
 * @throws NullPointerException if handler is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(), handler);
}
335
/**
 * Creates a new ScheduledThreadPoolExecutor with the given core
 * pool size, thread factory and rejection handler. The pool is
 * backed by a new <tt>DelayedWorkQueue</tt>.
 *
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle
 * @param threadFactory the factory to use when the executor
 * creates a new thread
 * @param handler the handler to use when execution is blocked
 * because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if <tt>corePoolSize &lt; 0</tt>
 * (the argument is validated by the {@link ThreadPoolExecutor}
 * constructor, which rejects only negative values)
 * @throws NullPointerException if threadFactory or handler is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}
355
356 public ScheduledFuture<?> schedule(Runnable command,
357 long delay,
358 TimeUnit unit) {
359 if (command == null || unit == null)
360 throw new NullPointerException();
361 long triggerTime = now() + unit.toNanos(delay);
362 RunnableScheduledFuture<?> t = decorateTask(command,
363 new ScheduledFutureTask<Boolean>(command, null, triggerTime));
364 delayedExecute(t);
365 return t;
366 }
367
368 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
369 long delay,
370 TimeUnit unit) {
371 if (callable == null || unit == null)
372 throw new NullPointerException();
373 if (delay < 0) delay = 0;
374 long triggerTime = now() + unit.toNanos(delay);
375 RunnableScheduledFuture<V> t = decorateTask(callable,
376 new ScheduledFutureTask<V>(callable, triggerTime));
377 delayedExecute(t);
378 return t;
379 }
380
381 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
382 long initialDelay,
383 long period,
384 TimeUnit unit) {
385 if (command == null || unit == null)
386 throw new NullPointerException();
387 if (period <= 0)
388 throw new IllegalArgumentException();
389 if (initialDelay < 0) initialDelay = 0;
390 long triggerTime = now() + unit.toNanos(initialDelay);
391 RunnableScheduledFuture<?> t = decorateTask(command,
392 new ScheduledFutureTask<Object>(command,
393 null,
394 triggerTime,
395 unit.toNanos(period)));
396 delayedExecute(t);
397 return t;
398 }
399
400 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
401 long initialDelay,
402 long delay,
403 TimeUnit unit) {
404 if (command == null || unit == null)
405 throw new NullPointerException();
406 if (delay <= 0)
407 throw new IllegalArgumentException();
408 if (initialDelay < 0) initialDelay = 0;
409 long triggerTime = now() + unit.toNanos(initialDelay);
410 RunnableScheduledFuture<?> t = decorateTask(command,
411 new ScheduledFutureTask<Boolean>(command,
412 null,
413 triggerTime,
414 unit.toNanos(-delay)));
415 delayedExecute(t);
416 return t;
417 }
418
419
420 /**
421 * Executes command with zero required delay. This has effect
422 * equivalent to <tt>schedule(command, 0, anyUnit)</tt>. Note
423 * that inspections of the queue and of the list returned by
424 * <tt>shutdownNow</tt> will access the zero-delayed
425 * {@link ScheduledFuture}, not the <tt>command</tt> itself.
426 *
427 * @param command the task to execute
428 * @throws RejectedExecutionException at discretion of
429 * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
430 * for execution because the executor has been shut down.
431 * @throws NullPointerException if command is null
432 */
433 public void execute(Runnable command) {
434 if (command == null)
435 throw new NullPointerException();
436 schedule(command, 0, TimeUnit.NANOSECONDS);
437 }
438
439 // Override AbstractExecutorService methods
440
441 public Future<?> submit(Runnable task) {
442 return schedule(task, 0, TimeUnit.NANOSECONDS);
443 }
444
445 public <T> Future<T> submit(Runnable task, T result) {
446 return schedule(Executors.callable(task, result),
447 0, TimeUnit.NANOSECONDS);
448 }
449
450 public <T> Future<T> submit(Callable<T> task) {
451 return schedule(task, 0, TimeUnit.NANOSECONDS);
452 }
453
454 /**
455 * Sets the policy on whether to continue executing existing periodic
456 * tasks even when this executor has been <tt>shutdown</tt>. In
457 * this case, these tasks will only terminate upon
458 * <tt>shutdownNow</tt>, or after setting the policy to
459 * <tt>false</tt> when already shutdown. This value is by default
460 * false.
461 *
462 * @param value if true, continue after shutdown, else don't.
463 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
464 */
465 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
466 continueExistingPeriodicTasksAfterShutdown = value;
467 if (!value && isShutdown())
468 cancelUnwantedTasks();
469 }
470
471 /**
472 * Gets the policy on whether to continue executing existing
473 * periodic tasks even when this executor has been
474 * <tt>shutdown</tt>. In this case, these tasks will only
475 * terminate upon <tt>shutdownNow</tt> or after setting the policy
476 * to <tt>false</tt> when already shutdown. This value is by
477 * default false.
478 *
479 * @return true if will continue after shutdown
480 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
481 */
482 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
483 return continueExistingPeriodicTasksAfterShutdown;
484 }
485
486 /**
487 * Sets the policy on whether to execute existing delayed
488 * tasks even when this executor has been <tt>shutdown</tt>. In
489 * this case, these tasks will only terminate upon
490 * <tt>shutdownNow</tt>, or after setting the policy to
491 * <tt>false</tt> when already shutdown. This value is by default
492 * true.
493 *
494 * @param value if true, execute after shutdown, else don't.
495 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
496 */
497 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
498 executeExistingDelayedTasksAfterShutdown = value;
499 if (!value && isShutdown())
500 cancelUnwantedTasks();
501 }
502
503 /**
504 * Gets the policy on whether to execute existing delayed
505 * tasks even when this executor has been <tt>shutdown</tt>. In
506 * this case, these tasks will only terminate upon
507 * <tt>shutdownNow</tt>, or after setting the policy to
508 * <tt>false</tt> when already shutdown. This value is by default
509 * true.
510 *
511 * @return true if will execute after shutdown
512 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
513 */
514 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
515 return executeExistingDelayedTasksAfterShutdown;
516 }
517
518
519 /**
520 * Initiates an orderly shutdown in which previously submitted
521 * tasks are executed, but no new tasks will be accepted. If the
522 * <tt>ExecuteExistingDelayedTasksAfterShutdownPolicy</tt> has
523 * been set <tt>false</tt>, existing delayed tasks whose delays
524 * have not yet elapsed are cancelled. And unless the
525 * <tt>ContinueExistingPeriodicTasksAfterShutdownPolicy</tt> has
526 * been set <tt>true</tt>, future executions of existing periodic
527 * tasks will be cancelled.
528 */
529 public void shutdown() {
530 cancelUnwantedTasks();
531 super.shutdown();
532 }
533
534 /**
535 * Attempts to stop all actively executing tasks, halts the
536 * processing of waiting tasks, and returns a list of the tasks
537 * that were awaiting execution.
538 *
539 * <p>There are no guarantees beyond best-effort attempts to stop
540 * processing actively executing tasks. This implementation
541 * cancels tasks via {@link Thread#interrupt}, so any task that
542 * fails to respond to interrupts may never terminate.
543 *
544 * @return list of tasks that never commenced execution. Each
545 * element of this list is a {@link ScheduledFuture},
546 * including those tasks submitted using <tt>execute</tt>, which
547 * are for scheduling purposes used as the basis of a zero-delay
548 * <tt>ScheduledFuture</tt>.
549 * @throws SecurityException {@inheritDoc}
550 */
551 public List<Runnable> shutdownNow() {
552 return super.shutdownNow();
553 }
554
555 /**
556 * Returns the task queue used by this executor. Each element of
557 * this queue is a {@link ScheduledFuture}, including those
558 * tasks submitted using <tt>execute</tt> which are for scheduling
559 * purposes used as the basis of a zero-delay
560 * <tt>ScheduledFuture</tt>. Iteration over this queue is
561 * <em>not</em> guaranteed to traverse tasks in the order in
562 * which they will execute.
563 *
564 * @return the task queue
565 */
566 public BlockingQueue<Runnable> getQueue() {
567 return super.getQueue();
568 }
569
570 /**
571 * An annoying wrapper class to convince javac to use a
572 * DelayQueue<RunnableScheduledFuture> as a BlockingQueue<Runnable>
573 */
574 private static class DelayedWorkQueue
575 extends AbstractCollection<Runnable>
576 implements BlockingQueue<Runnable> {
577
578 private final DelayQueue<RunnableScheduledFuture> dq = new DelayQueue<RunnableScheduledFuture>();
579 public Runnable poll() { return dq.poll(); }
580 public Runnable peek() { return dq.peek(); }
581 public Runnable take() throws InterruptedException { return dq.take(); }
582 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
583 return dq.poll(timeout, unit);
584 }
585
586 public boolean add(Runnable x) {
587 return dq.add((RunnableScheduledFuture)x);
588 }
589 public boolean offer(Runnable x) {
590 return dq.offer((RunnableScheduledFuture)x);
591 }
592 public void put(Runnable x) {
593 dq.put((RunnableScheduledFuture)x);
594 }
595 public boolean offer(Runnable x, long timeout, TimeUnit unit) {
596 return dq.offer((RunnableScheduledFuture)x, timeout, unit);
597 }
598
599 public Runnable remove() { return dq.remove(); }
600 public Runnable element() { return dq.element(); }
601 public void clear() { dq.clear(); }
602 public int drainTo(Collection<? super Runnable> c) { return dq.drainTo(c); }
603 public int drainTo(Collection<? super Runnable> c, int maxElements) {
604 return dq.drainTo(c, maxElements);
605 }
606
607 public int remainingCapacity() { return dq.remainingCapacity(); }
608 public boolean remove(Object x) { return dq.remove(x); }
609 public boolean contains(Object x) { return dq.contains(x); }
610 public int size() { return dq.size(); }
611 public boolean isEmpty() { return dq.isEmpty(); }
612 public Object[] toArray() { return dq.toArray(); }
613 public <T> T[] toArray(T[] array) { return dq.toArray(array); }
614 public Iterator<Runnable> iterator() {
615 return new Iterator<Runnable>() {
616 private Iterator<RunnableScheduledFuture> it = dq.iterator();
617 public boolean hasNext() { return it.hasNext(); }
618 public Runnable next() { return it.next(); }
619 public void remove() { it.remove(); }
620 };
621 }
622 }
623 }