ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.45
Committed: Tue Sep 18 01:04:35 2007 UTC (16 years, 8 months ago) by jsr166
Branch: MAIN
Changes since 1.44: +58 -62 lines
Log Message:
6602600: Fast removal of cancelled scheduled thread pool tasks

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13 * A {@link ThreadPoolExecutor} that can additionally schedule
14 * commands to run after a given delay, or to execute
15 * periodically. This class is preferable to {@link java.util.Timer}
16 * when multiple worker threads are needed, or when the additional
17 * flexibility or capabilities of {@link ThreadPoolExecutor} (which
18 * this class extends) are required.
19 *
20 * <p> Delayed tasks execute no sooner than they are enabled, but
21 * without any real-time guarantees about when, after they are
22 * enabled, they will commence. Tasks scheduled for exactly the same
23 * execution time are enabled in first-in-first-out (FIFO) order of
24 * submission. If {@link #setRemoveOnCancelPolicy} is set {@code true},
25 * cancelled tasks are automatically removed from the work queue.
26 *
27 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
28 * of the inherited tuning methods are not useful for it. In
29 * particular, because it acts as a fixed-sized pool using
30 * {@code corePoolSize} threads and an unbounded queue, adjustments
31 * to {@code maximumPoolSize} have no useful effect. Additionally, it
32 * is almost never a good idea to set {@code corePoolSize} to zero or
33 * use {@code allowCoreThreadTimeOut} because this may leave the pool
34 * without threads to handle tasks once they become eligible to run.
35 *
36 * <p><b>Extension notes:</b> This class overrides the
37 * {@link ThreadPoolExecutor#execute execute} and
38 * {@link AbstractExecutorService#submit(Runnable) submit}
39 * methods to generate internal {@link ScheduledFuture} objects to
40 * control per-task delays and scheduling. To preserve
41 * functionality, any further overrides of these methods in
42 * subclasses must invoke superclass versions, which effectively
43 * disables additional task customization. However, this class
44 * provides an alternative protected extension method,
45 * {@code decorateTask} (one version each for {@code Runnable} and
46 * {@code Callable}) that can be used to customize the concrete task
47 * types used to execute commands entered via {@code execute},
48 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
49 * and {@code scheduleWithFixedDelay}. By default, a
50 * {@code ScheduledThreadPoolExecutor} uses a task type extending
51 * {@link FutureTask}. However, this may be modified or replaced using
52 * subclasses of the form:
53 *
54 * <pre> {@code
55 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
56 *
57 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
58 *
59 * protected <V> RunnableScheduledFuture<V> decorateTask(
60 * Runnable r, RunnableScheduledFuture<V> task) {
61 * return new CustomTask<V>(r, task);
62 * }
63 *
64 * protected <V> RunnableScheduledFuture<V> decorateTask(
65 * Callable<V> c, RunnableScheduledFuture<V> task) {
66 * return new CustomTask<V>(c, task);
67 * }
68 * // ... add constructors, etc.
69 * }}</pre>
70 *
71 * @since 1.5
72 * @author Doug Lea
73 */
74 public class ScheduledThreadPoolExecutor
75 extends ThreadPoolExecutor
76 implements ScheduledExecutorService {
77
78 /*
79 * This class specializes ThreadPoolExecutor implementation by
80 *
81 * 1. Using a custom task type, ScheduledFutureTask for
82 * tasks, even those that don't require scheduling (i.e.,
83 * those submitted using ExecutorService execute, not
84 * ScheduledExecutorService methods) which are treated as
85 * delayed tasks with a delay of zero.
86 *
87 * 2. Using a custom queue (DelayedWorkQueue), a heap-based
88 * variant of an unbounded DelayQueue. The lack of capacity constraint and
89 * the fact that corePoolSize and maximumPoolSize are
90 * effectively identical simplifies some execution mechanics
91 * (see delayedExecute) compared to ThreadPoolExecutor
92 * version.
93 *
94 * The DelayedWorkQueue class is defined below for the sake of
95 * ensuring that all elements are instances of
96 * RunnableScheduledFuture. Since DelayQueue otherwise
97 * requires type be Delayed, but not necessarily Runnable, and
98 * the workQueue requires the opposite, we need to explicitly
99 * define a class that requires both to ensure that users don't
100 * add objects that aren't RunnableScheduledFutures via
101 * getQueue().add() etc.
102 *
103 * 3. Supporting optional run-after-shutdown parameters, which
104 * leads to overrides of shutdown methods to remove and cancel
105 * tasks that should NOT be run after shutdown, as well as
106 * different recheck logic when task (re)submission overlaps
107 * with a shutdown.
108 *
109 * 4. Task decoration methods to allow interception and
110 * instrumentation, which are needed because subclasses cannot
111 * otherwise override submit methods to get this effect. These
112 * don't have any impact on pool control logic though.
113 */
114
115 /**
116 * False if should cancel/suppress periodic tasks on shutdown.
117 */
118 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
119
120 /**
121 * False if should cancel non-periodic tasks on shutdown.
122 */
123 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
124
125 /**
126 * True if ScheduledFutureTask.cancel should remove from queue
127 */
128 private volatile boolean removeOnCancel = false;
129
130 /**
131 * Sequence number to break scheduling ties, and in turn to
132 * guarantee FIFO order among tied entries.
133 */
134 private static final AtomicLong sequencer = new AtomicLong(0);
135
136 /**
137 * Returns current nanosecond time.
138 */
139 final long now() {
140 return System.nanoTime();
141 }
142
143 private class ScheduledFutureTask<V>
144 extends FutureTask<V> implements RunnableScheduledFuture<V> {
145
146 /** Sequence number to break ties FIFO */
147 private final long sequenceNumber;
148
149 /** The time the task is enabled to execute in nanoTime units */
150 private long time;
151
152 /**
153 * Period in nanoseconds for repeating tasks. A positive
154 * value indicates fixed-rate execution. A negative value
155 * indicates fixed-delay execution. A value of 0 indicates a
156 * non-repeating task.
157 */
158 private final long period;
159
160 /** The actual task to be re-enqueued by reExecutePeriodic */
161 RunnableScheduledFuture<V> outerTask = this;
162
163 /**
164 * Index into delay queue, to support faster cancellation.
165 */
166 int heapIndex;
167
168 /**
169 * Creates a one-shot action with given nanoTime-based trigger time.
170 */
171 ScheduledFutureTask(Runnable r, V result, long ns) {
172 super(r, result);
173 this.time = ns;
174 this.period = 0;
175 this.sequenceNumber = sequencer.getAndIncrement();
176 }
177
178 /**
179 * Creates a periodic action with given nano time and period.
180 */
181 ScheduledFutureTask(Runnable r, V result, long ns, long period) {
182 super(r, result);
183 this.time = ns;
184 this.period = period;
185 this.sequenceNumber = sequencer.getAndIncrement();
186 }
187
188 /**
189 * Creates a one-shot action with given nanoTime-based trigger.
190 */
191 ScheduledFutureTask(Callable<V> callable, long ns) {
192 super(callable);
193 this.time = ns;
194 this.period = 0;
195 this.sequenceNumber = sequencer.getAndIncrement();
196 }
197
198 public long getDelay(TimeUnit unit) {
199 long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);
200 return d;
201 }
202
203 public int compareTo(Delayed other) {
204 if (other == this) // compare zero ONLY if same object
205 return 0;
206 if (other instanceof ScheduledFutureTask) {
207 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
208 long diff = time - x.time;
209 if (diff < 0)
210 return -1;
211 else if (diff > 0)
212 return 1;
213 else if (sequenceNumber < x.sequenceNumber)
214 return -1;
215 else
216 return 1;
217 }
218 long d = (getDelay(TimeUnit.NANOSECONDS) -
219 other.getDelay(TimeUnit.NANOSECONDS));
220 return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
221 }
222
223 /**
224 * Returns true if this is a periodic (not a one-shot) action.
225 *
226 * @return true if periodic
227 */
228 public boolean isPeriodic() {
229 return period != 0;
230 }
231
232 /**
233 * Sets the next time to run for a periodic task.
234 */
235 private void setNextRunTime() {
236 long p = period;
237 if (p > 0)
238 time += p;
239 else
240 time = now() - p;
241 }
242
243 public boolean cancel(boolean mayInterruptIfRunning) {
244 boolean cancelled = super.cancel(mayInterruptIfRunning);
245 if (cancelled && removeOnCancel && heapIndex >= 0)
246 remove(this);
247 return cancelled;
248 }
249
250 /**
251 * Overrides FutureTask version so as to reset/requeue if periodic.
252 */
253 public void run() {
254 boolean periodic = isPeriodic();
255 if (!canRunInCurrentRunState(periodic))
256 cancel(false);
257 else if (!periodic)
258 ScheduledFutureTask.super.run();
259 else if (ScheduledFutureTask.super.runAndReset()) {
260 setNextRunTime();
261 reExecutePeriodic(outerTask);
262 }
263 }
264 }
265
266 /**
267 * Returns true if can run a task given current run state
268 * and run-after-shutdown parameters.
269 *
270 * @param periodic true if this task periodic, false if delayed
271 */
272 boolean canRunInCurrentRunState(boolean periodic) {
273 return isRunningOrShutdown(periodic ?
274 continueExistingPeriodicTasksAfterShutdown :
275 executeExistingDelayedTasksAfterShutdown);
276 }
277
278 /**
279 * Main execution method for delayed or periodic tasks. If pool
280 * is shut down, rejects the task. Otherwise adds task to queue
281 * and starts a thread, if necessary, to run it. (We cannot
282 * prestart the thread to run the task because the task (probably)
283 * shouldn't be run yet,) If the pool is shut down while the task
284 * is being added, cancel and remove it if required by state and
285 * run-after-shutdown parameters.
286 *
287 * @param task the task
288 */
289 private void delayedExecute(RunnableScheduledFuture<?> task) {
290 if (isShutdown())
291 reject(task);
292 else {
293 super.getQueue().add(task);
294 if (isShutdown() &&
295 !canRunInCurrentRunState(task.isPeriodic()) &&
296 remove(task))
297 task.cancel(false);
298 else
299 prestartCoreThread();
300 }
301 }
302
303 /**
304 * Requeues a periodic task unless current run state precludes it.
305 * Same idea as delayedExecute except drops task rather than rejecting.
306 *
307 * @param task the task
308 */
309 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
310 if (canRunInCurrentRunState(true)) {
311 super.getQueue().add(task);
312 if (!canRunInCurrentRunState(true) && remove(task))
313 task.cancel(false);
314 else
315 prestartCoreThread();
316 }
317 }
318
319 /**
320 * Cancels and clears the queue of all tasks that should not be run
321 * due to shutdown policy. Invoked within super.shutdown.
322 */
323 @Override void onShutdown() {
324 BlockingQueue<Runnable> q = super.getQueue();
325 boolean keepDelayed =
326 getExecuteExistingDelayedTasksAfterShutdownPolicy();
327 boolean keepPeriodic =
328 getContinueExistingPeriodicTasksAfterShutdownPolicy();
329 if (!keepDelayed && !keepPeriodic)
330 q.clear();
331 else {
332 // Traverse snapshot to avoid iterator exceptions
333 for (Object e : q.toArray()) {
334 if (e instanceof RunnableScheduledFuture) {
335 RunnableScheduledFuture<?> t =
336 (RunnableScheduledFuture<?>)e;
337 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
338 t.isCancelled()) { // also remove if already cancelled
339 if (q.remove(t))
340 t.cancel(false);
341 }
342 }
343 }
344 }
345 tryTerminate();
346 }
347
348 /**
349 * Modifies or replaces the task used to execute a runnable.
350 * This method can be used to override the concrete
351 * class used for managing internal tasks.
352 * The default implementation simply returns the given task.
353 *
354 * @param runnable the submitted Runnable
355 * @param task the task created to execute the runnable
356 * @return a task that can execute the runnable
357 * @since 1.6
358 */
359 protected <V> RunnableScheduledFuture<V> decorateTask(
360 Runnable runnable, RunnableScheduledFuture<V> task) {
361 return task;
362 }
363
364 /**
365 * Modifies or replaces the task used to execute a callable.
366 * This method can be used to override the concrete
367 * class used for managing internal tasks.
368 * The default implementation simply returns the given task.
369 *
370 * @param callable the submitted Callable
371 * @param task the task created to execute the callable
372 * @return a task that can execute the callable
373 * @since 1.6
374 */
375 protected <V> RunnableScheduledFuture<V> decorateTask(
376 Callable<V> callable, RunnableScheduledFuture<V> task) {
377 return task;
378 }
379
/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the given
 * core pool size, backed by a fresh {@code DelayedWorkQueue}.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue());
}
392
/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the given
 * core pool size and thread factory, backed by a fresh
 * {@code DelayedWorkQueue}.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code threadFactory} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          threadFactory);
}
409
/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the given
 * core pool size and rejection handler, backed by a fresh
 * {@code DelayedWorkQueue}.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code handler} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          handler);
}
426
/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the given
 * core pool size, thread factory and rejection handler, backed by a
 * fresh {@code DelayedWorkQueue}.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code threadFactory} or
 *         {@code handler} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          threadFactory,
          handler);
}
447
448 /**
449 * @throws RejectedExecutionException {@inheritDoc}
450 * @throws NullPointerException {@inheritDoc}
451 */
452 public ScheduledFuture<?> schedule(Runnable command,
453 long delay,
454 TimeUnit unit) {
455 if (command == null || unit == null)
456 throw new NullPointerException();
457 if (delay < 0) delay = 0;
458 long triggerTime = now() + unit.toNanos(delay);
459 RunnableScheduledFuture<?> t = decorateTask(command,
460 new ScheduledFutureTask<Void>(command, null, triggerTime));
461 delayedExecute(t);
462 return t;
463 }
464
465 /**
466 * @throws RejectedExecutionException {@inheritDoc}
467 * @throws NullPointerException {@inheritDoc}
468 */
469 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
470 long delay,
471 TimeUnit unit) {
472 if (callable == null || unit == null)
473 throw new NullPointerException();
474 if (delay < 0) delay = 0;
475 long triggerTime = now() + unit.toNanos(delay);
476 RunnableScheduledFuture<V> t = decorateTask(callable,
477 new ScheduledFutureTask<V>(callable, triggerTime));
478 delayedExecute(t);
479 return t;
480 }
481
482 /**
483 * @throws RejectedExecutionException {@inheritDoc}
484 * @throws NullPointerException {@inheritDoc}
485 * @throws IllegalArgumentException {@inheritDoc}
486 */
487 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
488 long initialDelay,
489 long period,
490 TimeUnit unit) {
491 if (command == null || unit == null)
492 throw new NullPointerException();
493 if (period <= 0)
494 throw new IllegalArgumentException();
495 if (initialDelay < 0) initialDelay = 0;
496 long triggerTime = now() + unit.toNanos(initialDelay);
497 ScheduledFutureTask<Void> sft =
498 new ScheduledFutureTask<Void>(command,
499 null,
500 triggerTime,
501 unit.toNanos(period));
502 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
503 sft.outerTask = t;
504 delayedExecute(t);
505 return t;
506 }
507
508 /**
509 * @throws RejectedExecutionException {@inheritDoc}
510 * @throws NullPointerException {@inheritDoc}
511 * @throws IllegalArgumentException {@inheritDoc}
512 */
513 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
514 long initialDelay,
515 long delay,
516 TimeUnit unit) {
517 if (command == null || unit == null)
518 throw new NullPointerException();
519 if (delay <= 0)
520 throw new IllegalArgumentException();
521 if (initialDelay < 0) initialDelay = 0;
522 long triggerTime = now() + unit.toNanos(initialDelay);
523 ScheduledFutureTask<Void> sft =
524 new ScheduledFutureTask<Void>(command,
525 null,
526 triggerTime,
527 unit.toNanos(-delay));
528 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
529 sft.outerTask = t;
530 delayedExecute(t);
531 return t;
532 }
533
534 /**
535 * Executes {@code command} with zero required delay.
536 * This has effect equivalent to
537 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
538 * Note that inspections of the queue and of the list returned by
539 * {@code shutdownNow} will access the zero-delayed
540 * {@link ScheduledFuture}, not the {@code command} itself.
541 *
542 * <p>A consequence of the use of {@code ScheduledFuture} objects is
543 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
544 * called with a null second {@code Throwable} argument, even if the
545 * {@code command} terminated abruptly. Instead, the {@code Throwable}
546 * thrown by such a task can be obtained via {@link Future#get}.
547 *
548 * @throws RejectedExecutionException at discretion of
549 * {@code RejectedExecutionHandler}, if the task
550 * cannot be accepted for execution because the
551 * executor has been shut down
552 * @throws NullPointerException {@inheritDoc}
553 */
554 public void execute(Runnable command) {
555 schedule(command, 0, TimeUnit.NANOSECONDS);
556 }
557
558 // Override AbstractExecutorService methods
559
560 /**
561 * @throws RejectedExecutionException {@inheritDoc}
562 * @throws NullPointerException {@inheritDoc}
563 */
564 public Future<?> submit(Runnable task) {
565 return schedule(task, 0, TimeUnit.NANOSECONDS);
566 }
567
568 /**
569 * @throws RejectedExecutionException {@inheritDoc}
570 * @throws NullPointerException {@inheritDoc}
571 */
572 public <T> Future<T> submit(Runnable task, T result) {
573 return schedule(Executors.callable(task, result),
574 0, TimeUnit.NANOSECONDS);
575 }
576
577 /**
578 * @throws RejectedExecutionException {@inheritDoc}
579 * @throws NullPointerException {@inheritDoc}
580 */
581 public <T> Future<T> submit(Callable<T> task) {
582 return schedule(task, 0, TimeUnit.NANOSECONDS);
583 }
584
585 /**
586 * Sets the policy on whether to continue executing existing
587 * periodic tasks even when this executor has been {@code shutdown}.
588 * In this case, these tasks will only terminate upon
589 * {@code shutdownNow} or after setting the policy to
590 * {@code false} when already shutdown.
591 * This value is by default {@code false}.
592 *
593 * @param value if {@code true}, continue after shutdown, else don't.
594 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
595 */
596 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
597 continueExistingPeriodicTasksAfterShutdown = value;
598 if (!value && isShutdown())
599 onShutdown();
600 }
601
602 /**
603 * Gets the policy on whether to continue executing existing
604 * periodic tasks even when this executor has been {@code shutdown}.
605 * In this case, these tasks will only terminate upon
606 * {@code shutdownNow} or after setting the policy to
607 * {@code false} when already shutdown.
608 * This value is by default {@code false}.
609 *
610 * @return {@code true} if will continue after shutdown
611 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
612 */
613 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
614 return continueExistingPeriodicTasksAfterShutdown;
615 }
616
617 /**
618 * Sets the policy on whether to execute existing delayed
619 * tasks even when this executor has been {@code shutdown}.
620 * In this case, these tasks will only terminate upon
621 * {@code shutdownNow}, or after setting the policy to
622 * {@code false} when already shutdown.
623 * This value is by default {@code true}.
624 *
625 * @param value if {@code true}, execute after shutdown, else don't.
626 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
627 */
628 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
629 executeExistingDelayedTasksAfterShutdown = value;
630 if (!value && isShutdown())
631 onShutdown();
632 }
633
634 /**
635 * Gets the policy on whether to execute existing delayed
636 * tasks even when this executor has been {@code shutdown}.
637 * In this case, these tasks will only terminate upon
638 * {@code shutdownNow}, or after setting the policy to
639 * {@code false} when already shutdown.
640 * This value is by default {@code true}.
641 *
642 * @return {@code true} if will execute after shutdown
643 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
644 */
645 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
646 return executeExistingDelayedTasksAfterShutdown;
647 }
648
649 /**
650 * Sets the policy on whether cancellation of a task should remove
651 * it from the work queue. This value is by default {@code false}.
652 *
653 * @param value if {@code true}, remove on cancellation, else don't
654 * @see #getRemoveOnCancelPolicy
655 * @since 1.7
656 */
657 public void setRemoveOnCancelPolicy(boolean value) {
658 removeOnCancel = value;
659 }
660
661 /**
662 * Gets the policy on whether cancellation of a task should remove
663 * it from the work queue. This value is by default {@code false}.
664 *
665 * @return {@code true} if cancelled tasks are removed from the queue
666 * @see #setRemoveOnCancelPolicy
667 * @since 1.7
668 */
669 public boolean getRemoveOnCancelPolicy() {
670 return removeOnCancel;
671 }
672
673 /**
674 * Initiates an orderly shutdown in which previously submitted
675 * tasks are executed, but no new tasks will be accepted. If the
676 * {@code ExecuteExistingDelayedTasksAfterShutdownPolicy} has
677 * been set {@code false}, existing delayed tasks whose delays
678 * have not yet elapsed are cancelled. And unless the
679 * {@code ContinueExistingPeriodicTasksAfterShutdownPolicy} has
680 * been set {@code true}, future executions of existing periodic
681 * tasks will be cancelled.
682 *
683 * @throws SecurityException {@inheritDoc}
684 */
685 public void shutdown() {
686 super.shutdown();
687 }
688
689 /**
690 * Attempts to stop all actively executing tasks, halts the
691 * processing of waiting tasks, and returns a list of the tasks
692 * that were awaiting execution.
693 *
694 * <p>There are no guarantees beyond best-effort attempts to stop
695 * processing actively executing tasks. This implementation
696 * cancels tasks via {@link Thread#interrupt}, so any task that
697 * fails to respond to interrupts may never terminate.
698 *
699 * @return list of tasks that never commenced execution.
700 * Each element of this list is a {@link ScheduledFuture},
701 * including those tasks submitted using {@code execute},
702 * which are for scheduling purposes used as the basis of a
703 * zero-delay {@code ScheduledFuture}.
704 * @throws SecurityException {@inheritDoc}
705 */
706 public List<Runnable> shutdownNow() {
707 return super.shutdownNow();
708 }
709
710 /**
711 * Returns the task queue used by this executor. Each element of
712 * this queue is a {@link ScheduledFuture}, including those
713 * tasks submitted using {@code execute} which are for scheduling
714 * purposes used as the basis of a zero-delay
715 * {@code ScheduledFuture}. Iteration over this queue is
716 * <em>not</em> guaranteed to traverse tasks in the order in
717 * which they will execute.
718 *
719 * @return the task queue
720 */
721 public BlockingQueue<Runnable> getQueue() {
722 return super.getQueue();
723 }
724
725 /**
726 * Specialized delay queue. To mesh with TPE declarations, this
727 * class must be declared as a BlockingQueue<Runnable> even though
728 * it can only hold RunnableScheduledFutures.
729 */
730 static class DelayedWorkQueue extends AbstractQueue<Runnable>
731 implements BlockingQueue<Runnable> {
732
733 /*
734 * A DelayedWorkQueue is based on a heap-based data structure
735 * like those in DelayQueue and PriorityQueue, except that
736 * every ScheduledFutureTask also records its index into the
737 * heap array. This eliminates the need to find a task upon
738 * cancellation, greatly speeding up removal (down from O(n)
739 * to O(log n)), and reducing garbage retention that would
740 * otherwise occur by waiting for the element to rise to top
741 * before clearing. But because the queue may also hold
742 * RunnableScheduledFutures that are not ScheduledFutureTasks,
743 * we are not guaranteed to have such indices available, in
744 * which case we fall back to linear search. (We expect that
745 * most tasks will not be decorated, and that the faster cases
746 * will be much more common.)
747 *
748 * All heap operations must record index changes -- mainly
749 * within siftUp and siftDown. Upon removal, a task's
750 * heapIndex is set to -1. Note that ScheduledFutureTasks can
751 * appear at most once in the queue (this need not be true for
752 * other kinds of tasks or work queues), so are uniquely
753 * identified by heapIndex.
754 */
755
756 private static final int INITIAL_CAPACITY = 64;
757 private transient RunnableScheduledFuture[] queue =
758 new RunnableScheduledFuture[INITIAL_CAPACITY];
759 private transient final ReentrantLock lock = new ReentrantLock();
760 private transient final Condition available = lock.newCondition();
761 private int size = 0;
762
763
764 /**
765 * Set f's heapIndex if it is a ScheduledFutureTask.
766 */
767 private void setIndex(RunnableScheduledFuture f, int idx) {
768 if (f instanceof ScheduledFutureTask)
769 ((ScheduledFutureTask)f).heapIndex = idx;
770 }
771
772 /**
773 * Sift element added at bottom up to its heap-ordered spot.
774 * Call only when holding lock.
775 */
776 private void siftUp(int k, RunnableScheduledFuture key) {
777 while (k > 0) {
778 int parent = (k - 1) >>> 1;
779 RunnableScheduledFuture e = queue[parent];
780 if (key.compareTo(e) >= 0)
781 break;
782 queue[k] = e;
783 setIndex(e, k);
784 k = parent;
785 }
786 queue[k] = key;
787 setIndex(key, k);
788 }
789
790 /**
791 * Sift element added at top down to its heap-ordered spot.
792 * Call only when holding lock.
793 */
794 private void siftDown(int k, RunnableScheduledFuture key) {
795 int half = size >>> 1;
796 while (k < half) {
797 int child = (k << 1) + 1;
798 RunnableScheduledFuture c = queue[child];
799 int right = child + 1;
800 if (right < size && c.compareTo(queue[right]) > 0)
801 c = queue[child = right];
802 if (key.compareTo(c) <= 0)
803 break;
804 queue[k] = c;
805 setIndex(c, k);
806 k = child;
807 }
808 queue[k] = key;
809 setIndex(key, k);
810 }
811
812 /**
813 * Performs common bookkeeping for poll and take: Replaces
814 * first element with last; sifts it down, and signals any
815 * waiting consumers. Call only when holding lock.
816 * @param f the task to remove and return
817 */
818 private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
819 int s = --size;
820 RunnableScheduledFuture x = queue[s];
821 queue[s] = null;
822 if (s != 0) {
823 siftDown(0, x);
824 available.signalAll();
825 }
826 setIndex(f, -1);
827 return f;
828 }
829
830 /**
831 * Resize the heap array. Call only when holding lock.
832 */
833 private void grow() {
834 int oldCapacity = queue.length;
835 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
836 if (newCapacity < 0) // overflow
837 newCapacity = Integer.MAX_VALUE;
838 queue = Arrays.copyOf(queue, newCapacity);
839 }
840
841 /**
842 * Find index of given object, or -1 if absent
843 */
844 private int indexOf(Object x) {
845 if (x != null) {
846 if (x instanceof ScheduledFutureTask) {
847 int i = ((ScheduledFutureTask) x).heapIndex;
848 // Sanity check; x could conceivably be a
849 // ScheduledFutureTask from some other pool.
850 if (i >= 0 && i < size && queue[i] == x)
851 return i;
852 } else {
853 for (int i = 0; i < size; i++)
854 if (x.equals(queue[i]))
855 return i;
856 }
857 }
858 return -1;
859 }
860
861 public boolean contains(Object x) {
862 final ReentrantLock lock = this.lock;
863 lock.lock();
864 try {
865 return indexOf(x) != -1;
866 } finally {
867 lock.unlock();
868 }
869 }
870
871 public boolean remove(Object x) {
872 final ReentrantLock lock = this.lock;
873 lock.lock();
874 try {
875 int i = indexOf(x);
876 if (i < 0)
877 return false;
878
879 setIndex(queue[i], -1);
880 int s = --size;
881 RunnableScheduledFuture replacement = queue[s];
882 queue[s] = null;
883 if (s != i) {
884 siftDown(i, replacement);
885 if (queue[i] == replacement)
886 siftUp(i, replacement);
887 }
888 return true;
889 } finally {
890 lock.unlock();
891 }
892 }
893
894 public int size() {
895 final ReentrantLock lock = this.lock;
896 lock.lock();
897 try {
898 return size;
899 } finally {
900 lock.unlock();
901 }
902 }
903
904 public boolean isEmpty() {
905 return size() == 0;
906 }
907
908 public int remainingCapacity() {
909 return Integer.MAX_VALUE;
910 }
911
912 public RunnableScheduledFuture peek() {
913 final ReentrantLock lock = this.lock;
914 lock.lock();
915 try {
916 return queue[0];
917 } finally {
918 lock.unlock();
919 }
920 }
921
922 public boolean offer(Runnable x) {
923 if (x == null)
924 throw new NullPointerException();
925 RunnableScheduledFuture e = (RunnableScheduledFuture)x;
926 final ReentrantLock lock = this.lock;
927 lock.lock();
928 try {
929 int i = size;
930 if (i >= queue.length)
931 grow();
932 size = i + 1;
933 if (i == 0) {
934 queue[0] = e;
935 setIndex(e, 0);
936 } else {
937 siftUp(i, e);
938 }
939 if (queue[0] == e)
940 available.signalAll();
941 } finally {
942 lock.unlock();
943 }
944 return true;
945 }
946
947 public void put(Runnable e) {
948 offer(e);
949 }
950
951 public boolean add(Runnable e) {
952 return offer(e);
953 }
954
955 public boolean offer(Runnable e, long timeout, TimeUnit unit) {
956 return offer(e);
957 }
958
959 public RunnableScheduledFuture poll() {
960 final ReentrantLock lock = this.lock;
961 lock.lock();
962 try {
963 RunnableScheduledFuture first = queue[0];
964 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
965 return null;
966 else
967 return finishPoll(first);
968 } finally {
969 lock.unlock();
970 }
971 }
972
973 public RunnableScheduledFuture take() throws InterruptedException {
974 final ReentrantLock lock = this.lock;
975 lock.lockInterruptibly();
976 try {
977 for (;;) {
978 RunnableScheduledFuture first = queue[0];
979 if (first == null)
980 available.await();
981 else {
982 long delay = first.getDelay(TimeUnit.NANOSECONDS);
983 if (delay > 0)
984 available.awaitNanos(delay);
985 else
986 return finishPoll(first);
987 }
988 }
989 } finally {
990 lock.unlock();
991 }
992 }
993
994 public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
995 throws InterruptedException {
996 long nanos = unit.toNanos(timeout);
997 final ReentrantLock lock = this.lock;
998 lock.lockInterruptibly();
999 try {
1000 for (;;) {
1001 RunnableScheduledFuture first = queue[0];
1002 if (first == null) {
1003 if (nanos <= 0)
1004 return null;
1005 else
1006 nanos = available.awaitNanos(nanos);
1007 } else {
1008 long delay = first.getDelay(TimeUnit.NANOSECONDS);
1009 if (delay > 0) {
1010 if (nanos <= 0)
1011 return null;
1012 if (delay > nanos)
1013 delay = nanos;
1014 long timeLeft = available.awaitNanos(delay);
1015 nanos -= delay - timeLeft;
1016 } else
1017 return finishPoll(first);
1018 }
1019 }
1020 } finally {
1021 lock.unlock();
1022 }
1023 }
1024
1025 public void clear() {
1026 final ReentrantLock lock = this.lock;
1027 lock.lock();
1028 try {
1029 for (int i = 0; i < size; i++) {
1030 RunnableScheduledFuture t = queue[i];
1031 if (t != null) {
1032 queue[i] = null;
1033 setIndex(t, -1);
1034 }
1035 }
1036 size = 0;
1037 } finally {
1038 lock.unlock();
1039 }
1040 }
1041
1042 /**
1043 * Return and remove first element only if it is expired.
1044 * Used only by drainTo. Call only when holding lock.
1045 */
1046 private RunnableScheduledFuture pollExpired() {
1047 RunnableScheduledFuture first = queue[0];
1048 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
1049 return null;
1050 setIndex(first, -1);
1051 int s = --size;
1052 RunnableScheduledFuture x = queue[s];
1053 queue[s] = null;
1054 if (s != 0)
1055 siftDown(0, x);
1056 return first;
1057 }
1058
1059 public int drainTo(Collection<? super Runnable> c) {
1060 if (c == null)
1061 throw new NullPointerException();
1062 if (c == this)
1063 throw new IllegalArgumentException();
1064 final ReentrantLock lock = this.lock;
1065 lock.lock();
1066 try {
1067 RunnableScheduledFuture first;
1068 int n = 0;
1069 while ((first = pollExpired()) != null) {
1070 c.add(first);
1071 ++n;
1072 }
1073 return n;
1074 } finally {
1075 lock.unlock();
1076 }
1077 }
1078
1079 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1080 if (c == null)
1081 throw new NullPointerException();
1082 if (c == this)
1083 throw new IllegalArgumentException();
1084 if (maxElements <= 0)
1085 return 0;
1086 final ReentrantLock lock = this.lock;
1087 lock.lock();
1088 try {
1089 RunnableScheduledFuture first;
1090 int n = 0;
1091 while (n < maxElements && (first = pollExpired()) != null) {
1092 c.add(first);
1093 ++n;
1094 }
1095 return n;
1096 } finally {
1097 lock.unlock();
1098 }
1099 }
1100
1101 public Object[] toArray() {
1102 final ReentrantLock lock = this.lock;
1103 lock.lock();
1104 try {
1105 return Arrays.copyOf(queue, size, Object[].class);
1106 } finally {
1107 lock.unlock();
1108 }
1109 }
1110
1111 @SuppressWarnings("unchecked")
1112 public <T> T[] toArray(T[] a) {
1113 final ReentrantLock lock = this.lock;
1114 lock.lock();
1115 try {
1116 if (a.length < size)
1117 return (T[]) Arrays.copyOf(queue, size, a.getClass());
1118 System.arraycopy(queue, 0, a, 0, size);
1119 if (a.length > size)
1120 a[size] = null;
1121 return a;
1122 } finally {
1123 lock.unlock();
1124 }
1125 }
1126
1127 public Iterator<Runnable> iterator() {
1128 return new Itr(Arrays.copyOf(queue, size));
1129 }
1130
1131 /**
1132 * Snapshot iterator that works off copy of underlying q array.
1133 */
1134 private class Itr implements Iterator<Runnable> {
1135 final RunnableScheduledFuture[] array;
1136 int cursor = 0; // index of next element to return
1137 int lastRet = -1; // index of last element, or -1 if no such
1138
1139 Itr(RunnableScheduledFuture[] array) {
1140 this.array = array;
1141 }
1142
1143 public boolean hasNext() {
1144 return cursor < array.length;
1145 }
1146
1147 public Runnable next() {
1148 if (cursor >= array.length)
1149 throw new NoSuchElementException();
1150 lastRet = cursor;
1151 return array[cursor++];
1152 }
1153
1154 public void remove() {
1155 if (lastRet < 0)
1156 throw new IllegalStateException();
1157 DelayedWorkQueue.this.remove(array[lastRet]);
1158 lastRet = -1;
1159 }
1160 }
1161 }
1162 }