ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.63
Committed: Wed Sep 21 13:52:48 2011 UTC (12 years, 8 months ago) by dl
Branch: MAIN
Changes since 1.62: +2 -2 lines
Log Message:
Ensure at least one thread even if core 0 in STPE

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import static java.util.concurrent.TimeUnit.NANOSECONDS;
9 import java.util.concurrent.atomic.AtomicLong;
10 import java.util.concurrent.locks.Condition;
11 import java.util.concurrent.locks.ReentrantLock;
12 import java.util.*;
13
14 /**
15 * A {@link ThreadPoolExecutor} that can additionally schedule
16 * commands to run after a given delay, or to execute
17 * periodically. This class is preferable to {@link java.util.Timer}
18 * when multiple worker threads are needed, or when the additional
19 * flexibility or capabilities of {@link ThreadPoolExecutor} (which
20 * this class extends) are required.
21 *
22 * <p>Delayed tasks execute no sooner than they are enabled, but
23 * without any real-time guarantees about when, after they are
24 * enabled, they will commence. Tasks scheduled for exactly the same
25 * execution time are enabled in first-in-first-out (FIFO) order of
26 * submission.
27 *
28 * <p>When a submitted task is cancelled before it is run, execution
29 * is suppressed. By default, such a cancelled task is not
30 * automatically removed from the work queue until its delay
31 * elapses. While this enables further inspection and monitoring, it
32 * may also cause unbounded retention of cancelled tasks. To avoid
33 * this, set {@link #setRemoveOnCancelPolicy} to {@code true}, which
34 * causes tasks to be immediately removed from the work queue at
35 * time of cancellation.
36 *
37 * <p>Successive executions of a task scheduled via
38 * {@code scheduleAtFixedRate} or
39 * {@code scheduleWithFixedDelay} do not overlap. While different
40 * executions may be performed by different threads, the effects of
41 * prior executions <a
42 * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
43 * those of subsequent ones.
44 *
45 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
46 * of the inherited tuning methods are not useful for it. In
47 * particular, because it acts as a fixed-sized pool using
48 * {@code corePoolSize} threads and an unbounded queue, adjustments
49 * to {@code maximumPoolSize} have no useful effect. Additionally, it
50 * is almost never a good idea to set {@code corePoolSize} to zero or
51 * use {@code allowCoreThreadTimeOut} because this may leave the pool
52 * without threads to handle tasks once they become eligible to run.
53 *
54 * <p><b>Extension notes:</b> This class overrides the
55 * {@link ThreadPoolExecutor#execute execute} and
56 * {@link AbstractExecutorService#submit(Runnable) submit}
57 * methods to generate internal {@link ScheduledFuture} objects to
58 * control per-task delays and scheduling. To preserve
59 * functionality, any further overrides of these methods in
60 * subclasses must invoke superclass versions, which effectively
61 * disables additional task customization. However, this class
62 * provides alternative protected extension method
63 * {@code decorateTask} (one version each for {@code Runnable} and
64 * {@code Callable}) that can be used to customize the concrete task
65 * types used to execute commands entered via {@code execute},
66 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
67 * and {@code scheduleWithFixedDelay}. By default, a
68 * {@code ScheduledThreadPoolExecutor} uses a task type extending
69 * {@link FutureTask}. However, this may be modified or replaced using
70 * subclasses of the form:
71 *
72 * <pre> {@code
73 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
74 *
75 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
76 *
77 * protected <V> RunnableScheduledFuture<V> decorateTask(
78 * Runnable r, RunnableScheduledFuture<V> task) {
79 * return new CustomTask<V>(r, task);
80 * }
81 *
82 * protected <V> RunnableScheduledFuture<V> decorateTask(
83 * Callable<V> c, RunnableScheduledFuture<V> task) {
84 * return new CustomTask<V>(c, task);
85 * }
86 * // ... add constructors, etc.
87 * }}</pre>
88 *
89 * @since 1.5
90 * @author Doug Lea
91 */
92 public class ScheduledThreadPoolExecutor
93 extends ThreadPoolExecutor
94 implements ScheduledExecutorService {
95
96 /*
97 * This class specializes ThreadPoolExecutor implementation by
98 *
99 * 1. Using a custom task type, ScheduledFutureTask for
100 * tasks, even those that don't require scheduling (i.e.,
101 * those submitted using ExecutorService execute, not
102 * ScheduledExecutorService methods) which are treated as
103 * delayed tasks with a delay of zero.
104 *
105 * 2. Using a custom queue (DelayedWorkQueue), a variant of
106 * unbounded DelayQueue. The lack of capacity constraint and
107 * the fact that corePoolSize and maximumPoolSize are
108 * effectively identical simplifies some execution mechanics
109 * (see delayedExecute) compared to ThreadPoolExecutor.
110 *
111 * 3. Supporting optional run-after-shutdown parameters, which
112 * leads to overrides of shutdown methods to remove and cancel
113 * tasks that should NOT be run after shutdown, as well as
114 * different recheck logic when task (re)submission overlaps
115 * with a shutdown.
116 *
117 * 4. Task decoration methods to allow interception and
118 * instrumentation, which are needed because subclasses cannot
119 * otherwise override submit methods to get this effect. These
120 * don't have any impact on pool control logic though.
121 */
122
123 /**
124 * False if should cancel/suppress periodic tasks on shutdown.
125 */
126 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
127
128 /**
129 * False if should cancel non-periodic tasks on shutdown.
130 */
131 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
132
133 /**
134 * True if ScheduledFutureTask.cancel should remove from queue
135 */
136 private volatile boolean removeOnCancel = false;
137
138 /**
139 * Sequence number to break scheduling ties, and in turn to
140 * guarantee FIFO order among tied entries.
141 */
142 private static final AtomicLong sequencer = new AtomicLong();
143
144 /**
145 * Returns current nanosecond time.
146 */
147 final long now() {
148 return System.nanoTime();
149 }
150
151 private class ScheduledFutureTask<V>
152 extends FutureTask<V> implements RunnableScheduledFuture<V> {
153
154 /** Sequence number to break ties FIFO */
155 private final long sequenceNumber;
156
157 /** The time the task is enabled to execute in nanoTime units */
158 private long time;
159
160 /**
161 * Period in nanoseconds for repeating tasks. A positive
162 * value indicates fixed-rate execution. A negative value
163 * indicates fixed-delay execution. A value of 0 indicates a
164 * non-repeating task.
165 */
166 private final long period;
167
168 /** The actual task to be re-enqueued by reExecutePeriodic */
169 RunnableScheduledFuture<V> outerTask = this;
170
171 /**
172 * Index into delay queue, to support faster cancellation.
173 */
174 int heapIndex;
175
176 /**
177 * Creates a one-shot action with given nanoTime-based trigger time.
178 */
179 ScheduledFutureTask(Runnable r, V result, long ns) {
180 super(r, result);
181 this.time = ns;
182 this.period = 0;
183 this.sequenceNumber = sequencer.getAndIncrement();
184 }
185
186 /**
187 * Creates a periodic action with given nano time and period.
188 */
189 ScheduledFutureTask(Runnable r, V result, long ns, long period) {
190 super(r, result);
191 this.time = ns;
192 this.period = period;
193 this.sequenceNumber = sequencer.getAndIncrement();
194 }
195
196 /**
197 * Creates a one-shot action with given nanoTime-based trigger.
198 */
199 ScheduledFutureTask(Callable<V> callable, long ns) {
200 super(callable);
201 this.time = ns;
202 this.period = 0;
203 this.sequenceNumber = sequencer.getAndIncrement();
204 }
205
206 public long getDelay(TimeUnit unit) {
207 return unit.convert(time - now(), NANOSECONDS);
208 }
209
210 public int compareTo(Delayed other) {
211 if (other == this) // compare zero if same object
212 return 0;
213 if (other instanceof ScheduledFutureTask) {
214 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
215 long diff = time - x.time;
216 if (diff < 0)
217 return -1;
218 else if (diff > 0)
219 return 1;
220 else if (sequenceNumber < x.sequenceNumber)
221 return -1;
222 else
223 return 1;
224 }
225 long diff = (getDelay(NANOSECONDS) -
226 other.getDelay(NANOSECONDS));
227 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
228 }
229
230 /**
231 * Returns true if this is a periodic (not a one-shot) action.
232 *
233 * @return true if periodic
234 */
235 public boolean isPeriodic() {
236 return period != 0;
237 }
238
239 /**
240 * Sets the next time to run for a periodic task.
241 */
242 private void setNextRunTime() {
243 long p = period;
244 if (p > 0)
245 time += p;
246 else
247 time = triggerTime(-p);
248 }
249
250 public boolean cancel(boolean mayInterruptIfRunning) {
251 boolean cancelled = super.cancel(mayInterruptIfRunning);
252 if (cancelled && removeOnCancel && heapIndex >= 0)
253 remove(this);
254 return cancelled;
255 }
256
257 /**
258 * Overrides FutureTask version so as to reset/requeue if periodic.
259 */
260 public void run() {
261 boolean periodic = isPeriodic();
262 if (!canRunInCurrentRunState(periodic))
263 cancel(false);
264 else if (!periodic)
265 ScheduledFutureTask.super.run();
266 else if (ScheduledFutureTask.super.runAndReset()) {
267 setNextRunTime();
268 reExecutePeriodic(outerTask);
269 }
270 }
271 }
272
273 /**
274 * Returns true if can run a task given current run state
275 * and run-after-shutdown parameters.
276 *
277 * @param periodic true if this task periodic, false if delayed
278 */
279 boolean canRunInCurrentRunState(boolean periodic) {
280 return isRunningOrShutdown(periodic ?
281 continueExistingPeriodicTasksAfterShutdown :
282 executeExistingDelayedTasksAfterShutdown);
283 }
284
285 /**
286 * Main execution method for delayed or periodic tasks. If pool
287 * is shut down, rejects the task. Otherwise adds task to queue
288 * and starts a thread, if necessary, to run it. (We cannot
289 * prestart the thread to run the task because the task (probably)
290 * shouldn't be run yet,) If the pool is shut down while the task
291 * is being added, cancel and remove it if required by state and
292 * run-after-shutdown parameters.
293 *
294 * @param task the task
295 */
296 private void delayedExecute(RunnableScheduledFuture<?> task) {
297 if (isShutdown())
298 reject(task);
299 else {
300 super.getQueue().add(task);
301 if (isShutdown() &&
302 !canRunInCurrentRunState(task.isPeriodic()) &&
303 remove(task))
304 task.cancel(false);
305 else
306 ensurePrestart();
307 }
308 }
309
310 /**
311 * Requeues a periodic task unless current run state precludes it.
312 * Same idea as delayedExecute except drops task rather than rejecting.
313 *
314 * @param task the task
315 */
316 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
317 if (canRunInCurrentRunState(true)) {
318 super.getQueue().add(task);
319 if (!canRunInCurrentRunState(true) && remove(task))
320 task.cancel(false);
321 else
322 ensurePrestart();
323 }
324 }
325
326 /**
327 * Cancels and clears the queue of all tasks that should not be run
328 * due to shutdown policy. Invoked within super.shutdown.
329 */
330 @Override void onShutdown() {
331 BlockingQueue<Runnable> q = super.getQueue();
332 boolean keepDelayed =
333 getExecuteExistingDelayedTasksAfterShutdownPolicy();
334 boolean keepPeriodic =
335 getContinueExistingPeriodicTasksAfterShutdownPolicy();
336 if (!keepDelayed && !keepPeriodic) {
337 for (Object e : q.toArray())
338 if (e instanceof RunnableScheduledFuture<?>)
339 ((RunnableScheduledFuture<?>) e).cancel(false);
340 q.clear();
341 }
342 else {
343 // Traverse snapshot to avoid iterator exceptions
344 for (Object e : q.toArray()) {
345 if (e instanceof RunnableScheduledFuture) {
346 RunnableScheduledFuture<?> t =
347 (RunnableScheduledFuture<?>)e;
348 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
349 t.isCancelled()) { // also remove if already cancelled
350 if (q.remove(t))
351 t.cancel(false);
352 }
353 }
354 }
355 }
356 tryTerminate();
357 }
358
359 /**
360 * Modifies or replaces the task used to execute a runnable.
361 * This method can be used to override the concrete
362 * class used for managing internal tasks.
363 * The default implementation simply returns the given task.
364 *
365 * @param runnable the submitted Runnable
366 * @param task the task created to execute the runnable
367 * @return a task that can execute the runnable
368 * @since 1.6
369 */
370 protected <V> RunnableScheduledFuture<V> decorateTask(
371 Runnable runnable, RunnableScheduledFuture<V> task) {
372 return task;
373 }
374
375 /**
376 * Modifies or replaces the task used to execute a callable.
377 * This method can be used to override the concrete
378 * class used for managing internal tasks.
379 * The default implementation simply returns the given task.
380 *
381 * @param callable the submitted Callable
382 * @param task the task created to execute the callable
383 * @return a task that can execute the callable
384 * @since 1.6
385 */
386 protected <V> RunnableScheduledFuture<V> decorateTask(
387 Callable<V> callable, RunnableScheduledFuture<V> task) {
388 return task;
389 }
390
391 /**
392 * Creates a new {@code ScheduledThreadPoolExecutor} with the
393 * given core pool size.
394 *
395 * @param corePoolSize the number of threads to keep in the pool, even
396 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
397 * @throws IllegalArgumentException if {@code corePoolSize < 0}
398 */
399 public ScheduledThreadPoolExecutor(int corePoolSize) {
400 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
401 new DelayedWorkQueue());
402 }
403
404 /**
405 * Creates a new {@code ScheduledThreadPoolExecutor} with the
406 * given initial parameters.
407 *
408 * @param corePoolSize the number of threads to keep in the pool, even
409 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
410 * @param threadFactory the factory to use when the executor
411 * creates a new thread
412 * @throws IllegalArgumentException if {@code corePoolSize < 0}
413 * @throws NullPointerException if {@code threadFactory} is null
414 */
415 public ScheduledThreadPoolExecutor(int corePoolSize,
416 ThreadFactory threadFactory) {
417 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
418 new DelayedWorkQueue(), threadFactory);
419 }
420
421 /**
422 * Creates a new ScheduledThreadPoolExecutor with the given
423 * initial parameters.
424 *
425 * @param corePoolSize the number of threads to keep in the pool, even
426 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
427 * @param handler the handler to use when execution is blocked
428 * because the thread bounds and queue capacities are reached
429 * @throws IllegalArgumentException if {@code corePoolSize < 0}
430 * @throws NullPointerException if {@code handler} is null
431 */
432 public ScheduledThreadPoolExecutor(int corePoolSize,
433 RejectedExecutionHandler handler) {
434 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
435 new DelayedWorkQueue(), handler);
436 }
437
438 /**
439 * Creates a new ScheduledThreadPoolExecutor with the given
440 * initial parameters.
441 *
442 * @param corePoolSize the number of threads to keep in the pool, even
443 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
444 * @param threadFactory the factory to use when the executor
445 * creates a new thread
446 * @param handler the handler to use when execution is blocked
447 * because the thread bounds and queue capacities are reached
448 * @throws IllegalArgumentException if {@code corePoolSize < 0}
449 * @throws NullPointerException if {@code threadFactory} or
450 * {@code handler} is null
451 */
452 public ScheduledThreadPoolExecutor(int corePoolSize,
453 ThreadFactory threadFactory,
454 RejectedExecutionHandler handler) {
455 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
456 new DelayedWorkQueue(), threadFactory, handler);
457 }
458
459 /**
460 * Returns the trigger time of a delayed action.
461 */
462 private long triggerTime(long delay, TimeUnit unit) {
463 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
464 }
465
466 /**
467 * Returns the trigger time of a delayed action.
468 */
469 long triggerTime(long delay) {
470 return now() +
471 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
472 }
473
474 /**
475 * Constrains the values of all delays in the queue to be within
476 * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
477 * This may occur if a task is eligible to be dequeued, but has
478 * not yet been, while some other task is added with a delay of
479 * Long.MAX_VALUE.
480 */
481 private long overflowFree(long delay) {
482 Delayed head = (Delayed) super.getQueue().peek();
483 if (head != null) {
484 long headDelay = head.getDelay(NANOSECONDS);
485 if (headDelay < 0 && (delay - headDelay < 0))
486 delay = Long.MAX_VALUE + headDelay;
487 }
488 return delay;
489 }
490
491 /**
492 * @throws RejectedExecutionException {@inheritDoc}
493 * @throws NullPointerException {@inheritDoc}
494 */
495 public ScheduledFuture<?> schedule(Runnable command,
496 long delay,
497 TimeUnit unit) {
498 if (command == null || unit == null)
499 throw new NullPointerException();
500 RunnableScheduledFuture<?> t = decorateTask(command,
501 new ScheduledFutureTask<Void>(command, null,
502 triggerTime(delay, unit)));
503 delayedExecute(t);
504 return t;
505 }
506
507 /**
508 * @throws RejectedExecutionException {@inheritDoc}
509 * @throws NullPointerException {@inheritDoc}
510 */
511 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
512 long delay,
513 TimeUnit unit) {
514 if (callable == null || unit == null)
515 throw new NullPointerException();
516 RunnableScheduledFuture<V> t = decorateTask(callable,
517 new ScheduledFutureTask<V>(callable,
518 triggerTime(delay, unit)));
519 delayedExecute(t);
520 return t;
521 }
522
523 /**
524 * @throws RejectedExecutionException {@inheritDoc}
525 * @throws NullPointerException {@inheritDoc}
526 * @throws IllegalArgumentException {@inheritDoc}
527 */
528 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
529 long initialDelay,
530 long period,
531 TimeUnit unit) {
532 if (command == null || unit == null)
533 throw new NullPointerException();
534 if (period <= 0)
535 throw new IllegalArgumentException();
536 ScheduledFutureTask<Void> sft =
537 new ScheduledFutureTask<Void>(command,
538 null,
539 triggerTime(initialDelay, unit),
540 unit.toNanos(period));
541 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
542 sft.outerTask = t;
543 delayedExecute(t);
544 return t;
545 }
546
547 /**
548 * @throws RejectedExecutionException {@inheritDoc}
549 * @throws NullPointerException {@inheritDoc}
550 * @throws IllegalArgumentException {@inheritDoc}
551 */
552 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
553 long initialDelay,
554 long delay,
555 TimeUnit unit) {
556 if (command == null || unit == null)
557 throw new NullPointerException();
558 if (delay <= 0)
559 throw new IllegalArgumentException();
560 ScheduledFutureTask<Void> sft =
561 new ScheduledFutureTask<Void>(command,
562 null,
563 triggerTime(initialDelay, unit),
564 unit.toNanos(-delay));
565 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
566 sft.outerTask = t;
567 delayedExecute(t);
568 return t;
569 }
570
571 /**
572 * Executes {@code command} with zero required delay.
573 * This has effect equivalent to
574 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
575 * Note that inspections of the queue and of the list returned by
576 * {@code shutdownNow} will access the zero-delayed
577 * {@link ScheduledFuture}, not the {@code command} itself.
578 *
579 * <p>A consequence of the use of {@code ScheduledFuture} objects is
580 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
581 * called with a null second {@code Throwable} argument, even if the
582 * {@code command} terminated abruptly. Instead, the {@code Throwable}
583 * thrown by such a task can be obtained via {@link Future#get}.
584 *
585 * @throws RejectedExecutionException at discretion of
586 * {@code RejectedExecutionHandler}, if the task
587 * cannot be accepted for execution because the
588 * executor has been shut down
589 * @throws NullPointerException {@inheritDoc}
590 */
591 public void execute(Runnable command) {
592 schedule(command, 0, NANOSECONDS);
593 }
594
595 // Override AbstractExecutorService methods
596
597 /**
598 * @throws RejectedExecutionException {@inheritDoc}
599 * @throws NullPointerException {@inheritDoc}
600 */
601 public Future<?> submit(Runnable task) {
602 return schedule(task, 0, NANOSECONDS);
603 }
604
605 /**
606 * @throws RejectedExecutionException {@inheritDoc}
607 * @throws NullPointerException {@inheritDoc}
608 */
609 public <T> Future<T> submit(Runnable task, T result) {
610 return schedule(Executors.callable(task, result), 0, NANOSECONDS);
611 }
612
613 /**
614 * @throws RejectedExecutionException {@inheritDoc}
615 * @throws NullPointerException {@inheritDoc}
616 */
617 public <T> Future<T> submit(Callable<T> task) {
618 return schedule(task, 0, NANOSECONDS);
619 }
620
621 /**
622 * Sets the policy on whether to continue executing existing
623 * periodic tasks even when this executor has been {@code shutdown}.
624 * In this case, these tasks will only terminate upon
625 * {@code shutdownNow} or after setting the policy to
626 * {@code false} when already shutdown.
627 * This value is by default {@code false}.
628 *
629 * @param value if {@code true}, continue after shutdown, else don't.
630 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
631 */
632 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
633 continueExistingPeriodicTasksAfterShutdown = value;
634 if (!value && isShutdown())
635 onShutdown();
636 }
637
638 /**
639 * Gets the policy on whether to continue executing existing
640 * periodic tasks even when this executor has been {@code shutdown}.
641 * In this case, these tasks will only terminate upon
642 * {@code shutdownNow} or after setting the policy to
643 * {@code false} when already shutdown.
644 * This value is by default {@code false}.
645 *
646 * @return {@code true} if will continue after shutdown
647 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
648 */
649 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
650 return continueExistingPeriodicTasksAfterShutdown;
651 }
652
653 /**
654 * Sets the policy on whether to execute existing delayed
655 * tasks even when this executor has been {@code shutdown}.
656 * In this case, these tasks will only terminate upon
657 * {@code shutdownNow}, or after setting the policy to
658 * {@code false} when already shutdown.
659 * This value is by default {@code true}.
660 *
661 * @param value if {@code true}, execute after shutdown, else don't.
662 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
663 */
664 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
665 executeExistingDelayedTasksAfterShutdown = value;
666 if (!value && isShutdown())
667 onShutdown();
668 }
669
670 /**
671 * Gets the policy on whether to execute existing delayed
672 * tasks even when this executor has been {@code shutdown}.
673 * In this case, these tasks will only terminate upon
674 * {@code shutdownNow}, or after setting the policy to
675 * {@code false} when already shutdown.
676 * This value is by default {@code true}.
677 *
678 * @return {@code true} if will execute after shutdown
679 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
680 */
681 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
682 return executeExistingDelayedTasksAfterShutdown;
683 }
684
685 /**
686 * Sets the policy on whether cancelled tasks should be immediately
687 * removed from the work queue at time of cancellation. This value is
688 * by default {@code false}.
689 *
690 * @param value if {@code true}, remove on cancellation, else don't
691 * @see #getRemoveOnCancelPolicy
692 * @since 1.7
693 */
694 public void setRemoveOnCancelPolicy(boolean value) {
695 removeOnCancel = value;
696 }
697
698 /**
699 * Gets the policy on whether cancelled tasks should be immediately
700 * removed from the work queue at time of cancellation. This value is
701 * by default {@code false}.
702 *
703 * @return {@code true} if cancelled tasks are immediately removed
704 * from the queue
705 * @see #setRemoveOnCancelPolicy
706 * @since 1.7
707 */
708 public boolean getRemoveOnCancelPolicy() {
709 return removeOnCancel;
710 }
711
712 /**
713 * Initiates an orderly shutdown in which previously submitted
714 * tasks are executed, but no new tasks will be accepted.
715 * Invocation has no additional effect if already shut down.
716 *
717 * <p>This method does not wait for previously submitted tasks to
718 * complete execution. Use {@link #awaitTermination awaitTermination}
719 * to do that.
720 *
721 * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
722 * has been set {@code false}, existing delayed tasks whose delays
723 * have not yet elapsed are cancelled. And unless the {@code
724 * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
725 * {@code true}, future executions of existing periodic tasks will
726 * be cancelled.
727 *
728 * @throws SecurityException {@inheritDoc}
729 */
730 public void shutdown() {
731 super.shutdown();
732 }
733
734 /**
735 * Attempts to stop all actively executing tasks, halts the
736 * processing of waiting tasks, and returns a list of the tasks
737 * that were awaiting execution.
738 *
739 * <p>This method does not wait for actively executing tasks to
740 * terminate. Use {@link #awaitTermination awaitTermination} to
741 * do that.
742 *
743 * <p>There are no guarantees beyond best-effort attempts to stop
744 * processing actively executing tasks. This implementation
745 * cancels tasks via {@link Thread#interrupt}, so any task that
746 * fails to respond to interrupts may never terminate.
747 *
748 * @return list of tasks that never commenced execution.
749 * Each element of this list is a {@link ScheduledFuture},
750 * including those tasks submitted using {@code execute},
751 * which are for scheduling purposes used as the basis of a
752 * zero-delay {@code ScheduledFuture}.
753 * @throws SecurityException {@inheritDoc}
754 */
755 public List<Runnable> shutdownNow() {
756 return super.shutdownNow();
757 }
758
759 /**
760 * Returns the task queue used by this executor. Each element of
761 * this queue is a {@link ScheduledFuture}, including those
762 * tasks submitted using {@code execute} which are for scheduling
763 * purposes used as the basis of a zero-delay
764 * {@code ScheduledFuture}. Iteration over this queue is
765 * <em>not</em> guaranteed to traverse tasks in the order in
766 * which they will execute.
767 *
768 * @return the task queue
769 */
770 public BlockingQueue<Runnable> getQueue() {
771 return super.getQueue();
772 }
773
774 /**
775 * Specialized delay queue. To mesh with TPE declarations, this
776 * class must be declared as a BlockingQueue<Runnable> even though
777 * it can only hold RunnableScheduledFutures.
778 */
779 static class DelayedWorkQueue extends AbstractQueue<Runnable>
780 implements BlockingQueue<Runnable> {
781
782 /*
783 * A DelayedWorkQueue is based on a heap-based data structure
784 * like those in DelayQueue and PriorityQueue, except that
785 * every ScheduledFutureTask also records its index into the
786 * heap array. This eliminates the need to find a task upon
787 * cancellation, greatly speeding up removal (down from O(n)
788 * to O(log n)), and reducing garbage retention that would
789 * otherwise occur by waiting for the element to rise to top
790 * before clearing. But because the queue may also hold
791 * RunnableScheduledFutures that are not ScheduledFutureTasks,
792 * we are not guaranteed to have such indices available, in
793 * which case we fall back to linear search. (We expect that
794 * most tasks will not be decorated, and that the faster cases
795 * will be much more common.)
796 *
797 * All heap operations must record index changes -- mainly
798 * within siftUp and siftDown. Upon removal, a task's
799 * heapIndex is set to -1. Note that ScheduledFutureTasks can
800 * appear at most once in the queue (this need not be true for
801 * other kinds of tasks or work queues), so are uniquely
802 * identified by heapIndex.
803 */
804
805 private static final int INITIAL_CAPACITY = 16;
806 private RunnableScheduledFuture<?>[] queue =
807 new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
808 private final ReentrantLock lock = new ReentrantLock();
809 private int size = 0;
810
811 /**
812 * Thread designated to wait for the task at the head of the
813 * queue. This variant of the Leader-Follower pattern
814 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
815 * minimize unnecessary timed waiting. When a thread becomes
816 * the leader, it waits only for the next delay to elapse, but
817 * other threads await indefinitely. The leader thread must
818 * signal some other thread before returning from take() or
819 * poll(...), unless some other thread becomes leader in the
820 * interim. Whenever the head of the queue is replaced with a
821 * task with an earlier expiration time, the leader field is
822 * invalidated by being reset to null, and some waiting
823 * thread, but not necessarily the current leader, is
824 * signalled. So waiting threads must be prepared to acquire
825 * and lose leadership while waiting.
826 */
827 private Thread leader = null;
828
829 /**
830 * Condition signalled when a newer task becomes available at the
831 * head of the queue or a new thread may need to become leader.
832 */
833 private final Condition available = lock.newCondition();
834
835 /**
836 * Set f's heapIndex if it is a ScheduledFutureTask.
837 */
838 private void setIndex(RunnableScheduledFuture<?> f, int idx) {
839 if (f instanceof ScheduledFutureTask)
840 ((ScheduledFutureTask)f).heapIndex = idx;
841 }
842
843 /**
844 * Sift element added at bottom up to its heap-ordered spot.
845 * Call only when holding lock.
846 */
847 private void siftUp(int k, RunnableScheduledFuture<?> key) {
848 while (k > 0) {
849 int parent = (k - 1) >>> 1;
850 RunnableScheduledFuture<?> e = queue[parent];
851 if (key.compareTo(e) >= 0)
852 break;
853 queue[k] = e;
854 setIndex(e, k);
855 k = parent;
856 }
857 queue[k] = key;
858 setIndex(key, k);
859 }
860
861 /**
862 * Sift element added at top down to its heap-ordered spot.
863 * Call only when holding lock.
864 */
865 private void siftDown(int k, RunnableScheduledFuture<?> key) {
866 int half = size >>> 1;
867 while (k < half) {
868 int child = (k << 1) + 1;
869 RunnableScheduledFuture<?> c = queue[child];
870 int right = child + 1;
871 if (right < size && c.compareTo(queue[right]) > 0)
872 c = queue[child = right];
873 if (key.compareTo(c) <= 0)
874 break;
875 queue[k] = c;
876 setIndex(c, k);
877 k = child;
878 }
879 queue[k] = key;
880 setIndex(key, k);
881 }
882
883 /**
884 * Resize the heap array. Call only when holding lock.
885 */
886 private void grow() {
887 int oldCapacity = queue.length;
888 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
889 if (newCapacity < 0) // overflow
890 newCapacity = Integer.MAX_VALUE;
891 queue = Arrays.copyOf(queue, newCapacity);
892 }
893
894 /**
895 * Find index of given object, or -1 if absent
896 */
897 private int indexOf(Object x) {
898 if (x != null) {
899 if (x instanceof ScheduledFutureTask) {
900 int i = ((ScheduledFutureTask) x).heapIndex;
901 // Sanity check; x could conceivably be a
902 // ScheduledFutureTask from some other pool.
903 if (i >= 0 && i < size && queue[i] == x)
904 return i;
905 } else {
906 for (int i = 0; i < size; i++)
907 if (x.equals(queue[i]))
908 return i;
909 }
910 }
911 return -1;
912 }
913
914 public boolean contains(Object x) {
915 final ReentrantLock lock = this.lock;
916 lock.lock();
917 try {
918 return indexOf(x) != -1;
919 } finally {
920 lock.unlock();
921 }
922 }
923
924 public boolean remove(Object x) {
925 final ReentrantLock lock = this.lock;
926 lock.lock();
927 try {
928 int i = indexOf(x);
929 if (i < 0)
930 return false;
931
932 setIndex(queue[i], -1);
933 int s = --size;
934 RunnableScheduledFuture<?> replacement = queue[s];
935 queue[s] = null;
936 if (s != i) {
937 siftDown(i, replacement);
938 if (queue[i] == replacement)
939 siftUp(i, replacement);
940 }
941 return true;
942 } finally {
943 lock.unlock();
944 }
945 }
946
947 public int size() {
948 final ReentrantLock lock = this.lock;
949 lock.lock();
950 try {
951 return size;
952 } finally {
953 lock.unlock();
954 }
955 }
956
957 public boolean isEmpty() {
958 return size() == 0;
959 }
960
961 public int remainingCapacity() {
962 return Integer.MAX_VALUE;
963 }
964
965 public RunnableScheduledFuture<?> peek() {
966 final ReentrantLock lock = this.lock;
967 lock.lock();
968 try {
969 return queue[0];
970 } finally {
971 lock.unlock();
972 }
973 }
974
975 public boolean offer(Runnable x) {
976 if (x == null)
977 throw new NullPointerException();
978 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
979 final ReentrantLock lock = this.lock;
980 lock.lock();
981 try {
982 int i = size;
983 if (i >= queue.length)
984 grow();
985 size = i + 1;
986 if (i == 0) {
987 queue[0] = e;
988 setIndex(e, 0);
989 } else {
990 siftUp(i, e);
991 }
992 if (queue[0] == e) {
993 leader = null;
994 available.signal();
995 }
996 } finally {
997 lock.unlock();
998 }
999 return true;
1000 }
1001
1002 public void put(Runnable e) {
1003 offer(e);
1004 }
1005
1006 public boolean add(Runnable e) {
1007 return offer(e);
1008 }
1009
1010 public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1011 return offer(e);
1012 }
1013
1014 /**
1015 * Performs common bookkeeping for poll and take: Replaces
1016 * first element with last and sifts it down. Call only when
1017 * holding lock.
1018 * @param f the task to remove and return
1019 */
1020 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1021 int s = --size;
1022 RunnableScheduledFuture<?> x = queue[s];
1023 queue[s] = null;
1024 if (s != 0)
1025 siftDown(0, x);
1026 setIndex(f, -1);
1027 return f;
1028 }
1029
1030 public RunnableScheduledFuture<?> poll() {
1031 final ReentrantLock lock = this.lock;
1032 lock.lock();
1033 try {
1034 RunnableScheduledFuture<?> first = queue[0];
1035 if (first == null || first.getDelay(NANOSECONDS) > 0)
1036 return null;
1037 else
1038 return finishPoll(first);
1039 } finally {
1040 lock.unlock();
1041 }
1042 }
1043
1044 public RunnableScheduledFuture<?> take() throws InterruptedException {
1045 final ReentrantLock lock = this.lock;
1046 lock.lockInterruptibly();
1047 try {
1048 for (;;) {
1049 RunnableScheduledFuture<?> first = queue[0];
1050 if (first == null)
1051 available.await();
1052 else {
1053 long delay = first.getDelay(NANOSECONDS);
1054 if (delay <= 0)
1055 return finishPoll(first);
1056 else if (leader != null)
1057 available.await();
1058 else {
1059 Thread thisThread = Thread.currentThread();
1060 leader = thisThread;
1061 try {
1062 available.awaitNanos(delay);
1063 } finally {
1064 if (leader == thisThread)
1065 leader = null;
1066 }
1067 }
1068 }
1069 }
1070 } finally {
1071 if (leader == null && queue[0] != null)
1072 available.signal();
1073 lock.unlock();
1074 }
1075 }
1076
1077 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1078 throws InterruptedException {
1079 long nanos = unit.toNanos(timeout);
1080 final ReentrantLock lock = this.lock;
1081 lock.lockInterruptibly();
1082 try {
1083 for (;;) {
1084 RunnableScheduledFuture<?> first = queue[0];
1085 if (first == null) {
1086 if (nanos <= 0)
1087 return null;
1088 else
1089 nanos = available.awaitNanos(nanos);
1090 } else {
1091 long delay = first.getDelay(NANOSECONDS);
1092 if (delay <= 0)
1093 return finishPoll(first);
1094 if (nanos <= 0)
1095 return null;
1096 if (nanos < delay || leader != null)
1097 nanos = available.awaitNanos(nanos);
1098 else {
1099 Thread thisThread = Thread.currentThread();
1100 leader = thisThread;
1101 try {
1102 long timeLeft = available.awaitNanos(delay);
1103 nanos -= delay - timeLeft;
1104 } finally {
1105 if (leader == thisThread)
1106 leader = null;
1107 }
1108 }
1109 }
1110 }
1111 } finally {
1112 if (leader == null && queue[0] != null)
1113 available.signal();
1114 lock.unlock();
1115 }
1116 }
1117
1118 public void clear() {
1119 final ReentrantLock lock = this.lock;
1120 lock.lock();
1121 try {
1122 for (int i = 0; i < size; i++) {
1123 RunnableScheduledFuture<?> t = queue[i];
1124 if (t != null) {
1125 queue[i] = null;
1126 setIndex(t, -1);
1127 }
1128 }
1129 size = 0;
1130 } finally {
1131 lock.unlock();
1132 }
1133 }
1134
1135 /**
1136 * Return first element only if it is expired.
1137 * Used only by drainTo. Call only when holding lock.
1138 */
1139 private RunnableScheduledFuture<?> peekExpired() {
1140 // assert lock.isHeldByCurrentThread();
1141 RunnableScheduledFuture<?> first = queue[0];
1142 return (first == null || first.getDelay(NANOSECONDS) > 0) ?
1143 null : first;
1144 }
1145
1146 public int drainTo(Collection<? super Runnable> c) {
1147 if (c == null)
1148 throw new NullPointerException();
1149 if (c == this)
1150 throw new IllegalArgumentException();
1151 final ReentrantLock lock = this.lock;
1152 lock.lock();
1153 try {
1154 RunnableScheduledFuture<?> first;
1155 int n = 0;
1156 while ((first = peekExpired()) != null) {
1157 c.add(first); // In this order, in case add() throws.
1158 finishPoll(first);
1159 ++n;
1160 }
1161 return n;
1162 } finally {
1163 lock.unlock();
1164 }
1165 }
1166
1167 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1168 if (c == null)
1169 throw new NullPointerException();
1170 if (c == this)
1171 throw new IllegalArgumentException();
1172 if (maxElements <= 0)
1173 return 0;
1174 final ReentrantLock lock = this.lock;
1175 lock.lock();
1176 try {
1177 RunnableScheduledFuture<?> first;
1178 int n = 0;
1179 while (n < maxElements && (first = peekExpired()) != null) {
1180 c.add(first); // In this order, in case add() throws.
1181 finishPoll(first);
1182 ++n;
1183 }
1184 return n;
1185 } finally {
1186 lock.unlock();
1187 }
1188 }
1189
1190 public Object[] toArray() {
1191 final ReentrantLock lock = this.lock;
1192 lock.lock();
1193 try {
1194 return Arrays.copyOf(queue, size, Object[].class);
1195 } finally {
1196 lock.unlock();
1197 }
1198 }
1199
1200 @SuppressWarnings("unchecked")
1201 public <T> T[] toArray(T[] a) {
1202 final ReentrantLock lock = this.lock;
1203 lock.lock();
1204 try {
1205 if (a.length < size)
1206 return (T[]) Arrays.copyOf(queue, size, a.getClass());
1207 System.arraycopy(queue, 0, a, 0, size);
1208 if (a.length > size)
1209 a[size] = null;
1210 return a;
1211 } finally {
1212 lock.unlock();
1213 }
1214 }
1215
1216 public Iterator<Runnable> iterator() {
1217 return new Itr(Arrays.copyOf(queue, size));
1218 }
1219
1220 /**
1221 * Snapshot iterator that works off copy of underlying q array.
1222 */
1223 private class Itr implements Iterator<Runnable> {
1224 final RunnableScheduledFuture[] array;
1225 int cursor = 0; // index of next element to return
1226 int lastRet = -1; // index of last element, or -1 if no such
1227
1228 Itr(RunnableScheduledFuture[] array) {
1229 this.array = array;
1230 }
1231
1232 public boolean hasNext() {
1233 return cursor < array.length;
1234 }
1235
1236 public Runnable next() {
1237 if (cursor >= array.length)
1238 throw new NoSuchElementException();
1239 lastRet = cursor;
1240 return array[cursor++];
1241 }
1242
1243 public void remove() {
1244 if (lastRet < 0)
1245 throw new IllegalStateException();
1246 DelayedWorkQueue.this.remove(array[lastRet]);
1247 lastRet = -1;
1248 }
1249 }
1250 }
1251 }