ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.55
Committed: Tue Sep 7 06:28:36 2010 UTC (13 years, 8 months ago) by jsr166
Branch: MAIN
Changes since 1.54: +2 -2 lines
Log Message:
<code> => @code

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13 * A {@link ThreadPoolExecutor} that can additionally schedule
14 * commands to run after a given delay, or to execute
15 * periodically. This class is preferable to {@link java.util.Timer}
16 * when multiple worker threads are needed, or when the additional
17 * flexibility or capabilities of {@link ThreadPoolExecutor} (which
18 * this class extends) are required.
19 *
20 * <p>Delayed tasks execute no sooner than they are enabled, but
21 * without any real-time guarantees about when, after they are
22 * enabled, they will commence. Tasks scheduled for exactly the same
23 * execution time are enabled in first-in-first-out (FIFO) order of
24 * submission.
25 *
26 * <p>When a submitted task is cancelled before it is run, execution
27 * is suppressed. By default, such a cancelled task is not
28 * automatically removed from the work queue until its delay
29 * elapses. While this enables further inspection and monitoring, it
30 * may also cause unbounded retention of cancelled tasks. To avoid
31 * this, set {@link #setRemoveOnCancelPolicy} to {@code true}, which
32 * causes tasks to be immediately removed from the work queue at
33 * time of cancellation.
34 *
35 * <p>Successive executions of a task scheduled via
36 * {@code scheduleAtFixedRate} or
37 * {@code scheduleWithFixedDelay} do not overlap. While different
38 * executions may be performed by different threads, the effects of
39 * prior executions <a
40 * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
41 * those of subsequent ones.
42 *
43 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
44 * of the inherited tuning methods are not useful for it. In
45 * particular, because it acts as a fixed-sized pool using
46 * {@code corePoolSize} threads and an unbounded queue, adjustments
47 * to {@code maximumPoolSize} have no useful effect. Additionally, it
48 * is almost never a good idea to set {@code corePoolSize} to zero or
49 * use {@code allowCoreThreadTimeOut} because this may leave the pool
50 * without threads to handle tasks once they become eligible to run.
51 *
52 * <p><b>Extension notes:</b> This class overrides the
53 * {@link ThreadPoolExecutor#execute execute} and
54 * {@link AbstractExecutorService#submit(Runnable) submit}
55 * methods to generate internal {@link ScheduledFuture} objects to
56 * control per-task delays and scheduling. To preserve
57 * functionality, any further overrides of these methods in
58 * subclasses must invoke superclass versions, which effectively
59 * disables additional task customization. However, this class
60 * provides an alternative protected extension method
61 * {@code decorateTask} (one version each for {@code Runnable} and
62 * {@code Callable}) that can be used to customize the concrete task
63 * types used to execute commands entered via {@code execute},
64 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
65 * and {@code scheduleWithFixedDelay}. By default, a
66 * {@code ScheduledThreadPoolExecutor} uses a task type extending
67 * {@link FutureTask}. However, this may be modified or replaced using
68 * subclasses of the form:
69 *
70 * <pre> {@code
71 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
72 *
73 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
74 *
75 * protected <V> RunnableScheduledFuture<V> decorateTask(
76 * Runnable r, RunnableScheduledFuture<V> task) {
77 * return new CustomTask<V>(r, task);
78 * }
79 *
80 * protected <V> RunnableScheduledFuture<V> decorateTask(
81 * Callable<V> c, RunnableScheduledFuture<V> task) {
82 * return new CustomTask<V>(c, task);
83 * }
84 * // ... add constructors, etc.
85 * }}</pre>
86 *
87 * @since 1.5
88 * @author Doug Lea
89 */
90 public class ScheduledThreadPoolExecutor
91 extends ThreadPoolExecutor
92 implements ScheduledExecutorService {
93
94 /*
95 * This class specializes ThreadPoolExecutor implementation by
96 *
97 * 1. Using a custom task type, ScheduledFutureTask for
98 * tasks, even those that don't require scheduling (i.e.,
99 * those submitted using ExecutorService execute, not
100 * ScheduledExecutorService methods) which are treated as
101 * delayed tasks with a delay of zero.
102 *
103 * 2. Using a custom queue (DelayedWorkQueue), a variant of
104 * unbounded DelayQueue. The lack of capacity constraint and
105 * the fact that corePoolSize and maximumPoolSize are
106 * effectively identical simplifies some execution mechanics
107 * (see delayedExecute) compared to ThreadPoolExecutor.
108 *
109 * 3. Supporting optional run-after-shutdown parameters, which
110 * leads to overrides of shutdown methods to remove and cancel
111 * tasks that should NOT be run after shutdown, as well as
112 * different recheck logic when task (re)submission overlaps
113 * with a shutdown.
114 *
115 * 4. Task decoration methods to allow interception and
116 * instrumentation, which are needed because subclasses cannot
117 * otherwise override submit methods to get this effect. These
118 * don't have any impact on pool control logic though.
119 */
120
121 /**
122 * False if should cancel/suppress periodic tasks on shutdown.
123 */
124 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
125
126 /**
127 * False if should cancel non-periodic tasks on shutdown.
128 */
129 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
130
131 /**
132 * True if ScheduledFutureTask.cancel should remove from queue
133 */
134 private volatile boolean removeOnCancel = false;
135
136 /**
137 * Sequence number to break scheduling ties, and in turn to
138 * guarantee FIFO order among tied entries.
139 */
140 private static final AtomicLong sequencer = new AtomicLong(0);
141
142 /**
143 * Returns current nanosecond time.
144 */
145 final long now() {
146 return System.nanoTime();
147 }
148
149 private class ScheduledFutureTask<V>
150 extends FutureTask<V> implements RunnableScheduledFuture<V> {
151
152 /** Sequence number to break ties FIFO */
153 private final long sequenceNumber;
154
155 /** The time the task is enabled to execute in nanoTime units */
156 private long time;
157
158 /**
159 * Period in nanoseconds for repeating tasks. A positive
160 * value indicates fixed-rate execution. A negative value
161 * indicates fixed-delay execution. A value of 0 indicates a
162 * non-repeating task.
163 */
164 private final long period;
165
166 /** The actual task to be re-enqueued by reExecutePeriodic */
167 RunnableScheduledFuture<V> outerTask = this;
168
169 /**
170 * Index into delay queue, to support faster cancellation.
171 */
172 int heapIndex;
173
174 /**
175 * Creates a one-shot action with given nanoTime-based trigger time.
176 */
177 ScheduledFutureTask(Runnable r, V result, long ns) {
178 super(r, result);
179 this.time = ns;
180 this.period = 0;
181 this.sequenceNumber = sequencer.getAndIncrement();
182 }
183
184 /**
185 * Creates a periodic action with given nano time and period.
186 */
187 ScheduledFutureTask(Runnable r, V result, long ns, long period) {
188 super(r, result);
189 this.time = ns;
190 this.period = period;
191 this.sequenceNumber = sequencer.getAndIncrement();
192 }
193
194 /**
195 * Creates a one-shot action with given nanoTime-based trigger.
196 */
197 ScheduledFutureTask(Callable<V> callable, long ns) {
198 super(callable);
199 this.time = ns;
200 this.period = 0;
201 this.sequenceNumber = sequencer.getAndIncrement();
202 }
203
204 public long getDelay(TimeUnit unit) {
205 return unit.convert(time - now(), TimeUnit.NANOSECONDS);
206 }
207
208 public int compareTo(Delayed other) {
209 if (other == this) // compare zero ONLY if same object
210 return 0;
211 if (other instanceof ScheduledFutureTask) {
212 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
213 long diff = time - x.time;
214 if (diff < 0)
215 return -1;
216 else if (diff > 0)
217 return 1;
218 else if (sequenceNumber < x.sequenceNumber)
219 return -1;
220 else
221 return 1;
222 }
223 long d = (getDelay(TimeUnit.NANOSECONDS) -
224 other.getDelay(TimeUnit.NANOSECONDS));
225 return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
226 }
227
228 /**
229 * Returns true if this is a periodic (not a one-shot) action.
230 *
231 * @return true if periodic
232 */
233 public boolean isPeriodic() {
234 return period != 0;
235 }
236
237 /**
238 * Sets the next time to run for a periodic task.
239 */
240 private void setNextRunTime() {
241 long p = period;
242 if (p > 0)
243 time += p;
244 else
245 time = triggerTime(-p);
246 }
247
248 public boolean cancel(boolean mayInterruptIfRunning) {
249 boolean cancelled = super.cancel(mayInterruptIfRunning);
250 if (cancelled && removeOnCancel && heapIndex >= 0)
251 remove(this);
252 return cancelled;
253 }
254
255 /**
256 * Overrides FutureTask version so as to reset/requeue if periodic.
257 */
258 public void run() {
259 boolean periodic = isPeriodic();
260 if (!canRunInCurrentRunState(periodic))
261 cancel(false);
262 else if (!periodic)
263 ScheduledFutureTask.super.run();
264 else if (ScheduledFutureTask.super.runAndReset()) {
265 setNextRunTime();
266 reExecutePeriodic(outerTask);
267 }
268 }
269 }
270
271 /**
272 * Returns true if can run a task given current run state
273 * and run-after-shutdown parameters.
274 *
275 * @param periodic true if this task is periodic, false if delayed
276 */
277 boolean canRunInCurrentRunState(boolean periodic) {
278 return isRunningOrShutdown(periodic ?
279 continueExistingPeriodicTasksAfterShutdown :
280 executeExistingDelayedTasksAfterShutdown);
281 }
282
283 /**
284 * Main execution method for delayed or periodic tasks. If pool
285 * is shut down, rejects the task. Otherwise adds task to queue
286 * and starts a thread, if necessary, to run it. (We cannot
287 * prestart the thread to run the task because the task (probably)
288 * shouldn't be run yet.) If the pool is shut down while the task
289 * is being added, cancel and remove it if required by state and
290 * run-after-shutdown parameters.
291 *
292 * @param task the task
293 */
294 private void delayedExecute(RunnableScheduledFuture<?> task) {
295 if (isShutdown())
296 reject(task);
297 else {
298 super.getQueue().add(task);
299 if (isShutdown() &&
300 !canRunInCurrentRunState(task.isPeriodic()) &&
301 remove(task))
302 task.cancel(false);
303 else
304 prestartCoreThread();
305 }
306 }
307
308 /**
309 * Requeues a periodic task unless current run state precludes it.
310 * Same idea as delayedExecute except drops task rather than rejecting.
311 *
312 * @param task the task
313 */
314 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
315 if (canRunInCurrentRunState(true)) {
316 super.getQueue().add(task);
317 if (!canRunInCurrentRunState(true) && remove(task))
318 task.cancel(false);
319 else
320 prestartCoreThread();
321 }
322 }
323
324 /**
325 * Cancels and clears the queue of all tasks that should not be run
326 * due to shutdown policy. Invoked within super.shutdown.
327 */
328 @Override void onShutdown() {
329 BlockingQueue<Runnable> q = super.getQueue();
330 boolean keepDelayed =
331 getExecuteExistingDelayedTasksAfterShutdownPolicy();
332 boolean keepPeriodic =
333 getContinueExistingPeriodicTasksAfterShutdownPolicy();
334 if (!keepDelayed && !keepPeriodic)
335 q.clear();
336 else {
337 // Traverse snapshot to avoid iterator exceptions
338 for (Object e : q.toArray()) {
339 if (e instanceof RunnableScheduledFuture) {
340 RunnableScheduledFuture<?> t =
341 (RunnableScheduledFuture<?>)e;
342 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
343 t.isCancelled()) { // also remove if already cancelled
344 if (q.remove(t))
345 t.cancel(false);
346 }
347 }
348 }
349 }
350 tryTerminate();
351 }
352
353 /**
354 * Modifies or replaces the task used to execute a runnable.
355 * This method can be used to override the concrete
356 * class used for managing internal tasks.
357 * The default implementation simply returns the given task.
358 *
359 * @param runnable the submitted Runnable
360 * @param task the task created to execute the runnable
361 * @return a task that can execute the runnable
362 * @since 1.6
363 */
364 protected <V> RunnableScheduledFuture<V> decorateTask(
365 Runnable runnable, RunnableScheduledFuture<V> task) {
366 return task;
367 }
368
369 /**
370 * Modifies or replaces the task used to execute a callable.
371 * This method can be used to override the concrete
372 * class used for managing internal tasks.
373 * The default implementation simply returns the given task.
374 *
375 * @param callable the submitted Callable
376 * @param task the task created to execute the callable
377 * @return a task that can execute the callable
378 * @since 1.6
379 */
380 protected <V> RunnableScheduledFuture<V> decorateTask(
381 Callable<V> callable, RunnableScheduledFuture<V> task) {
382 return task;
383 }
384
385 /**
386 * Creates a new {@code ScheduledThreadPoolExecutor} with the
387 * given core pool size.
388 *
389 * @param corePoolSize the number of threads to keep in the pool, even
390 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
391 * @throws IllegalArgumentException if {@code corePoolSize < 0}
392 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    // Always backed by a fresh DelayedWorkQueue.
    super(
        corePoolSize,
        Integer.MAX_VALUE,
        0,
        TimeUnit.NANOSECONDS,
        new DelayedWorkQueue());
}
397
398 /**
399 * Creates a new {@code ScheduledThreadPoolExecutor} with the
400 * given initial parameters.
401 *
402 * @param corePoolSize the number of threads to keep in the pool, even
403 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
404 * @param threadFactory the factory to use when the executor
405 * creates a new thread
406 * @throws IllegalArgumentException if {@code corePoolSize < 0}
407 * @throws NullPointerException if {@code threadFactory} is null
408 */
public ScheduledThreadPoolExecutor(
        int corePoolSize, ThreadFactory threadFactory) {
    // Always backed by a fresh DelayedWorkQueue.
    super(
        corePoolSize,
        Integer.MAX_VALUE,
        0,
        TimeUnit.NANOSECONDS,
        new DelayedWorkQueue(),
        threadFactory);
}
414
415 /**
416 * Creates a new ScheduledThreadPoolExecutor with the given
417 * initial parameters.
418 *
419 * @param corePoolSize the number of threads to keep in the pool, even
420 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
421 * @param handler the handler to use when execution is blocked
422 * because the thread bounds and queue capacities are reached
423 * @throws IllegalArgumentException if {@code corePoolSize < 0}
424 * @throws NullPointerException if {@code handler} is null
425 */
public ScheduledThreadPoolExecutor(
        int corePoolSize, RejectedExecutionHandler handler) {
    // Always backed by a fresh DelayedWorkQueue.
    super(
        corePoolSize,
        Integer.MAX_VALUE,
        0,
        TimeUnit.NANOSECONDS,
        new DelayedWorkQueue(),
        handler);
}
431
432 /**
433 * Creates a new ScheduledThreadPoolExecutor with the given
434 * initial parameters.
435 *
436 * @param corePoolSize the number of threads to keep in the pool, even
437 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
438 * @param threadFactory the factory to use when the executor
439 * creates a new thread
440 * @param handler the handler to use when execution is blocked
441 * because the thread bounds and queue capacities are reached
442 * @throws IllegalArgumentException if {@code corePoolSize < 0}
443 * @throws NullPointerException if {@code threadFactory} or
444 * {@code handler} is null
445 */
public ScheduledThreadPoolExecutor(
        int corePoolSize,
        ThreadFactory threadFactory,
        RejectedExecutionHandler handler) {
    // Always backed by a fresh DelayedWorkQueue.
    super(
        corePoolSize,
        Integer.MAX_VALUE,
        0,
        TimeUnit.NANOSECONDS,
        new DelayedWorkQueue(),
        threadFactory,
        handler);
}
452
453 /**
454 * Returns the trigger time of a delayed action.
455 */
456 private long triggerTime(long delay, TimeUnit unit) {
457 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
458 }
459
460 /**
461 * Returns the trigger time of a delayed action.
462 */
463 long triggerTime(long delay) {
464 return now() +
465 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
466 }
467
468 /**
469 * Constrains the values of all delays in the queue to be within
470 * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
471 * This may occur if a task is eligible to be dequeued, but has
472 * not yet been, while some other task is added with a delay of
473 * Long.MAX_VALUE.
474 */
475 private long overflowFree(long delay) {
476 Delayed head = (Delayed) super.getQueue().peek();
477 if (head != null) {
478 long headDelay = head.getDelay(TimeUnit.NANOSECONDS);
479 if (headDelay < 0 && (delay - headDelay < 0))
480 delay = Long.MAX_VALUE + headDelay;
481 }
482 return delay;
483 }
484
485 /**
486 * @throws RejectedExecutionException {@inheritDoc}
487 * @throws NullPointerException {@inheritDoc}
488 */
489 public ScheduledFuture<?> schedule(Runnable command,
490 long delay,
491 TimeUnit unit) {
492 if (command == null || unit == null)
493 throw new NullPointerException();
494 RunnableScheduledFuture<?> t = decorateTask(command,
495 new ScheduledFutureTask<Void>(command, null,
496 triggerTime(delay, unit)));
497 delayedExecute(t);
498 return t;
499 }
500
501 /**
502 * @throws RejectedExecutionException {@inheritDoc}
503 * @throws NullPointerException {@inheritDoc}
504 */
505 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
506 long delay,
507 TimeUnit unit) {
508 if (callable == null || unit == null)
509 throw new NullPointerException();
510 RunnableScheduledFuture<V> t = decorateTask(callable,
511 new ScheduledFutureTask<V>(callable,
512 triggerTime(delay, unit)));
513 delayedExecute(t);
514 return t;
515 }
516
517 /**
518 * @throws RejectedExecutionException {@inheritDoc}
519 * @throws NullPointerException {@inheritDoc}
520 * @throws IllegalArgumentException {@inheritDoc}
521 */
522 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
523 long initialDelay,
524 long period,
525 TimeUnit unit) {
526 if (command == null || unit == null)
527 throw new NullPointerException();
528 if (period <= 0)
529 throw new IllegalArgumentException();
530 ScheduledFutureTask<Void> sft =
531 new ScheduledFutureTask<Void>(command,
532 null,
533 triggerTime(initialDelay, unit),
534 unit.toNanos(period));
535 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
536 sft.outerTask = t;
537 delayedExecute(t);
538 return t;
539 }
540
541 /**
542 * @throws RejectedExecutionException {@inheritDoc}
543 * @throws NullPointerException {@inheritDoc}
544 * @throws IllegalArgumentException {@inheritDoc}
545 */
546 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
547 long initialDelay,
548 long delay,
549 TimeUnit unit) {
550 if (command == null || unit == null)
551 throw new NullPointerException();
552 if (delay <= 0)
553 throw new IllegalArgumentException();
554 ScheduledFutureTask<Void> sft =
555 new ScheduledFutureTask<Void>(command,
556 null,
557 triggerTime(initialDelay, unit),
558 unit.toNanos(-delay));
559 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
560 sft.outerTask = t;
561 delayedExecute(t);
562 return t;
563 }
564
565 /**
566 * Executes {@code command} with zero required delay.
567 * This has effect equivalent to
568 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
569 * Note that inspections of the queue and of the list returned by
570 * {@code shutdownNow} will access the zero-delayed
571 * {@link ScheduledFuture}, not the {@code command} itself.
572 *
573 * <p>A consequence of the use of {@code ScheduledFuture} objects is
574 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
575 * called with a null second {@code Throwable} argument, even if the
576 * {@code command} terminated abruptly. Instead, the {@code Throwable}
577 * thrown by such a task can be obtained via {@link Future#get}.
578 *
579 * @throws RejectedExecutionException at discretion of
580 * {@code RejectedExecutionHandler}, if the task
581 * cannot be accepted for execution because the
582 * executor has been shut down
583 * @throws NullPointerException {@inheritDoc}
584 */
585 public void execute(Runnable command) {
586 schedule(command, 0, TimeUnit.NANOSECONDS);
587 }
588
589 // Override AbstractExecutorService methods
590
591 /**
592 * @throws RejectedExecutionException {@inheritDoc}
593 * @throws NullPointerException {@inheritDoc}
594 */
595 public Future<?> submit(Runnable task) {
596 return schedule(task, 0, TimeUnit.NANOSECONDS);
597 }
598
599 /**
600 * @throws RejectedExecutionException {@inheritDoc}
601 * @throws NullPointerException {@inheritDoc}
602 */
603 public <T> Future<T> submit(Runnable task, T result) {
604 return schedule(Executors.callable(task, result),
605 0, TimeUnit.NANOSECONDS);
606 }
607
608 /**
609 * @throws RejectedExecutionException {@inheritDoc}
610 * @throws NullPointerException {@inheritDoc}
611 */
612 public <T> Future<T> submit(Callable<T> task) {
613 return schedule(task, 0, TimeUnit.NANOSECONDS);
614 }
615
616 /**
617 * Sets the policy on whether to continue executing existing
618 * periodic tasks even when this executor has been {@code shutdown}.
619 * In this case, these tasks will only terminate upon
620 * {@code shutdownNow} or after setting the policy to
621 * {@code false} when already shutdown.
622 * This value is by default {@code false}.
623 *
624 * @param value if {@code true}, continue after shutdown, else don't.
625 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
626 */
627 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
628 continueExistingPeriodicTasksAfterShutdown = value;
629 if (!value && isShutdown())
630 onShutdown();
631 }
632
633 /**
634 * Gets the policy on whether to continue executing existing
635 * periodic tasks even when this executor has been {@code shutdown}.
636 * In this case, these tasks will only terminate upon
637 * {@code shutdownNow} or after setting the policy to
638 * {@code false} when already shutdown.
639 * This value is by default {@code false}.
640 *
641 * @return {@code true} if will continue after shutdown
642 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
643 */
644 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
645 return continueExistingPeriodicTasksAfterShutdown;
646 }
647
648 /**
649 * Sets the policy on whether to execute existing delayed
650 * tasks even when this executor has been {@code shutdown}.
651 * In this case, these tasks will only terminate upon
652 * {@code shutdownNow}, or after setting the policy to
653 * {@code false} when already shutdown.
654 * This value is by default {@code true}.
655 *
656 * @param value if {@code true}, execute after shutdown, else don't.
657 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
658 */
659 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
660 executeExistingDelayedTasksAfterShutdown = value;
661 if (!value && isShutdown())
662 onShutdown();
663 }
664
665 /**
666 * Gets the policy on whether to execute existing delayed
667 * tasks even when this executor has been {@code shutdown}.
668 * In this case, these tasks will only terminate upon
669 * {@code shutdownNow}, or after setting the policy to
670 * {@code false} when already shutdown.
671 * This value is by default {@code true}.
672 *
673 * @return {@code true} if will execute after shutdown
674 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
675 */
676 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
677 return executeExistingDelayedTasksAfterShutdown;
678 }
679
680 /**
681 * Sets the policy on whether cancelled tasks should be immediately
682 * removed from the work queue at time of cancellation. This value is
683 * by default {@code false}.
684 *
685 * @param value if {@code true}, remove on cancellation, else don't
686 * @see #getRemoveOnCancelPolicy
687 * @since 1.7
688 */
689 public void setRemoveOnCancelPolicy(boolean value) {
690 removeOnCancel = value;
691 }
692
693 /**
694 * Gets the policy on whether cancelled tasks should be immediately
695 * removed from the work queue at time of cancellation. This value is
696 * by default {@code false}.
697 *
698 * @return {@code true} if cancelled tasks are immediately removed
699 * from the queue
700 * @see #setRemoveOnCancelPolicy
701 * @since 1.7
702 */
703 public boolean getRemoveOnCancelPolicy() {
704 return removeOnCancel;
705 }
706
707 /**
708 * Initiates an orderly shutdown in which previously submitted
709 * tasks are executed, but no new tasks will be accepted.
710 * Invocation has no additional effect if already shut down.
711 *
712 * <p>This method does not wait for previously submitted tasks to
713 * complete execution. Use {@link #awaitTermination awaitTermination}
714 * to do that.
715 *
716 * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
717 * has been set {@code false}, existing delayed tasks whose delays
718 * have not yet elapsed are cancelled. And unless the {@code
719 * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
720 * {@code true}, future executions of existing periodic tasks will
721 * be cancelled.
722 *
723 * @throws SecurityException {@inheritDoc}
724 */
725 public void shutdown() {
726 super.shutdown();
727 }
728
729 /**
730 * Attempts to stop all actively executing tasks, halts the
731 * processing of waiting tasks, and returns a list of the tasks
732 * that were awaiting execution.
733 *
734 * <p>This method does not wait for actively executing tasks to
735 * terminate. Use {@link #awaitTermination awaitTermination} to
736 * do that.
737 *
738 * <p>There are no guarantees beyond best-effort attempts to stop
739 * processing actively executing tasks. This implementation
740 * cancels tasks via {@link Thread#interrupt}, so any task that
741 * fails to respond to interrupts may never terminate.
742 *
743 * @return list of tasks that never commenced execution.
744 * Each element of this list is a {@link ScheduledFuture},
745 * including those tasks submitted using {@code execute},
746 * which are for scheduling purposes used as the basis of a
747 * zero-delay {@code ScheduledFuture}.
748 * @throws SecurityException {@inheritDoc}
749 */
750 public List<Runnable> shutdownNow() {
751 return super.shutdownNow();
752 }
753
754 /**
755 * Returns the task queue used by this executor. Each element of
756 * this queue is a {@link ScheduledFuture}, including those
757 * tasks submitted using {@code execute} which are for scheduling
758 * purposes used as the basis of a zero-delay
759 * {@code ScheduledFuture}. Iteration over this queue is
760 * <em>not</em> guaranteed to traverse tasks in the order in
761 * which they will execute.
762 *
763 * @return the task queue
764 */
765 public BlockingQueue<Runnable> getQueue() {
766 return super.getQueue();
767 }
768
769 /**
770 * Specialized delay queue. To mesh with TPE declarations, this
771 * class must be declared as a BlockingQueue<Runnable> even though
772 * it can only hold RunnableScheduledFutures.
773 */
774 static class DelayedWorkQueue extends AbstractQueue<Runnable>
775 implements BlockingQueue<Runnable> {
776
777 /*
778 * A DelayedWorkQueue is based on a heap-based data structure
779 * like those in DelayQueue and PriorityQueue, except that
780 * every ScheduledFutureTask also records its index into the
781 * heap array. This eliminates the need to find a task upon
782 * cancellation, greatly speeding up removal (down from O(n)
783 * to O(log n)), and reducing garbage retention that would
784 * otherwise occur by waiting for the element to rise to top
785 * before clearing. But because the queue may also hold
786 * RunnableScheduledFutures that are not ScheduledFutureTasks,
787 * we are not guaranteed to have such indices available, in
788 * which case we fall back to linear search. (We expect that
789 * most tasks will not be decorated, and that the faster cases
790 * will be much more common.)
791 *
792 * All heap operations must record index changes -- mainly
793 * within siftUp and siftDown. Upon removal, a task's
794 * heapIndex is set to -1. Note that ScheduledFutureTasks can
795 * appear at most once in the queue (this need not be true for
796 * other kinds of tasks or work queues), so are uniquely
797 * identified by heapIndex.
798 */
799
800 private static final int INITIAL_CAPACITY = 16;
801 private RunnableScheduledFuture[] queue =
802 new RunnableScheduledFuture[INITIAL_CAPACITY];
803 private final ReentrantLock lock = new ReentrantLock();
804 private int size = 0;
805
806 /**
807 * Thread designated to wait for the task at the head of the
808 * queue. This variant of the Leader-Follower pattern
809 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
810 * minimize unnecessary timed waiting. When a thread becomes
811 * the leader, it waits only for the next delay to elapse, but
812 * other threads await indefinitely. The leader thread must
813 * signal some other thread before returning from take() or
814 * poll(...), unless some other thread becomes leader in the
815 * interim. Whenever the head of the queue is replaced with a
816 * task with an earlier expiration time, the leader field is
817 * invalidated by being reset to null, and some waiting
818 * thread, but not necessarily the current leader, is
819 * signalled. So waiting threads must be prepared to acquire
820 * and lose leadership while waiting.
821 */
822 private Thread leader = null;
823
824 /**
825 * Condition signalled when a newer task becomes available at the
826 * head of the queue or a new thread may need to become leader.
827 */
828 private final Condition available = lock.newCondition();
829
830 /**
831 * Set f's heapIndex if it is a ScheduledFutureTask.
832 */
833 private void setIndex(RunnableScheduledFuture f, int idx) {
834 if (f instanceof ScheduledFutureTask)
835 ((ScheduledFutureTask)f).heapIndex = idx;
836 }
837
838 /**
839 * Sift element added at bottom up to its heap-ordered spot.
840 * Call only when holding lock.
841 */
842 private void siftUp(int k, RunnableScheduledFuture key) {
843 while (k > 0) {
844 int parent = (k - 1) >>> 1;
845 RunnableScheduledFuture e = queue[parent];
846 if (key.compareTo(e) >= 0)
847 break;
848 queue[k] = e;
849 setIndex(e, k);
850 k = parent;
851 }
852 queue[k] = key;
853 setIndex(key, k);
854 }
855
856 /**
857 * Sift element added at top down to its heap-ordered spot.
858 * Call only when holding lock.
859 */
860 private void siftDown(int k, RunnableScheduledFuture key) {
861 int half = size >>> 1;
862 while (k < half) {
863 int child = (k << 1) + 1;
864 RunnableScheduledFuture c = queue[child];
865 int right = child + 1;
866 if (right < size && c.compareTo(queue[right]) > 0)
867 c = queue[child = right];
868 if (key.compareTo(c) <= 0)
869 break;
870 queue[k] = c;
871 setIndex(c, k);
872 k = child;
873 }
874 queue[k] = key;
875 setIndex(key, k);
876 }
877
878 /**
879 * Resize the heap array. Call only when holding lock.
880 */
881 private void grow() {
882 int oldCapacity = queue.length;
883 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
884 if (newCapacity < 0) // overflow
885 newCapacity = Integer.MAX_VALUE;
886 queue = Arrays.copyOf(queue, newCapacity);
887 }
888
889 /**
890 * Find index of given object, or -1 if absent
891 */
892 private int indexOf(Object x) {
893 if (x != null) {
894 if (x instanceof ScheduledFutureTask) {
895 int i = ((ScheduledFutureTask) x).heapIndex;
896 // Sanity check; x could conceivably be a
897 // ScheduledFutureTask from some other pool.
898 if (i >= 0 && i < size && queue[i] == x)
899 return i;
900 } else {
901 for (int i = 0; i < size; i++)
902 if (x.equals(queue[i]))
903 return i;
904 }
905 }
906 return -1;
907 }
908
909 public boolean contains(Object x) {
910 final ReentrantLock lock = this.lock;
911 lock.lock();
912 try {
913 return indexOf(x) != -1;
914 } finally {
915 lock.unlock();
916 }
917 }
918
919 public boolean remove(Object x) {
920 final ReentrantLock lock = this.lock;
921 lock.lock();
922 try {
923 int i = indexOf(x);
924 if (i < 0)
925 return false;
926
927 setIndex(queue[i], -1);
928 int s = --size;
929 RunnableScheduledFuture replacement = queue[s];
930 queue[s] = null;
931 if (s != i) {
932 siftDown(i, replacement);
933 if (queue[i] == replacement)
934 siftUp(i, replacement);
935 }
936 return true;
937 } finally {
938 lock.unlock();
939 }
940 }
941
942 public int size() {
943 final ReentrantLock lock = this.lock;
944 lock.lock();
945 try {
946 return size;
947 } finally {
948 lock.unlock();
949 }
950 }
951
952 public boolean isEmpty() {
953 return size() == 0;
954 }
955
956 public int remainingCapacity() {
957 return Integer.MAX_VALUE;
958 }
959
960 public RunnableScheduledFuture peek() {
961 final ReentrantLock lock = this.lock;
962 lock.lock();
963 try {
964 return queue[0];
965 } finally {
966 lock.unlock();
967 }
968 }
969
970 public boolean offer(Runnable x) {
971 if (x == null)
972 throw new NullPointerException();
973 RunnableScheduledFuture e = (RunnableScheduledFuture)x;
974 final ReentrantLock lock = this.lock;
975 lock.lock();
976 try {
977 int i = size;
978 if (i >= queue.length)
979 grow();
980 size = i + 1;
981 if (i == 0) {
982 queue[0] = e;
983 setIndex(e, 0);
984 } else {
985 siftUp(i, e);
986 }
987 if (queue[0] == e) {
988 leader = null;
989 available.signal();
990 }
991 } finally {
992 lock.unlock();
993 }
994 return true;
995 }
996
997 public void put(Runnable e) {
998 offer(e);
999 }
1000
1001 public boolean add(Runnable e) {
1002 return offer(e);
1003 }
1004
1005 public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1006 return offer(e);
1007 }
1008
1009 /**
1010 * Performs common bookkeeping for poll and take: Replaces
1011 * first element with last and sifts it down. Call only when
1012 * holding lock.
1013 * @param f the task to remove and return
1014 */
1015 private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
1016 int s = --size;
1017 RunnableScheduledFuture x = queue[s];
1018 queue[s] = null;
1019 if (s != 0)
1020 siftDown(0, x);
1021 setIndex(f, -1);
1022 return f;
1023 }
1024
1025 public RunnableScheduledFuture poll() {
1026 final ReentrantLock lock = this.lock;
1027 lock.lock();
1028 try {
1029 RunnableScheduledFuture first = queue[0];
1030 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
1031 return null;
1032 else
1033 return finishPoll(first);
1034 } finally {
1035 lock.unlock();
1036 }
1037 }
1038
1039 public RunnableScheduledFuture take() throws InterruptedException {
1040 final ReentrantLock lock = this.lock;
1041 lock.lockInterruptibly();
1042 try {
1043 for (;;) {
1044 RunnableScheduledFuture first = queue[0];
1045 if (first == null)
1046 available.await();
1047 else {
1048 long delay = first.getDelay(TimeUnit.NANOSECONDS);
1049 if (delay <= 0)
1050 return finishPoll(first);
1051 else if (leader != null)
1052 available.await();
1053 else {
1054 Thread thisThread = Thread.currentThread();
1055 leader = thisThread;
1056 try {
1057 available.awaitNanos(delay);
1058 } finally {
1059 if (leader == thisThread)
1060 leader = null;
1061 }
1062 }
1063 }
1064 }
1065 } finally {
1066 if (leader == null && queue[0] != null)
1067 available.signal();
1068 lock.unlock();
1069 }
1070 }
1071
1072 public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
1073 throws InterruptedException {
1074 long nanos = unit.toNanos(timeout);
1075 final ReentrantLock lock = this.lock;
1076 lock.lockInterruptibly();
1077 try {
1078 for (;;) {
1079 RunnableScheduledFuture first = queue[0];
1080 if (first == null) {
1081 if (nanos <= 0)
1082 return null;
1083 else
1084 nanos = available.awaitNanos(nanos);
1085 } else {
1086 long delay = first.getDelay(TimeUnit.NANOSECONDS);
1087 if (delay <= 0)
1088 return finishPoll(first);
1089 if (nanos <= 0)
1090 return null;
1091 if (nanos < delay || leader != null)
1092 nanos = available.awaitNanos(nanos);
1093 else {
1094 Thread thisThread = Thread.currentThread();
1095 leader = thisThread;
1096 try {
1097 long timeLeft = available.awaitNanos(delay);
1098 nanos -= delay - timeLeft;
1099 } finally {
1100 if (leader == thisThread)
1101 leader = null;
1102 }
1103 }
1104 }
1105 }
1106 } finally {
1107 if (leader == null && queue[0] != null)
1108 available.signal();
1109 lock.unlock();
1110 }
1111 }
1112
1113 public void clear() {
1114 final ReentrantLock lock = this.lock;
1115 lock.lock();
1116 try {
1117 for (int i = 0; i < size; i++) {
1118 RunnableScheduledFuture t = queue[i];
1119 if (t != null) {
1120 queue[i] = null;
1121 setIndex(t, -1);
1122 }
1123 }
1124 size = 0;
1125 } finally {
1126 lock.unlock();
1127 }
1128 }
1129
1130 /**
1131 * Return and remove first element only if it is expired.
1132 * Used only by drainTo. Call only when holding lock.
1133 */
1134 private RunnableScheduledFuture pollExpired() {
1135 RunnableScheduledFuture first = queue[0];
1136 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
1137 return null;
1138 return finishPoll(first);
1139 }
1140
1141 public int drainTo(Collection<? super Runnable> c) {
1142 if (c == null)
1143 throw new NullPointerException();
1144 if (c == this)
1145 throw new IllegalArgumentException();
1146 final ReentrantLock lock = this.lock;
1147 lock.lock();
1148 try {
1149 RunnableScheduledFuture first;
1150 int n = 0;
1151 while ((first = pollExpired()) != null) {
1152 c.add(first);
1153 ++n;
1154 }
1155 return n;
1156 } finally {
1157 lock.unlock();
1158 }
1159 }
1160
1161 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1162 if (c == null)
1163 throw new NullPointerException();
1164 if (c == this)
1165 throw new IllegalArgumentException();
1166 if (maxElements <= 0)
1167 return 0;
1168 final ReentrantLock lock = this.lock;
1169 lock.lock();
1170 try {
1171 RunnableScheduledFuture first;
1172 int n = 0;
1173 while (n < maxElements && (first = pollExpired()) != null) {
1174 c.add(first);
1175 ++n;
1176 }
1177 return n;
1178 } finally {
1179 lock.unlock();
1180 }
1181 }
1182
1183 public Object[] toArray() {
1184 final ReentrantLock lock = this.lock;
1185 lock.lock();
1186 try {
1187 return Arrays.copyOf(queue, size, Object[].class);
1188 } finally {
1189 lock.unlock();
1190 }
1191 }
1192
1193 @SuppressWarnings("unchecked")
1194 public <T> T[] toArray(T[] a) {
1195 final ReentrantLock lock = this.lock;
1196 lock.lock();
1197 try {
1198 if (a.length < size)
1199 return (T[]) Arrays.copyOf(queue, size, a.getClass());
1200 System.arraycopy(queue, 0, a, 0, size);
1201 if (a.length > size)
1202 a[size] = null;
1203 return a;
1204 } finally {
1205 lock.unlock();
1206 }
1207 }
1208
1209 public Iterator<Runnable> iterator() {
1210 return new Itr(Arrays.copyOf(queue, size));
1211 }
1212
1213 /**
1214 * Snapshot iterator that works off copy of underlying q array.
1215 */
1216 private class Itr implements Iterator<Runnable> {
1217 final RunnableScheduledFuture[] array;
1218 int cursor = 0; // index of next element to return
1219 int lastRet = -1; // index of last element, or -1 if no such
1220
1221 Itr(RunnableScheduledFuture[] array) {
1222 this.array = array;
1223 }
1224
1225 public boolean hasNext() {
1226 return cursor < array.length;
1227 }
1228
1229 public Runnable next() {
1230 if (cursor >= array.length)
1231 throw new NoSuchElementException();
1232 lastRet = cursor;
1233 return array[cursor++];
1234 }
1235
1236 public void remove() {
1237 if (lastRet < 0)
1238 throw new IllegalStateException();
1239 DelayedWorkQueue.this.remove(array[lastRet]);
1240 lastRet = -1;
1241 }
1242 }
1243 }
1244 }