ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.47
Committed: Tue Sep 25 23:17:51 2007 UTC (16 years, 8 months ago) by jsr166
Branch: MAIN
Changes since 1.46: +2 -2 lines
Log Message:
6602600: Fast removal of cancelled scheduled thread pool tasks

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13 * A {@link ThreadPoolExecutor} that can additionally schedule
14 * commands to run after a given delay, or to execute
15 * periodically. This class is preferable to {@link java.util.Timer}
16 * when multiple worker threads are needed, or when the additional
17 * flexibility or capabilities of {@link ThreadPoolExecutor} (which
18 * this class extends) are required.
19 *
20 * <p>Delayed tasks execute no sooner than they are enabled, but
21 * without any real-time guarantees about when, after they are
22 * enabled, they will commence. Tasks scheduled for exactly the same
23 * execution time are enabled in first-in-first-out (FIFO) order of
24 * submission.
25 *
26 * <p>When a submitted task is cancelled before it is run, execution
27 * is suppressed. By default, such a cancelled task is not
28 * automatically removed from the work queue until its delay
29 * elapses. While this enables further inspection and monitoring, it
30 * may also cause unbounded retention of cancelled tasks. To avoid
31 * this, set {@link #setRemoveOnCancelPolicy} to {@code true}, which
32 * causes tasks to be immediately removed from the work queue at
33 * time of cancellation.
34 *
35 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
36 * of the inherited tuning methods are not useful for it. In
37 * particular, because it acts as a fixed-sized pool using
38 * {@code corePoolSize} threads and an unbounded queue, adjustments
39 * to {@code maximumPoolSize} have no useful effect. Additionally, it
40 * is almost never a good idea to set {@code corePoolSize} to zero or
41 * use {@code allowCoreThreadTimeOut} because this may leave the pool
42 * without threads to handle tasks once they become eligible to run.
43 *
44 * <p><b>Extension notes:</b> This class overrides the
45 * {@link ThreadPoolExecutor#execute execute} and
46 * {@link AbstractExecutorService#submit(Runnable) submit}
47 * methods to generate internal {@link ScheduledFuture} objects to
48 * control per-task delays and scheduling. To preserve
49 * functionality, any further overrides of these methods in
50 * subclasses must invoke superclass versions, which effectively
51 * disables additional task customization. However, this class
52 * provides alternative protected extension method
53 * {@code decorateTask} (one version each for {@code Runnable} and
54 * {@code Callable}) that can be used to customize the concrete task
55 * types used to execute commands entered via {@code execute},
56 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
57 * and {@code scheduleWithFixedDelay}. By default, a
58 * {@code ScheduledThreadPoolExecutor} uses a task type extending
59 * {@link FutureTask}. However, this may be modified or replaced using
60 * subclasses of the form:
61 *
62 * <pre> {@code
63 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
64 *
65 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
66 *
67 * protected <V> RunnableScheduledFuture<V> decorateTask(
68 * Runnable r, RunnableScheduledFuture<V> task) {
69 * return new CustomTask<V>(r, task);
70 * }
71 *
72 * protected <V> RunnableScheduledFuture<V> decorateTask(
73 * Callable<V> c, RunnableScheduledFuture<V> task) {
74 * return new CustomTask<V>(c, task);
75 * }
76 * // ... add constructors, etc.
77 * }}</pre>
78 *
79 * @since 1.5
80 * @author Doug Lea
81 */
82 public class ScheduledThreadPoolExecutor
83 extends ThreadPoolExecutor
84 implements ScheduledExecutorService {
85
86 /*
87 * This class specializes ThreadPoolExecutor implementation by
88 *
89 * 1. Using a custom task type, ScheduledFutureTask for
90 * tasks, even those that don't require scheduling (i.e.,
91 * those submitted using ExecutorService execute, not
92 * ScheduledExecutorService methods) which are treated as
93 * delayed tasks with a delay of zero.
94 *
95 * 2. Using a custom queue (DelayedWorkQueue), a variant of
96 * unbounded DelayQueue. The lack of capacity constraint and
97 * the fact that corePoolSize and maximumPoolSize are
98 * effectively identical simplifies some execution mechanics
99 * (see delayedExecute) compared to ThreadPoolExecutor.
100 *
101 * 3. Supporting optional run-after-shutdown parameters, which
102 * leads to overrides of shutdown methods to remove and cancel
103 * tasks that should NOT be run after shutdown, as well as
104 * different recheck logic when task (re)submission overlaps
105 * with a shutdown.
106 *
107 * 4. Task decoration methods to allow interception and
108 * instrumentation, which are needed because subclasses cannot
109 * otherwise override submit methods to get this effect. These
110 * don't have any impact on pool control logic though.
111 */
112
113 /**
114 * False if should cancel/suppress periodic tasks on shutdown.
115 */
116 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
117
118 /**
119 * False if should cancel non-periodic tasks on shutdown.
120 */
121 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
122
123 /**
124 * True if ScheduledFutureTask.cancel should remove from queue
125 */
126 private volatile boolean removeOnCancel = false;
127
128 /**
129 * Sequence number to break scheduling ties, and in turn to
130 * guarantee FIFO order among tied entries.
131 */
132 private static final AtomicLong sequencer = new AtomicLong(0);
133
134 /**
135 * Returns current nanosecond time.
136 */
137 final long now() {
138 return System.nanoTime();
139 }
140
141 private class ScheduledFutureTask<V>
142 extends FutureTask<V> implements RunnableScheduledFuture<V> {
143
144 /** Sequence number to break ties FIFO */
145 private final long sequenceNumber;
146
147 /** The time the task is enabled to execute in nanoTime units */
148 private long time;
149
150 /**
151 * Period in nanoseconds for repeating tasks. A positive
152 * value indicates fixed-rate execution. A negative value
153 * indicates fixed-delay execution. A value of 0 indicates a
154 * non-repeating task.
155 */
156 private final long period;
157
158 /** The actual task to be re-enqueued by reExecutePeriodic */
159 RunnableScheduledFuture<V> outerTask = this;
160
161 /**
162 * Index into delay queue, to support faster cancellation.
163 */
164 int heapIndex;
165
166 /**
167 * Creates a one-shot action with given nanoTime-based trigger time.
168 */
169 ScheduledFutureTask(Runnable r, V result, long ns) {
170 super(r, result);
171 this.time = ns;
172 this.period = 0;
173 this.sequenceNumber = sequencer.getAndIncrement();
174 }
175
176 /**
177 * Creates a periodic action with given nano time and period.
178 */
179 ScheduledFutureTask(Runnable r, V result, long ns, long period) {
180 super(r, result);
181 this.time = ns;
182 this.period = period;
183 this.sequenceNumber = sequencer.getAndIncrement();
184 }
185
186 /**
187 * Creates a one-shot action with given nanoTime-based trigger.
188 */
189 ScheduledFutureTask(Callable<V> callable, long ns) {
190 super(callable);
191 this.time = ns;
192 this.period = 0;
193 this.sequenceNumber = sequencer.getAndIncrement();
194 }
195
196 public long getDelay(TimeUnit unit) {
197 long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);
198 return d;
199 }
200
201 public int compareTo(Delayed other) {
202 if (other == this) // compare zero ONLY if same object
203 return 0;
204 if (other instanceof ScheduledFutureTask) {
205 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
206 long diff = time - x.time;
207 if (diff < 0)
208 return -1;
209 else if (diff > 0)
210 return 1;
211 else if (sequenceNumber < x.sequenceNumber)
212 return -1;
213 else
214 return 1;
215 }
216 long d = (getDelay(TimeUnit.NANOSECONDS) -
217 other.getDelay(TimeUnit.NANOSECONDS));
218 return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
219 }
220
221 /**
222 * Returns true if this is a periodic (not a one-shot) action.
223 *
224 * @return true if periodic
225 */
226 public boolean isPeriodic() {
227 return period != 0;
228 }
229
230 /**
231 * Sets the next time to run for a periodic task.
232 */
233 private void setNextRunTime() {
234 long p = period;
235 if (p > 0)
236 time += p;
237 else
238 time = now() - p;
239 }
240
241 public boolean cancel(boolean mayInterruptIfRunning) {
242 boolean cancelled = super.cancel(mayInterruptIfRunning);
243 if (cancelled && removeOnCancel && heapIndex >= 0)
244 remove(this);
245 return cancelled;
246 }
247
248 /**
249 * Overrides FutureTask version so as to reset/requeue if periodic.
250 */
251 public void run() {
252 boolean periodic = isPeriodic();
253 if (!canRunInCurrentRunState(periodic))
254 cancel(false);
255 else if (!periodic)
256 ScheduledFutureTask.super.run();
257 else if (ScheduledFutureTask.super.runAndReset()) {
258 setNextRunTime();
259 reExecutePeriodic(outerTask);
260 }
261 }
262 }
263
264 /**
265 * Returns true if can run a task given current run state
266 * and run-after-shutdown parameters.
267 *
268 * @param periodic true if this task periodic, false if delayed
269 */
270 boolean canRunInCurrentRunState(boolean periodic) {
271 return isRunningOrShutdown(periodic ?
272 continueExistingPeriodicTasksAfterShutdown :
273 executeExistingDelayedTasksAfterShutdown);
274 }
275
276 /**
277 * Main execution method for delayed or periodic tasks. If pool
278 * is shut down, rejects the task. Otherwise adds task to queue
279 * and starts a thread, if necessary, to run it. (We cannot
280 * prestart the thread to run the task because the task (probably)
281 * shouldn't be run yet,) If the pool is shut down while the task
282 * is being added, cancel and remove it if required by state and
283 * run-after-shutdown parameters.
284 *
285 * @param task the task
286 */
287 private void delayedExecute(RunnableScheduledFuture<?> task) {
288 if (isShutdown())
289 reject(task);
290 else {
291 super.getQueue().add(task);
292 if (isShutdown() &&
293 !canRunInCurrentRunState(task.isPeriodic()) &&
294 remove(task))
295 task.cancel(false);
296 else
297 prestartCoreThread();
298 }
299 }
300
301 /**
302 * Requeues a periodic task unless current run state precludes it.
303 * Same idea as delayedExecute except drops task rather than rejecting.
304 *
305 * @param task the task
306 */
307 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
308 if (canRunInCurrentRunState(true)) {
309 super.getQueue().add(task);
310 if (!canRunInCurrentRunState(true) && remove(task))
311 task.cancel(false);
312 else
313 prestartCoreThread();
314 }
315 }
316
317 /**
318 * Cancels and clears the queue of all tasks that should not be run
319 * due to shutdown policy. Invoked within super.shutdown.
320 */
321 @Override void onShutdown() {
322 BlockingQueue<Runnable> q = super.getQueue();
323 boolean keepDelayed =
324 getExecuteExistingDelayedTasksAfterShutdownPolicy();
325 boolean keepPeriodic =
326 getContinueExistingPeriodicTasksAfterShutdownPolicy();
327 if (!keepDelayed && !keepPeriodic)
328 q.clear();
329 else {
330 // Traverse snapshot to avoid iterator exceptions
331 for (Object e : q.toArray()) {
332 if (e instanceof RunnableScheduledFuture) {
333 RunnableScheduledFuture<?> t =
334 (RunnableScheduledFuture<?>)e;
335 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
336 t.isCancelled()) { // also remove if already cancelled
337 if (q.remove(t))
338 t.cancel(false);
339 }
340 }
341 }
342 }
343 tryTerminate();
344 }
345
346 /**
347 * Modifies or replaces the task used to execute a runnable.
348 * This method can be used to override the concrete
349 * class used for managing internal tasks.
350 * The default implementation simply returns the given task.
351 *
352 * @param runnable the submitted Runnable
353 * @param task the task created to execute the runnable
354 * @return a task that can execute the runnable
355 * @since 1.6
356 */
357 protected <V> RunnableScheduledFuture<V> decorateTask(
358 Runnable runnable, RunnableScheduledFuture<V> task) {
359 return task;
360 }
361
362 /**
363 * Modifies or replaces the task used to execute a callable.
364 * This method can be used to override the concrete
365 * class used for managing internal tasks.
366 * The default implementation simply returns the given task.
367 *
368 * @param callable the submitted Callable
369 * @param task the task created to execute the callable
370 * @return a task that can execute the callable
371 * @since 1.6
372 */
373 protected <V> RunnableScheduledFuture<V> decorateTask(
374 Callable<V> callable, RunnableScheduledFuture<V> task) {
375 return task;
376 }
377
378 /**
379 * Creates a new {@code ScheduledThreadPoolExecutor} with the
380 * given core pool size.
381 *
382 * @param corePoolSize the number of threads to keep in the pool, even
383 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
384 * @throws IllegalArgumentException if {@code corePoolSize < 0}
385 */
386 public ScheduledThreadPoolExecutor(int corePoolSize) {
387 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
388 new DelayedWorkQueue());
389 }
390
391 /**
392 * Creates a new {@code ScheduledThreadPoolExecutor} with the
393 * given initial parameters.
394 *
395 * @param corePoolSize the number of threads to keep in the pool, even
396 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
397 * @param threadFactory the factory to use when the executor
398 * creates a new thread
399 * @throws IllegalArgumentException if {@code corePoolSize < 0}
400 * @throws NullPointerException if {@code threadFactory} is null
401 */
402 public ScheduledThreadPoolExecutor(int corePoolSize,
403 ThreadFactory threadFactory) {
404 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
405 new DelayedWorkQueue(), threadFactory);
406 }
407
408 /**
409 * Creates a new ScheduledThreadPoolExecutor with the given
410 * initial parameters.
411 *
412 * @param corePoolSize the number of threads to keep in the pool, even
413 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
414 * @param handler the handler to use when execution is blocked
415 * because the thread bounds and queue capacities are reached
416 * @throws IllegalArgumentException if {@code corePoolSize < 0}
417 * @throws NullPointerException if {@code handler} is null
418 */
419 public ScheduledThreadPoolExecutor(int corePoolSize,
420 RejectedExecutionHandler handler) {
421 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
422 new DelayedWorkQueue(), handler);
423 }
424
425 /**
426 * Creates a new ScheduledThreadPoolExecutor with the given
427 * initial parameters.
428 *
429 * @param corePoolSize the number of threads to keep in the pool, even
430 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
431 * @param threadFactory the factory to use when the executor
432 * creates a new thread
433 * @param handler the handler to use when execution is blocked
434 * because the thread bounds and queue capacities are reached
435 * @throws IllegalArgumentException if {@code corePoolSize < 0}
436 * @throws NullPointerException if {@code threadFactory} or
437 * {@code handler} is null
438 */
439 public ScheduledThreadPoolExecutor(int corePoolSize,
440 ThreadFactory threadFactory,
441 RejectedExecutionHandler handler) {
442 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
443 new DelayedWorkQueue(), threadFactory, handler);
444 }
445
446 /**
447 * @throws RejectedExecutionException {@inheritDoc}
448 * @throws NullPointerException {@inheritDoc}
449 */
450 public ScheduledFuture<?> schedule(Runnable command,
451 long delay,
452 TimeUnit unit) {
453 if (command == null || unit == null)
454 throw new NullPointerException();
455 if (delay < 0) delay = 0;
456 long triggerTime = now() + unit.toNanos(delay);
457 RunnableScheduledFuture<?> t = decorateTask(command,
458 new ScheduledFutureTask<Void>(command, null, triggerTime));
459 delayedExecute(t);
460 return t;
461 }
462
463 /**
464 * @throws RejectedExecutionException {@inheritDoc}
465 * @throws NullPointerException {@inheritDoc}
466 */
467 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
468 long delay,
469 TimeUnit unit) {
470 if (callable == null || unit == null)
471 throw new NullPointerException();
472 if (delay < 0) delay = 0;
473 long triggerTime = now() + unit.toNanos(delay);
474 RunnableScheduledFuture<V> t = decorateTask(callable,
475 new ScheduledFutureTask<V>(callable, triggerTime));
476 delayedExecute(t);
477 return t;
478 }
479
480 /**
481 * @throws RejectedExecutionException {@inheritDoc}
482 * @throws NullPointerException {@inheritDoc}
483 * @throws IllegalArgumentException {@inheritDoc}
484 */
485 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
486 long initialDelay,
487 long period,
488 TimeUnit unit) {
489 if (command == null || unit == null)
490 throw new NullPointerException();
491 if (period <= 0)
492 throw new IllegalArgumentException();
493 if (initialDelay < 0) initialDelay = 0;
494 long triggerTime = now() + unit.toNanos(initialDelay);
495 ScheduledFutureTask<Void> sft =
496 new ScheduledFutureTask<Void>(command,
497 null,
498 triggerTime,
499 unit.toNanos(period));
500 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
501 sft.outerTask = t;
502 delayedExecute(t);
503 return t;
504 }
505
506 /**
507 * @throws RejectedExecutionException {@inheritDoc}
508 * @throws NullPointerException {@inheritDoc}
509 * @throws IllegalArgumentException {@inheritDoc}
510 */
511 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
512 long initialDelay,
513 long delay,
514 TimeUnit unit) {
515 if (command == null || unit == null)
516 throw new NullPointerException();
517 if (delay <= 0)
518 throw new IllegalArgumentException();
519 if (initialDelay < 0) initialDelay = 0;
520 long triggerTime = now() + unit.toNanos(initialDelay);
521 ScheduledFutureTask<Void> sft =
522 new ScheduledFutureTask<Void>(command,
523 null,
524 triggerTime,
525 unit.toNanos(-delay));
526 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
527 sft.outerTask = t;
528 delayedExecute(t);
529 return t;
530 }
531
532 /**
533 * Executes {@code command} with zero required delay.
534 * This has effect equivalent to
535 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
536 * Note that inspections of the queue and of the list returned by
537 * {@code shutdownNow} will access the zero-delayed
538 * {@link ScheduledFuture}, not the {@code command} itself.
539 *
540 * <p>A consequence of the use of {@code ScheduledFuture} objects is
541 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
542 * called with a null second {@code Throwable} argument, even if the
543 * {@code command} terminated abruptly. Instead, the {@code Throwable}
544 * thrown by such a task can be obtained via {@link Future#get}.
545 *
546 * @throws RejectedExecutionException at discretion of
547 * {@code RejectedExecutionHandler}, if the task
548 * cannot be accepted for execution because the
549 * executor has been shut down
550 * @throws NullPointerException {@inheritDoc}
551 */
552 public void execute(Runnable command) {
553 schedule(command, 0, TimeUnit.NANOSECONDS);
554 }
555
556 // Override AbstractExecutorService methods
557
558 /**
559 * @throws RejectedExecutionException {@inheritDoc}
560 * @throws NullPointerException {@inheritDoc}
561 */
562 public Future<?> submit(Runnable task) {
563 return schedule(task, 0, TimeUnit.NANOSECONDS);
564 }
565
566 /**
567 * @throws RejectedExecutionException {@inheritDoc}
568 * @throws NullPointerException {@inheritDoc}
569 */
570 public <T> Future<T> submit(Runnable task, T result) {
571 return schedule(Executors.callable(task, result),
572 0, TimeUnit.NANOSECONDS);
573 }
574
575 /**
576 * @throws RejectedExecutionException {@inheritDoc}
577 * @throws NullPointerException {@inheritDoc}
578 */
579 public <T> Future<T> submit(Callable<T> task) {
580 return schedule(task, 0, TimeUnit.NANOSECONDS);
581 }
582
583 /**
584 * Sets the policy on whether to continue executing existing
585 * periodic tasks even when this executor has been {@code shutdown}.
586 * In this case, these tasks will only terminate upon
587 * {@code shutdownNow} or after setting the policy to
588 * {@code false} when already shutdown.
589 * This value is by default {@code false}.
590 *
591 * @param value if {@code true}, continue after shutdown, else don't.
592 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
593 */
594 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
595 continueExistingPeriodicTasksAfterShutdown = value;
596 if (!value && isShutdown())
597 onShutdown();
598 }
599
600 /**
601 * Gets the policy on whether to continue executing existing
602 * periodic tasks even when this executor has been {@code shutdown}.
603 * In this case, these tasks will only terminate upon
604 * {@code shutdownNow} or after setting the policy to
605 * {@code false} when already shutdown.
606 * This value is by default {@code false}.
607 *
608 * @return {@code true} if will continue after shutdown
609 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
610 */
611 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
612 return continueExistingPeriodicTasksAfterShutdown;
613 }
614
615 /**
616 * Sets the policy on whether to execute existing delayed
617 * tasks even when this executor has been {@code shutdown}.
618 * In this case, these tasks will only terminate upon
619 * {@code shutdownNow}, or after setting the policy to
620 * {@code false} when already shutdown.
621 * This value is by default {@code true}.
622 *
623 * @param value if {@code true}, execute after shutdown, else don't.
624 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
625 */
626 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
627 executeExistingDelayedTasksAfterShutdown = value;
628 if (!value && isShutdown())
629 onShutdown();
630 }
631
632 /**
633 * Gets the policy on whether to execute existing delayed
634 * tasks even when this executor has been {@code shutdown}.
635 * In this case, these tasks will only terminate upon
636 * {@code shutdownNow}, or after setting the policy to
637 * {@code false} when already shutdown.
638 * This value is by default {@code true}.
639 *
640 * @return {@code true} if will execute after shutdown
641 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
642 */
643 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
644 return executeExistingDelayedTasksAfterShutdown;
645 }
646
647 /**
648 * Sets the policy on whether cancelled tasks should be immediately
649 * removed from the work queue at time of cancellation. This value is
650 * by default {@code false}.
651 *
652 * @param value if {@code true}, remove on cancellation, else don't
653 * @see #getRemoveOnCancelPolicy
654 * @since 1.7
655 */
656 public void setRemoveOnCancelPolicy(boolean value) {
657 removeOnCancel = value;
658 }
659
660 /**
661 * Gets the policy on whether cancelled tasks should be immediately
662 * removed from the work queue at time of cancellation. This value is
663 * by default {@code false}.
664 *
665 * @return {@code true} if cancelled tasks are immediately removed
666 * from the queue
667 * @see #setRemoveOnCancelPolicy
668 * @since 1.7
669 */
670 public boolean getRemoveOnCancelPolicy() {
671 return removeOnCancel;
672 }
673
674 /**
675 * Initiates an orderly shutdown in which previously submitted
676 * tasks are executed, but no new tasks will be accepted. If the
677 * {@code ExecuteExistingDelayedTasksAfterShutdownPolicy} has
678 * been set {@code false}, existing delayed tasks whose delays
679 * have not yet elapsed are cancelled. And unless the
680 * {@code ContinueExistingPeriodicTasksAfterShutdownPolicy} has
681 * been set {@code true}, future executions of existing periodic
682 * tasks will be cancelled.
683 *
684 * @throws SecurityException {@inheritDoc}
685 */
686 public void shutdown() {
687 super.shutdown();
688 }
689
690 /**
691 * Attempts to stop all actively executing tasks, halts the
692 * processing of waiting tasks, and returns a list of the tasks
693 * that were awaiting execution.
694 *
695 * <p>There are no guarantees beyond best-effort attempts to stop
696 * processing actively executing tasks. This implementation
697 * cancels tasks via {@link Thread#interrupt}, so any task that
698 * fails to respond to interrupts may never terminate.
699 *
700 * @return list of tasks that never commenced execution.
701 * Each element of this list is a {@link ScheduledFuture},
702 * including those tasks submitted using {@code execute},
703 * which are for scheduling purposes used as the basis of a
704 * zero-delay {@code ScheduledFuture}.
705 * @throws SecurityException {@inheritDoc}
706 */
707 public List<Runnable> shutdownNow() {
708 return super.shutdownNow();
709 }
710
711 /**
712 * Returns the task queue used by this executor. Each element of
713 * this queue is a {@link ScheduledFuture}, including those
714 * tasks submitted using {@code execute} which are for scheduling
715 * purposes used as the basis of a zero-delay
716 * {@code ScheduledFuture}. Iteration over this queue is
717 * <em>not</em> guaranteed to traverse tasks in the order in
718 * which they will execute.
719 *
720 * @return the task queue
721 */
722 public BlockingQueue<Runnable> getQueue() {
723 return super.getQueue();
724 }
725
726 /**
727 * Specialized delay queue. To mesh with TPE declarations, this
728 * class must be declared as a BlockingQueue<Runnable> even though
729 * it can only hold RunnableScheduledFutures.
730 */
731 static class DelayedWorkQueue extends AbstractQueue<Runnable>
732 implements BlockingQueue<Runnable> {
733
734 /*
735 * A DelayedWorkQueue is based on a heap-based data structure
736 * like those in DelayQueue and PriorityQueue, except that
737 * every ScheduledFutureTask also records its index into the
738 * heap array. This eliminates the need to find a task upon
739 * cancellation, greatly speeding up removal (down from O(n)
740 * to O(log n)), and reducing garbage retention that would
741 * otherwise occur by waiting for the element to rise to top
742 * before clearing. But because the queue may also hold
743 * RunnableScheduledFutures that are not ScheduledFutureTasks,
744 * we are not guaranteed to have such indices available, in
745 * which case we fall back to linear search. (We expect that
746 * most tasks will not be decorated, and that the faster cases
747 * will be much more common.)
748 *
749 * All heap operations must record index changes -- mainly
750 * within siftUp and siftDown. Upon removal, a task's
751 * heapIndex is set to -1. Note that ScheduledFutureTasks can
752 * appear at most once in the queue (this need not be true for
753 * other kinds of tasks or work queues), so are uniquely
754 * identified by heapIndex.
755 */
756
757 private static final int INITIAL_CAPACITY = 16;
758 private RunnableScheduledFuture[] queue =
759 new RunnableScheduledFuture[INITIAL_CAPACITY];
760 private final ReentrantLock lock = new ReentrantLock();
761 private int size = 0;
762
763 /**
764 * Thread designated to wait for the task at the head of the
765 * queue. This variant of the Leader-Follower pattern
766 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
767 * minimize unnecessary timed waiting. When a thread becomes
768 * the leader, it waits only for the next delay to elapse, but
769 * other threads await indefinitely. The leader thread must
770 * signal some other thread before returning from take() or
771 * poll(...), unless some other thread becomes leader in the
772 * interim. Whenever the head of the queue is replaced with a
773 * task with an earlier expiration time, the leader field is
774 * invalidated by being reset to null, and some waiting
775 * thread, but not necessarily the current leader, is
776 * signalled. So waiting threads must be prepared to acquire
777 * and lose leadership while waiting.
778 */
779 private Thread leader = null;
780
781 /**
782 * Condition signalled when a newer task becomes available at the
783 * head of the queue or a new thread may need to become leader.
784 */
785 private final Condition available = lock.newCondition();
786
787 /**
788 * Set f's heapIndex if it is a ScheduledFutureTask.
789 */
790 private void setIndex(RunnableScheduledFuture f, int idx) {
791 if (f instanceof ScheduledFutureTask)
792 ((ScheduledFutureTask)f).heapIndex = idx;
793 }
794
795 /**
796 * Sift element added at bottom up to its heap-ordered spot.
797 * Call only when holding lock.
798 */
799 private void siftUp(int k, RunnableScheduledFuture key) {
800 while (k > 0) {
801 int parent = (k - 1) >>> 1;
802 RunnableScheduledFuture e = queue[parent];
803 if (key.compareTo(e) >= 0)
804 break;
805 queue[k] = e;
806 setIndex(e, k);
807 k = parent;
808 }
809 queue[k] = key;
810 setIndex(key, k);
811 }
812
813 /**
814 * Sift element added at top down to its heap-ordered spot.
815 * Call only when holding lock.
816 */
817 private void siftDown(int k, RunnableScheduledFuture key) {
818 int half = size >>> 1;
819 while (k < half) {
820 int child = (k << 1) + 1;
821 RunnableScheduledFuture c = queue[child];
822 int right = child + 1;
823 if (right < size && c.compareTo(queue[right]) > 0)
824 c = queue[child = right];
825 if (key.compareTo(c) <= 0)
826 break;
827 queue[k] = c;
828 setIndex(c, k);
829 k = child;
830 }
831 queue[k] = key;
832 setIndex(key, k);
833 }
834
835 /**
836 * Resize the heap array. Call only when holding lock.
837 */
838 private void grow() {
839 int oldCapacity = queue.length;
840 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
841 if (newCapacity < 0) // overflow
842 newCapacity = Integer.MAX_VALUE;
843 queue = Arrays.copyOf(queue, newCapacity);
844 }
845
846 /**
847 * Find index of given object, or -1 if absent
848 */
849 private int indexOf(Object x) {
850 if (x != null) {
851 if (x instanceof ScheduledFutureTask) {
852 int i = ((ScheduledFutureTask) x).heapIndex;
853 // Sanity check; x could conceivably be a
854 // ScheduledFutureTask from some other pool.
855 if (i >= 0 && i < size && queue[i] == x)
856 return i;
857 } else {
858 for (int i = 0; i < size; i++)
859 if (x.equals(queue[i]))
860 return i;
861 }
862 }
863 return -1;
864 }
865
866 public boolean contains(Object x) {
867 final ReentrantLock lock = this.lock;
868 lock.lock();
869 try {
870 return indexOf(x) != -1;
871 } finally {
872 lock.unlock();
873 }
874 }
875
876 public boolean remove(Object x) {
877 final ReentrantLock lock = this.lock;
878 lock.lock();
879 try {
880 int i = indexOf(x);
881 if (i < 0)
882 return false;
883
884 setIndex(queue[i], -1);
885 int s = --size;
886 RunnableScheduledFuture replacement = queue[s];
887 queue[s] = null;
888 if (s != i) {
889 siftDown(i, replacement);
890 if (queue[i] == replacement)
891 siftUp(i, replacement);
892 }
893 return true;
894 } finally {
895 lock.unlock();
896 }
897 }
898
899 public int size() {
900 final ReentrantLock lock = this.lock;
901 lock.lock();
902 try {
903 return size;
904 } finally {
905 lock.unlock();
906 }
907 }
908
909 public boolean isEmpty() {
910 return size() == 0;
911 }
912
913 public int remainingCapacity() {
914 return Integer.MAX_VALUE;
915 }
916
917 public RunnableScheduledFuture peek() {
918 final ReentrantLock lock = this.lock;
919 lock.lock();
920 try {
921 return queue[0];
922 } finally {
923 lock.unlock();
924 }
925 }
926
927 public boolean offer(Runnable x) {
928 if (x == null)
929 throw new NullPointerException();
930 RunnableScheduledFuture e = (RunnableScheduledFuture)x;
931 final ReentrantLock lock = this.lock;
932 lock.lock();
933 try {
934 int i = size;
935 if (i >= queue.length)
936 grow();
937 size = i + 1;
938 if (i == 0) {
939 queue[0] = e;
940 setIndex(e, 0);
941 } else {
942 siftUp(i, e);
943 }
944 if (queue[0] == e) {
945 leader = null;
946 available.signal();
947 }
948 } finally {
949 lock.unlock();
950 }
951 return true;
952 }
953
954 public void put(Runnable e) {
955 offer(e);
956 }
957
958 public boolean add(Runnable e) {
959 return offer(e);
960 }
961
962 public boolean offer(Runnable e, long timeout, TimeUnit unit) {
963 return offer(e);
964 }
965
966 /**
967 * Performs common bookkeeping for poll and take: Replaces
968 * first element with last and sifts it down. Call only when
969 * holding lock.
970 * @param f the task to remove and return
971 */
972 private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
973 int s = --size;
974 RunnableScheduledFuture x = queue[s];
975 queue[s] = null;
976 if (s != 0)
977 siftDown(0, x);
978 setIndex(f, -1);
979 return f;
980 }
981
982 public RunnableScheduledFuture poll() {
983 final ReentrantLock lock = this.lock;
984 lock.lock();
985 try {
986 RunnableScheduledFuture first = queue[0];
987 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
988 return null;
989 else
990 return finishPoll(first);
991 } finally {
992 lock.unlock();
993 }
994 }
995
996 public RunnableScheduledFuture take() throws InterruptedException {
997 final ReentrantLock lock = this.lock;
998 lock.lockInterruptibly();
999 try {
1000 for (;;) {
1001 RunnableScheduledFuture first = queue[0];
1002 if (first == null)
1003 available.await();
1004 else {
1005 long delay = first.getDelay(TimeUnit.NANOSECONDS);
1006 if (delay <= 0)
1007 return finishPoll(first);
1008 else if (leader != null)
1009 available.await();
1010 else {
1011 Thread thisThread = Thread.currentThread();
1012 leader = thisThread;
1013 try {
1014 available.awaitNanos(delay);
1015 } finally {
1016 if (leader == thisThread)
1017 leader = null;
1018 }
1019 }
1020 }
1021 }
1022 } finally {
1023 if (leader == null && queue[0] != null)
1024 available.signal();
1025 lock.unlock();
1026 }
1027 }
1028
1029 public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
1030 throws InterruptedException {
1031 long nanos = unit.toNanos(timeout);
1032 final ReentrantLock lock = this.lock;
1033 lock.lockInterruptibly();
1034 try {
1035 for (;;) {
1036 RunnableScheduledFuture first = queue[0];
1037 if (first == null) {
1038 if (nanos <= 0)
1039 return null;
1040 else
1041 nanos = available.awaitNanos(nanos);
1042 } else {
1043 long delay = first.getDelay(TimeUnit.NANOSECONDS);
1044 if (delay <= 0)
1045 return finishPoll(first);
1046 if (nanos <= 0)
1047 return null;
1048 if (nanos < delay || leader != null)
1049 nanos = available.awaitNanos(nanos);
1050 else {
1051 Thread thisThread = Thread.currentThread();
1052 leader = thisThread;
1053 try {
1054 long timeLeft = available.awaitNanos(delay);
1055 nanos -= delay - timeLeft;
1056 } finally {
1057 if (leader == thisThread)
1058 leader = null;
1059 }
1060 }
1061 }
1062 }
1063 } finally {
1064 if (leader == null && queue[0] != null)
1065 available.signal();
1066 lock.unlock();
1067 }
1068 }
1069
1070 public void clear() {
1071 final ReentrantLock lock = this.lock;
1072 lock.lock();
1073 try {
1074 for (int i = 0; i < size; i++) {
1075 RunnableScheduledFuture t = queue[i];
1076 if (t != null) {
1077 queue[i] = null;
1078 setIndex(t, -1);
1079 }
1080 }
1081 size = 0;
1082 } finally {
1083 lock.unlock();
1084 }
1085 }
1086
1087 /**
1088 * Return and remove first element only if it is expired.
1089 * Used only by drainTo. Call only when holding lock.
1090 */
1091 private RunnableScheduledFuture pollExpired() {
1092 RunnableScheduledFuture first = queue[0];
1093 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
1094 return null;
1095 return finishPoll(first);
1096 }
1097
1098 public int drainTo(Collection<? super Runnable> c) {
1099 if (c == null)
1100 throw new NullPointerException();
1101 if (c == this)
1102 throw new IllegalArgumentException();
1103 final ReentrantLock lock = this.lock;
1104 lock.lock();
1105 try {
1106 RunnableScheduledFuture first;
1107 int n = 0;
1108 while ((first = pollExpired()) != null) {
1109 c.add(first);
1110 ++n;
1111 }
1112 return n;
1113 } finally {
1114 lock.unlock();
1115 }
1116 }
1117
1118 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1119 if (c == null)
1120 throw new NullPointerException();
1121 if (c == this)
1122 throw new IllegalArgumentException();
1123 if (maxElements <= 0)
1124 return 0;
1125 final ReentrantLock lock = this.lock;
1126 lock.lock();
1127 try {
1128 RunnableScheduledFuture first;
1129 int n = 0;
1130 while (n < maxElements && (first = pollExpired()) != null) {
1131 c.add(first);
1132 ++n;
1133 }
1134 return n;
1135 } finally {
1136 lock.unlock();
1137 }
1138 }
1139
1140 public Object[] toArray() {
1141 final ReentrantLock lock = this.lock;
1142 lock.lock();
1143 try {
1144 return Arrays.copyOf(queue, size, Object[].class);
1145 } finally {
1146 lock.unlock();
1147 }
1148 }
1149
1150 @SuppressWarnings("unchecked")
1151 public <T> T[] toArray(T[] a) {
1152 final ReentrantLock lock = this.lock;
1153 lock.lock();
1154 try {
1155 if (a.length < size)
1156 return (T[]) Arrays.copyOf(queue, size, a.getClass());
1157 System.arraycopy(queue, 0, a, 0, size);
1158 if (a.length > size)
1159 a[size] = null;
1160 return a;
1161 } finally {
1162 lock.unlock();
1163 }
1164 }
1165
1166 public Iterator<Runnable> iterator() {
1167 return new Itr(Arrays.copyOf(queue, size));
1168 }
1169
1170 /**
1171 * Snapshot iterator that works off copy of underlying q array.
1172 */
1173 private class Itr implements Iterator<Runnable> {
1174 final RunnableScheduledFuture[] array;
1175 int cursor = 0; // index of next element to return
1176 int lastRet = -1; // index of last element, or -1 if no such
1177
1178 Itr(RunnableScheduledFuture[] array) {
1179 this.array = array;
1180 }
1181
1182 public boolean hasNext() {
1183 return cursor < array.length;
1184 }
1185
1186 public Runnable next() {
1187 if (cursor >= array.length)
1188 throw new NoSuchElementException();
1189 lastRet = cursor;
1190 return array[cursor++];
1191 }
1192
1193 public void remove() {
1194 if (lastRet < 0)
1195 throw new IllegalStateException();
1196 DelayedWorkQueue.this.remove(array[lastRet]);
1197 lastRet = -1;
1198 }
1199 }
1200 }
1201 }