ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.41
Committed: Tue Feb 20 00:11:20 2007 UTC (17 years, 3 months ago) by dl
Branch: MAIN
Changes since 1.40: +40 -6 lines
Log Message:
Added setRemoveOnCancelPolicy

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13 * A {@link ThreadPoolExecutor} that can additionally schedule
14 * commands to run after a given delay, or to execute
15 * periodically. This class is preferable to {@link java.util.Timer}
16 * when multiple worker threads are needed, or when the additional
17 * flexibility or capabilities of {@link ThreadPoolExecutor} (which
18 * this class extends) are required.
19 *
20 * <p> Delayed tasks execute no sooner than they are enabled, but
21 * without any real-time guarantees about when, after they are
22 * enabled, they will commence. Tasks scheduled for exactly the same
23 * execution time are enabled in first-in-first-out (FIFO) order of
24 * submission. If {@link #setRemoveOnCancelPolicy} is set {@code true}
25 * cancelled tasks are automatically removed from the work queue.
26 *
27 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
28 * of the inherited tuning methods are not useful for it. In
29 * particular, because it acts as a fixed-sized pool using
30 * {@code corePoolSize} threads and an unbounded queue, adjustments
31 * to {@code maximumPoolSize} have no useful effect. Additionally, it
32 * is almost never a good idea to set {@code corePoolSize} to zero or
33 * use {@code allowCoreThreadTimeOut} because this may leave the pool
34 * without threads to handle tasks once they become eligible to run.
35 *
36 * <p><b>Extension notes:</b> This class overrides the
37 * {@link ThreadPoolExecutor#execute execute} and
38 * {@link AbstractExecutorService#submit(Runnable) submit}
39 * methods to generate internal {@link ScheduledFuture} objects to
40 * control per-task delays and scheduling. To preserve
41 * functionality, any further overrides of these methods in
42 * subclasses must invoke superclass versions, which effectively
43 * disables additional task customization. However, this class
44 * provides alternative protected extension method
45 * {@code decorateTask} (one version each for {@code Runnable} and
46 * {@code Callable}) that can be used to customize the concrete task
47 * types used to execute commands entered via {@code execute},
48 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
49 * and {@code scheduleWithFixedDelay}. By default, a
50 * {@code ScheduledThreadPoolExecutor} uses a task type extending
51 * {@link FutureTask}. However, this may be modified or replaced using
52 * subclasses of the form:
53 *
54 * <pre> {@code
55 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
56 *
57 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
58 *
59 * protected <V> RunnableScheduledFuture<V> decorateTask(
60 * Runnable r, RunnableScheduledFuture<V> task) {
61 * return new CustomTask<V>(r, task);
62 * }
63 *
64 * protected <V> RunnableScheduledFuture<V> decorateTask(
65 * Callable<V> c, RunnableScheduledFuture<V> task) {
66 * return new CustomTask<V>(c, task);
67 * }
68 * // ... add constructors, etc.
69 * }}</pre>
70 *
71 * @since 1.5
72 * @author Doug Lea
73 */
74 public class ScheduledThreadPoolExecutor
75 extends ThreadPoolExecutor
76 implements ScheduledExecutorService {
77
78 /*
79 * This class specializes ThreadPoolExecutor implementation by
80 *
81 * 1. Using a custom task type, ScheduledFutureTask for
82 * tasks, even those that don't require scheduling (i.e.,
83 * those submitted using ExecutorService execute, not
84 * ScheduledExecutorService methods) which are treated as
85 * delayed tasks with a delay of zero.
86 *
87 * 2. Using a custom queue (DelayedWorkQueue) based on an
88 * unbounded DelayQueue. The lack of capacity constraint and
89 * the fact that corePoolSize and maximumPoolSize are
90 * effectively identical simplifies some execution mechanics
91 * (see delayedExecute) compared to ThreadPoolExecutor
92 * version.
93 *
94 * The DelayedWorkQueue class is defined below for the sake of
95 * ensuring that all elements are instances of
96 * RunnableScheduledFuture. Since DelayQueue otherwise
97 * requires type be Delayed, but not necessarily Runnable, and
98 * the workQueue requires the opposite, we need to explicitly
99 * define a class that requires both to ensure that users don't
100 * add objects that aren't RunnableScheduledFutures via
101 * getQueue().add() etc.
102 *
103 * 3. Supporting optional run-after-shutdown parameters, which
104 * leads to overrides of shutdown methods to remove and cancel
105 * tasks that should NOT be run after shutdown, as well as
106 * different recheck logic when task (re)submission overlaps
107 * with a shutdown.
108 *
109 * 4. Task decoration methods to allow interception and
110 * instrumentation, which are needed because subclasses cannot
111 * otherwise override submit methods to get this effect. These
112 * don't have any impact on pool control logic though.
113 */
114
115 /**
116 * False if should cancel/suppress periodic tasks on shutdown.
117 */
118 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
119
120 /**
121 * False if should cancel non-periodic tasks on shutdown.
122 */
123 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
124
125 /**
126 * True if ScheduledFutureTask.cancel should remove from queue
127 */
128 private volatile boolean removeOnCancel = false;
129
130 /**
131 * Sequence number to break scheduling ties, and in turn to
132 * guarantee FIFO order among tied entries.
133 */
134 private static final AtomicLong sequencer = new AtomicLong(0);
135
136 /**
137 * Returns current nanosecond time.
138 */
139 final long now() {
140 return System.nanoTime();
141 }
142
143 private class ScheduledFutureTask<V>
144 extends FutureTask<V> implements RunnableScheduledFuture<V> {
145
146 /** Sequence number to break ties FIFO */
147 private final long sequenceNumber;
148 /** The time the task is enabled to execute in nanoTime units */
149 private long time;
150 /**
151 * Period in nanoseconds for repeating tasks. A positive
152 * value indicates fixed-rate execution. A negative value
153 * indicates fixed-delay execution. A value of 0 indicates a
154 * non-repeating task.
155 */
156 private final long period;
157
158 /**
159 * Index into delay queue, to support faster cancellation.
160 */
161 int heapIndex;
162
163 /**
164 * Creates a one-shot action with given nanoTime-based trigger time.
165 */
166 ScheduledFutureTask(Runnable r, V result, long ns) {
167 super(r, result);
168 this.time = ns;
169 this.period = 0;
170 this.sequenceNumber = sequencer.getAndIncrement();
171 }
172
173 /**
174 * Creates a periodic action with given nano time and period.
175 */
176 ScheduledFutureTask(Runnable r, V result, long ns, long period) {
177 super(r, result);
178 this.time = ns;
179 this.period = period;
180 this.sequenceNumber = sequencer.getAndIncrement();
181 }
182
183 /**
184 * Creates a one-shot action with given nanoTime-based trigger.
185 */
186 ScheduledFutureTask(Callable<V> callable, long ns) {
187 super(callable);
188 this.time = ns;
189 this.period = 0;
190 this.sequenceNumber = sequencer.getAndIncrement();
191 }
192
193 public long getDelay(TimeUnit unit) {
194 long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);
195 return d;
196 }
197
198 public int compareTo(Delayed other) {
199 if (other == this) // compare zero ONLY if same object
200 return 0;
201 if (other instanceof ScheduledFutureTask) {
202 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
203 long diff = time - x.time;
204 if (diff < 0)
205 return -1;
206 else if (diff > 0)
207 return 1;
208 else if (sequenceNumber < x.sequenceNumber)
209 return -1;
210 else
211 return 1;
212 }
213 long d = (getDelay(TimeUnit.NANOSECONDS) -
214 other.getDelay(TimeUnit.NANOSECONDS));
215 return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
216 }
217
218 /**
219 * Returns true if this is a periodic (not a one-shot) action.
220 *
221 * @return true if periodic
222 */
223 public boolean isPeriodic() {
224 return period != 0;
225 }
226
227 /**
228 * Sets the next time to run for a periodic task.
229 */
230 private void setNextRunTime() {
231 long p = period;
232 if (p > 0)
233 time += p;
234 else
235 time = now() - p;
236 }
237
238 public boolean cancel(boolean mayInterruptIfRunning) {
239 boolean cancelled = super.cancel(mayInterruptIfRunning);
240 if (cancelled && removeOnCancel && heapIndex >= 0)
241 remove(this);
242 return cancelled;
243 }
244
245 /**
246 * Overrides FutureTask version so as to reset/requeue if periodic.
247 */
248 public void run() {
249 boolean periodic = isPeriodic();
250 if (!canRunInCurrentRunState(periodic))
251 cancel(false);
252 else if (!periodic)
253 ScheduledFutureTask.super.run();
254 else if (ScheduledFutureTask.super.runAndReset()) {
255 setNextRunTime();
256 reExecutePeriodic(this);
257 }
258 }
259 }
260
261 /**
262 * Returns true if can run a task given current run state
263 * and run-after-shutdown parameters.
264 *
265 * @param periodic true if this task periodic, false if delayed
266 */
267 boolean canRunInCurrentRunState(boolean periodic) {
268 return isRunningOrShutdown(periodic ?
269 continueExistingPeriodicTasksAfterShutdown :
270 executeExistingDelayedTasksAfterShutdown);
271 }
272
273 /**
274 * Main execution method for delayed or periodic tasks. If pool
275 * is shut down, rejects the task. Otherwise adds task to queue
276 * and starts a thread, if necessary, to run it. (We cannot
277 * prestart the thread to run the task because the task (probably)
278 * shouldn't be run yet.) If the pool is shut down while the task
279 * is being added, cancel and remove it if required by state and
280 * run-after-shutdown parameters.
281 *
282 * @param task the task
283 */
284 private void delayedExecute(RunnableScheduledFuture<?> task) {
285 if (isShutdown())
286 reject(task);
287 else {
288 super.getQueue().add(task);
289 if (isShutdown() &&
290 !canRunInCurrentRunState(task.isPeriodic()) &&
291 remove(task))
292 task.cancel(false);
293 else
294 prestartCoreThread();
295 }
296 }
297
298 /**
299 * Requeues a periodic task unless current run state precludes it.
300 * Same idea as delayedExecute except drops task rather than rejecting.
301 *
302 * @param task the task
303 */
304 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
305 if (canRunInCurrentRunState(true)) {
306 super.getQueue().add(task);
307 if (!canRunInCurrentRunState(true) && remove(task))
308 task.cancel(false);
309 else
310 prestartCoreThread();
311 }
312 }
313
314 /**
315 * Cancels and clears the queue of all tasks that should not be run
316 * due to shutdown policy. Invoked within super.shutdown.
317 */
318 @Override void onShutdown() {
319 BlockingQueue<Runnable> q = super.getQueue();
320 boolean keepDelayed =
321 getExecuteExistingDelayedTasksAfterShutdownPolicy();
322 boolean keepPeriodic =
323 getContinueExistingPeriodicTasksAfterShutdownPolicy();
324 if (!keepDelayed && !keepPeriodic)
325 q.clear();
326 else {
327 // Traverse snapshot to avoid iterator exceptions
328 for (Object e : q.toArray()) {
329 if (e instanceof RunnableScheduledFuture) {
330 RunnableScheduledFuture<?> t =
331 (RunnableScheduledFuture<?>)e;
332 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
333 t.isCancelled()) { // also remove if already cancelled
334 if (q.remove(t))
335 t.cancel(false);
336 }
337 }
338 }
339 }
340 tryTerminate();
341 }
342
343 /**
344 * Modifies or replaces the task used to execute a runnable.
345 * This method can be used to override the concrete
346 * class used for managing internal tasks.
347 * The default implementation simply returns the given task.
348 *
349 * @param runnable the submitted Runnable
350 * @param task the task created to execute the runnable
351 * @return a task that can execute the runnable
352 * @since 1.6
353 */
354 protected <V> RunnableScheduledFuture<V> decorateTask(
355 Runnable runnable, RunnableScheduledFuture<V> task) {
356 return task;
357 }
358
359 /**
360 * Modifies or replaces the task used to execute a callable.
361 * This method can be used to override the concrete
362 * class used for managing internal tasks.
363 * The default implementation simply returns the given task.
364 *
365 * @param callable the submitted Callable
366 * @param task the task created to execute the callable
367 * @return a task that can execute the callable
368 * @since 1.6
369 */
370 protected <V> RunnableScheduledFuture<V> decorateTask(
371 Callable<V> callable, RunnableScheduledFuture<V> task) {
372 return task;
373 }
374
375 /**
376 * Creates a new {@code ScheduledThreadPoolExecutor} with the
377 * given core pool size.
378 *
379 * @param corePoolSize the number of threads to keep in the pool, even
380 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
381 * @throws IllegalArgumentException if {@code corePoolSize < 0}
382 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    // Maximum pool size is left at Integer.MAX_VALUE and keep-alive at
    // zero; the work queue is always a fresh DelayedWorkQueue.
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue());
}
387
388 /**
389 * Creates a new {@code ScheduledThreadPoolExecutor} with the
390 * given initial parameters.
391 *
392 * @param corePoolSize the number of threads to keep in the pool, even
393 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
394 * @param threadFactory the factory to use when the executor
395 * creates a new thread
396 * @throws IllegalArgumentException if {@code corePoolSize < 0}
397 * @throws NullPointerException if {@code threadFactory} is null
398 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    // Same fixed settings as the single-argument constructor, plus a
    // caller-supplied thread factory.
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          threadFactory);
}
404
405 /**
406 * Creates a new ScheduledThreadPoolExecutor with the given
407 * initial parameters.
408 *
409 * @param corePoolSize the number of threads to keep in the pool, even
410 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
411 * @param handler the handler to use when execution is blocked
412 * because the thread bounds and queue capacities are reached
413 * @throws IllegalArgumentException if {@code corePoolSize < 0}
414 * @throws NullPointerException if {@code handler} is null
415 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    // Same fixed settings as the single-argument constructor, plus a
    // caller-supplied rejection handler.
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          handler);
}
421
422 /**
423 * Creates a new ScheduledThreadPoolExecutor with the given
424 * initial parameters.
425 *
426 * @param corePoolSize the number of threads to keep in the pool, even
427 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
428 * @param threadFactory the factory to use when the executor
429 * creates a new thread
430 * @param handler the handler to use when execution is blocked
431 * because the thread bounds and queue capacities are reached
432 * @throws IllegalArgumentException if {@code corePoolSize < 0}
433 * @throws NullPointerException if {@code threadFactory} or
434 * {@code handler} is null
435 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    // Same fixed settings as the single-argument constructor, plus a
    // caller-supplied thread factory and rejection handler.
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          threadFactory,
          handler);
}
442
443 /**
444 * @throws RejectedExecutionException {@inheritDoc}
445 * @throws NullPointerException {@inheritDoc}
446 */
447 public ScheduledFuture<?> schedule(Runnable command,
448 long delay,
449 TimeUnit unit) {
450 if (command == null || unit == null)
451 throw new NullPointerException();
452 if (delay < 0) delay = 0;
453 long triggerTime = now() + unit.toNanos(delay);
454 RunnableScheduledFuture<?> t = decorateTask(command,
455 new ScheduledFutureTask<Boolean>(command, null, triggerTime));
456 delayedExecute(t);
457 return t;
458 }
459
460 /**
461 * @throws RejectedExecutionException {@inheritDoc}
462 * @throws NullPointerException {@inheritDoc}
463 */
464 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
465 long delay,
466 TimeUnit unit) {
467 if (callable == null || unit == null)
468 throw new NullPointerException();
469 if (delay < 0) delay = 0;
470 long triggerTime = now() + unit.toNanos(delay);
471 RunnableScheduledFuture<V> t = decorateTask(callable,
472 new ScheduledFutureTask<V>(callable, triggerTime));
473 delayedExecute(t);
474 return t;
475 }
476
477 /**
478 * @throws RejectedExecutionException {@inheritDoc}
479 * @throws NullPointerException {@inheritDoc}
480 * @throws IllegalArgumentException {@inheritDoc}
481 */
482 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
483 long initialDelay,
484 long period,
485 TimeUnit unit) {
486 if (command == null || unit == null)
487 throw new NullPointerException();
488 if (period <= 0)
489 throw new IllegalArgumentException();
490 if (initialDelay < 0) initialDelay = 0;
491 long triggerTime = now() + unit.toNanos(initialDelay);
492 RunnableScheduledFuture<?> t = decorateTask(command,
493 new ScheduledFutureTask<Object>(command,
494 null,
495 triggerTime,
496 unit.toNanos(period)));
497 delayedExecute(t);
498 return t;
499 }
500
501 /**
502 * @throws RejectedExecutionException {@inheritDoc}
503 * @throws NullPointerException {@inheritDoc}
504 * @throws IllegalArgumentException {@inheritDoc}
505 */
506 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
507 long initialDelay,
508 long delay,
509 TimeUnit unit) {
510 if (command == null || unit == null)
511 throw new NullPointerException();
512 if (delay <= 0)
513 throw new IllegalArgumentException();
514 if (initialDelay < 0) initialDelay = 0;
515 long triggerTime = now() + unit.toNanos(initialDelay);
516 RunnableScheduledFuture<?> t = decorateTask(command,
517 new ScheduledFutureTask<Boolean>(command,
518 null,
519 triggerTime,
520 unit.toNanos(-delay)));
521 delayedExecute(t);
522 return t;
523 }
524
525 /**
526 * Executes {@code command} with zero required delay.
527 * This has effect equivalent to
528 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
529 * Note that inspections of the queue and of the list returned by
530 * {@code shutdownNow} will access the zero-delayed
531 * {@link ScheduledFuture}, not the {@code command} itself.
532 *
533 * <p>A consequence of the use of {@code ScheduledFuture} objects is
534 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
535 * called with a null second {@code Throwable} argument, even if the
536 * {@code command} terminated abruptly. Instead, the {@code Throwable}
537 * thrown by such a task can be obtained via {@link Future#get}.
538 *
539 * @throws RejectedExecutionException at discretion of
540 * {@code RejectedExecutionHandler}, if the task
541 * cannot be accepted for execution because the
542 * executor has been shut down
543 * @throws NullPointerException {@inheritDoc}
544 */
545 public void execute(Runnable command) {
546 schedule(command, 0, TimeUnit.NANOSECONDS);
547 }
548
549 // Override AbstractExecutorService methods
550
551 /**
552 * @throws RejectedExecutionException {@inheritDoc}
553 * @throws NullPointerException {@inheritDoc}
554 */
555 public Future<?> submit(Runnable task) {
556 return schedule(task, 0, TimeUnit.NANOSECONDS);
557 }
558
559 /**
560 * @throws RejectedExecutionException {@inheritDoc}
561 * @throws NullPointerException {@inheritDoc}
562 */
563 public <T> Future<T> submit(Runnable task, T result) {
564 return schedule(Executors.callable(task, result),
565 0, TimeUnit.NANOSECONDS);
566 }
567
568 /**
569 * @throws RejectedExecutionException {@inheritDoc}
570 * @throws NullPointerException {@inheritDoc}
571 */
572 public <T> Future<T> submit(Callable<T> task) {
573 return schedule(task, 0, TimeUnit.NANOSECONDS);
574 }
575
576 /**
577 * Sets the policy on whether to continue executing existing
578 * periodic tasks even when this executor has been {@code shutdown}.
579 * In this case, these tasks will only terminate upon
580 * {@code shutdownNow} or after setting the policy to
581 * {@code false} when already shutdown.
582 * This value is by default {@code false}.
583 *
584 * @param value if {@code true}, continue after shutdown, else don't.
585 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
586 */
587 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
588 continueExistingPeriodicTasksAfterShutdown = value;
589 if (!value && isShutdown())
590 onShutdown();
591 }
592
593 /**
594 * Gets the policy on whether to continue executing existing
595 * periodic tasks even when this executor has been {@code shutdown}.
596 * In this case, these tasks will only terminate upon
597 * {@code shutdownNow} or after setting the policy to
598 * {@code false} when already shutdown.
599 * This value is by default {@code false}.
600 *
601 * @return {@code true} if will continue after shutdown
602 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
603 */
604 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
605 return continueExistingPeriodicTasksAfterShutdown;
606 }
607
608 /**
609 * Sets the policy on whether to execute existing delayed
610 * tasks even when this executor has been {@code shutdown}.
611 * In this case, these tasks will only terminate upon
612 * {@code shutdownNow}, or after setting the policy to
613 * {@code false} when already shutdown.
614 * This value is by default {@code true}.
615 *
616 * @param value if {@code true}, execute after shutdown, else don't.
617 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
618 */
619 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
620 executeExistingDelayedTasksAfterShutdown = value;
621 if (!value && isShutdown())
622 onShutdown();
623 }
624
625 /**
626 * Gets the policy on whether to execute existing delayed
627 * tasks even when this executor has been {@code shutdown}.
628 * In this case, these tasks will only terminate upon
629 * {@code shutdownNow}, or after setting the policy to
630 * {@code false} when already shutdown.
631 * This value is by default {@code true}.
632 *
633 * @return {@code true} if will execute after shutdown
634 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
635 */
636 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
637 return executeExistingDelayedTasksAfterShutdown;
638 }
639
640 /**
641 * Sets the policy on whether cancellation of a task should
642 * remove it from the work queue. This value is
643 * by default {@code false}.
644 *
645 * @param value if {@code true}, remove on cancellation, else don't.
646 * @see #getRemoveOnCancelPolicy
647 */
648 public void setRemoveOnCancelPolicy(boolean value) {
649 removeOnCancel = value;
650 }
651
652 /**
653 * Gets the policy on whether cancellation of a task should
654 * remove it from the work queue.
655 * This value is by default {@code false}.
656 *
657 * @return {@code true} if cancelled tasks are removed from the queue.
658 * @see #setRemoveOnCancelPolicy
659 */
660 public boolean getRemoveOnCancelPolicy() {
661 return removeOnCancel;
662 }
663
664 /**
665 * Initiates an orderly shutdown in which previously submitted
666 * tasks are executed, but no new tasks will be accepted. If the
667 * {@code ExecuteExistingDelayedTasksAfterShutdownPolicy} has
668 * been set {@code false}, existing delayed tasks whose delays
669 * have not yet elapsed are cancelled. And unless the
670 * {@code ContinueExistingPeriodicTasksAfterShutdownPolicy} has
671 * been set {@code true}, future executions of existing periodic
672 * tasks will be cancelled.
673 *
674 * @throws SecurityException {@inheritDoc}
675 */
676 public void shutdown() {
677 super.shutdown();
678 }
679
680 /**
681 * Attempts to stop all actively executing tasks, halts the
682 * processing of waiting tasks, and returns a list of the tasks
683 * that were awaiting execution.
684 *
685 * <p>There are no guarantees beyond best-effort attempts to stop
686 * processing actively executing tasks. This implementation
687 * cancels tasks via {@link Thread#interrupt}, so any task that
688 * fails to respond to interrupts may never terminate.
689 *
690 * @return list of tasks that never commenced execution.
691 * Each element of this list is a {@link ScheduledFuture},
692 * including those tasks submitted using {@code execute},
693 * which are for scheduling purposes used as the basis of a
694 * zero-delay {@code ScheduledFuture}.
695 * @throws SecurityException {@inheritDoc}
696 */
697 public List<Runnable> shutdownNow() {
698 return super.shutdownNow();
699 }
700
701 /**
702 * Returns the task queue used by this executor. Each element of
703 * this queue is a {@link ScheduledFuture}, including those
704 * tasks submitted using {@code execute} which are for scheduling
705 * purposes used as the basis of a zero-delay
706 * {@code ScheduledFuture}. Iteration over this queue is
707 * <em>not</em> guaranteed to traverse tasks in the order in
708 * which they will execute.
709 *
710 * @return the task queue
711 */
712 public BlockingQueue<Runnable> getQueue() {
713 return super.getQueue();
714 }
715
716 /**
717 * Specialized delay queue. To mesh with TPE declarations, this
718 * class must be declared as a BlockingQueue<Runnable> even though
719 * it can only hold RunnableScheduledFutures
720 */
721 static class DelayedWorkQueue extends AbstractQueue<Runnable>
722 implements BlockingQueue<Runnable> {
723
724 /*
725 * A DelayedWorkQueue is based on a heap-based data structure
726 * like those in DelayQueue and PriorityQueue, except that
727 * every ScheduledFutureTask also records its index into the
728 * heap array. This eliminates the need to find a task upon
729 * cancellation, greatly speeding up removal (down from O(n)
730 * to O(log n)), and reducing garbage retention that would
731 * otherwise occur by waiting for the element to rise to top
732 * before clearing. But because the queue may also hold
733 * RunnableScheduledFutures that are not ScheduledFutureTasks,
734 * we are not guaranteed to have such indices available, in
735 * which case we fall back to linear search. (We expect that
736 * most tasks will not be decorated, and that the faster cases
737 * will be much more common.)
738 *
739 * All heap operations must record index changes -- mainly
740 * within siftUp and siftDown. Upon removal, a task's
741 * heapIndex is set to -1. Note that ScheduledFutureTasks can
742 * appear at most once in the queue (this need not be true for
743 * other kinds of tasks or work queues), so are uniquely
744 * identified by heapIndex.
745 */
746
747 private static final int INITIAL_CAPACITY = 64;
748 private transient RunnableScheduledFuture[] queue =
749 new RunnableScheduledFuture[INITIAL_CAPACITY];
750 private transient final ReentrantLock lock = new ReentrantLock();
751 private transient final Condition available = lock.newCondition();
752 private int size = 0;
753
754
755 /**
756 * Set f's heapIndex if it is a ScheduledFutureTask
757 */
758 private void setIndex(Object f, int idx) {
759 if (f instanceof ScheduledFutureTask)
760 ((ScheduledFutureTask)f).heapIndex = idx;
761 }
762
763 /**
764 * Sift element added at bottom up to its heap-ordered spot
765 * Call only when holding lock.
766 */
767 private void siftUp(int k, RunnableScheduledFuture key) {
768 while (k > 0) {
769 int parent = (k - 1) >>> 1;
770 RunnableScheduledFuture e = queue[parent];
771 if (key.compareTo(e) >= 0)
772 break;
773 queue[k] = e;
774 setIndex(e, k);
775 k = parent;
776 }
777 queue[k] = key;
778 setIndex(key, k);
779 }
780
781 /**
782 * Sift element added at top down to its heap-ordered spot
783 * Call only when holding lock.
784 */
785 private void siftDown(int k, RunnableScheduledFuture key) {
786 int half = size >>> 1;
787 while (k < half) {
788 int child = (k << 1) + 1;
789 RunnableScheduledFuture c = queue[child];
790 int right = child + 1;
791 if (right < size && c.compareTo(queue[right]) > 0)
792 c = queue[child = right];
793 if (key.compareTo(c) <= 0)
794 break;
795 queue[k] = c;
796 setIndex(c, k);
797 k = child;
798 }
799 queue[k] = key;
800 setIndex(key, k);
801 }
802
803 /**
804 * Performs common bookkeeping for poll and take: Replaces
805 * first element with last; sifts it down, and signals any
806 * waiting consumers. Call only when holding lock.
807 * @param f the task to remove and return
808 */
809 private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
810 int s = --size;
811 RunnableScheduledFuture x = queue[s];
812 queue[s] = null;
813 if (s != 0) {
814 siftDown(0, x);
815 available.signalAll();
816 }
817 setIndex(f, -1);
818 return f;
819 }
820
821 /**
822 * Resize the heap array. Call only when holding lock.
823 */
824 private void grow() {
825 int oldCapacity = queue.length;
826 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
827 if (newCapacity < 0) // overflow
828 newCapacity = Integer.MAX_VALUE;
829 queue = Arrays.copyOf(queue, newCapacity);
830 }
831
832 /**
833 * Find index of given object, or -1 if absent
834 */
835 private int indexOf(Object x) {
836 if (x != null) {
837 for (int i = 0; i < size; i++)
838 if (x.equals(queue[i]))
839 return i;
840 }
841 return -1;
842 }
843
844 public boolean remove(Object x) {
845 boolean removed;
846 final ReentrantLock lock = this.lock;
847 lock.lock();
848 try {
849 int i;
850 if (x instanceof ScheduledFutureTask)
851 i = ((ScheduledFutureTask)x).heapIndex;
852 else
853 i = indexOf(x);
854 if (removed = (i >= 0 && i < size && queue[i] == x)) {
855 setIndex(x, -1);
856 int s = --size;
857 RunnableScheduledFuture replacement = queue[s];
858 queue[s] = null;
859 if (s != i) {
860 siftDown(i, replacement);
861 if (queue[i] == replacement)
862 siftUp(i, replacement);
863 }
864 }
865 } finally {
866 lock.unlock();
867 }
868 return removed;
869 }
870
871 public int size() {
872 int s;
873 final ReentrantLock lock = this.lock;
874 lock.lock();
875 try {
876 s = size;
877 } finally {
878 lock.unlock();
879 }
880 return s;
881 }
882
883 public boolean isEmpty() {
884 return size() == 0;
885 }
886
887 public int remainingCapacity() {
888 return Integer.MAX_VALUE;
889 }
890
891 public RunnableScheduledFuture peek() {
892 final ReentrantLock lock = this.lock;
893 lock.lock();
894 try {
895 return queue[0];
896 } finally {
897 lock.unlock();
898 }
899 }
900
901 public boolean offer(Runnable x) {
902 if (x == null)
903 throw new NullPointerException();
904 RunnableScheduledFuture e = (RunnableScheduledFuture)x;
905 final ReentrantLock lock = this.lock;
906 lock.lock();
907 try {
908 int i = size;
909 if (i >= queue.length)
910 grow();
911 size = i + 1;
912 boolean notify;
913 if (i == 0) {
914 notify = true;
915 queue[0] = e;
916 setIndex(e, 0);
917 }
918 else {
919 notify = e.compareTo(queue[0]) < 0;
920 siftUp(i, e);
921 }
922 if (notify)
923 available.signalAll();
924 } finally {
925 lock.unlock();
926 }
927 return true;
928 }
929
930 public void put(Runnable e) {
931 offer(e);
932 }
933
934 public boolean add(Runnable e) {
935 return offer(e);
936 }
937
938 public boolean offer(Runnable e, long timeout, TimeUnit unit) {
939 return offer(e);
940 }
941
942 public RunnableScheduledFuture poll() {
943 final ReentrantLock lock = this.lock;
944 lock.lock();
945 try {
946 RunnableScheduledFuture first = queue[0];
947 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
948 return null;
949 else
950 return finishPoll(first);
951 } finally {
952 lock.unlock();
953 }
954 }
955
956 public RunnableScheduledFuture take() throws InterruptedException {
957 final ReentrantLock lock = this.lock;
958 lock.lockInterruptibly();
959 try {
960 for (;;) {
961 RunnableScheduledFuture first = queue[0];
962 if (first == null)
963 available.await();
964 else {
965 long delay = first.getDelay(TimeUnit.NANOSECONDS);
966 if (delay > 0)
967 available.awaitNanos(delay);
968 else
969 return finishPoll(first);
970 }
971 }
972 } finally {
973 lock.unlock();
974 }
975 }
976
977 public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
978 throws InterruptedException {
979 long nanos = unit.toNanos(timeout);
980 final ReentrantLock lock = this.lock;
981 lock.lockInterruptibly();
982 try {
983 for (;;) {
984 RunnableScheduledFuture first = queue[0];
985 if (first == null) {
986 if (nanos <= 0)
987 return null;
988 else
989 nanos = available.awaitNanos(nanos);
990 } else {
991 long delay = first.getDelay(TimeUnit.NANOSECONDS);
992 if (delay > 0) {
993 if (nanos <= 0)
994 return null;
995 if (delay > nanos)
996 delay = nanos;
997 long timeLeft = available.awaitNanos(delay);
998 nanos -= delay - timeLeft;
999 } else
1000 return finishPoll(first);
1001 }
1002 }
1003 } finally {
1004 lock.unlock();
1005 }
1006 }
1007
1008 public void clear() {
1009 final ReentrantLock lock = this.lock;
1010 lock.lock();
1011 try {
1012 for (int i = 0; i < size; i++) {
1013 RunnableScheduledFuture t = queue[i];
1014 if (t != null) {
1015 queue[i] = null;
1016 setIndex(t, -1);
1017 }
1018 }
1019 size = 0;
1020 } finally {
1021 lock.unlock();
1022 }
1023 }
1024
1025 /**
1026 * Return and remove first element only if it is expired.
1027 * Used only by drainTo. Call only when holding lock.
1028 */
1029 private RunnableScheduledFuture pollExpired() {
1030 RunnableScheduledFuture first = queue[0];
1031 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
1032 return null;
1033 setIndex(first, -1);
1034 int s = --size;
1035 RunnableScheduledFuture x = queue[s];
1036 queue[s] = null;
1037 if (s != 0)
1038 siftDown(0, x);
1039 return first;
1040 }
1041
1042 public int drainTo(Collection<? super Runnable> c) {
1043 if (c == null)
1044 throw new NullPointerException();
1045 if (c == this)
1046 throw new IllegalArgumentException();
1047 final ReentrantLock lock = this.lock;
1048 lock.lock();
1049 try {
1050 int n = 0;
1051 for (;;) {
1052 RunnableScheduledFuture first = pollExpired();
1053 if (first != null) {
1054 c.add(first);
1055 ++n;
1056 }
1057 else
1058 break;
1059 }
1060 if (n > 0)
1061 available.signalAll();
1062 return n;
1063 } finally {
1064 lock.unlock();
1065 }
1066 }
1067
1068 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1069 if (c == null)
1070 throw new NullPointerException();
1071 if (c == this)
1072 throw new IllegalArgumentException();
1073 if (maxElements <= 0)
1074 return 0;
1075 final ReentrantLock lock = this.lock;
1076 lock.lock();
1077 try {
1078 int n = 0;
1079 while (n < maxElements) {
1080 RunnableScheduledFuture first = pollExpired();
1081 if (first != null) {
1082 c.add(first);
1083 ++n;
1084 }
1085 else
1086 break;
1087 }
1088 if (n > 0)
1089 available.signalAll();
1090 return n;
1091 } finally {
1092 lock.unlock();
1093 }
1094 }
1095
1096 public Object[] toArray() {
1097 final ReentrantLock lock = this.lock;
1098 lock.lock();
1099 try {
1100 return Arrays.copyOf(queue, size);
1101 } finally {
1102 lock.unlock();
1103 }
1104 }
1105
1106 public <T> T[] toArray(T[] a) {
1107 final ReentrantLock lock = this.lock;
1108 lock.lock();
1109 try {
1110 if (a.length < size)
1111 return (T[]) Arrays.copyOf(queue, size, a.getClass());
1112 System.arraycopy(queue, 0, a, 0, size);
1113 if (a.length > size)
1114 a[size] = null;
1115 return a;
1116 } finally {
1117 lock.unlock();
1118 }
1119 }
1120
1121 public Iterator<Runnable> iterator() {
1122 return new Itr(toArray());
1123 }
1124
1125 /**
1126 * Snapshot iterator that works off copy of underlying q array.
1127 */
1128 private class Itr implements Iterator<Runnable> {
1129 final Object[] array; // Array of all elements
1130 int cursor; // index of next element to return;
1131 int lastRet; // index of last element, or -1 if no such
1132
1133 Itr(Object[] array) {
1134 lastRet = -1;
1135 this.array = array;
1136 }
1137
1138 public boolean hasNext() {
1139 return cursor < array.length;
1140 }
1141
1142 public Runnable next() {
1143 if (cursor >= array.length)
1144 throw new NoSuchElementException();
1145 lastRet = cursor;
1146 return (Runnable)array[cursor++];
1147 }
1148
1149 public void remove() {
1150 if (lastRet < 0)
1151 throw new IllegalStateException();
1152 DelayedWorkQueue.this.remove(array[lastRet]);
1153 lastRet = -1;
1154 }
1155 }
1156 }
1157 }