ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jdk7/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.3
Committed: Tue Feb 5 19:54:07 2013 UTC (11 years, 3 months ago) by jsr166
Branch: MAIN
Changes since 1.2: +2 -2 lines
Log Message:
javadoc style

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8     import static java.util.concurrent.TimeUnit.NANOSECONDS;
9     import java.util.concurrent.atomic.AtomicLong;
10     import java.util.concurrent.locks.Condition;
11     import java.util.concurrent.locks.ReentrantLock;
12     import java.util.*;
13    
14     /**
15     * A {@link ThreadPoolExecutor} that can additionally schedule
16     * commands to run after a given delay, or to execute
17     * periodically. This class is preferable to {@link java.util.Timer}
18     * when multiple worker threads are needed, or when the additional
19     * flexibility or capabilities of {@link ThreadPoolExecutor} (which
20     * this class extends) are required.
21     *
22     * <p>Delayed tasks execute no sooner than they are enabled, but
23     * without any real-time guarantees about when, after they are
24     * enabled, they will commence. Tasks scheduled for exactly the same
25     * execution time are enabled in first-in-first-out (FIFO) order of
26     * submission.
27     *
28     * <p>When a submitted task is cancelled before it is run, execution
29     * is suppressed. By default, such a cancelled task is not
30     * automatically removed from the work queue until its delay
31     * elapses. While this enables further inspection and monitoring, it
32     * may also cause unbounded retention of cancelled tasks. To avoid
33     * this, set {@link #setRemoveOnCancelPolicy} to {@code true}, which
34     * causes tasks to be immediately removed from the work queue at
35     * time of cancellation.
36     *
37     * <p>Successive executions of a task scheduled via
38     * {@code scheduleAtFixedRate} or
39     * {@code scheduleWithFixedDelay} do not overlap. While different
40     * executions may be performed by different threads, the effects of
41     * prior executions <a
42     * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
43     * those of subsequent ones.
44     *
45     * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
46     * of the inherited tuning methods are not useful for it. In
47     * particular, because it acts as a fixed-sized pool using
48     * {@code corePoolSize} threads and an unbounded queue, adjustments
49     * to {@code maximumPoolSize} have no useful effect. Additionally, it
50     * is almost never a good idea to set {@code corePoolSize} to zero or
51     * use {@code allowCoreThreadTimeOut} because this may leave the pool
52     * without threads to handle tasks once they become eligible to run.
53     *
54     * <p><b>Extension notes:</b> This class overrides the
55     * {@link ThreadPoolExecutor#execute execute} and
56     * {@link AbstractExecutorService#submit(Runnable) submit}
57     * methods to generate internal {@link ScheduledFuture} objects to
58     * control per-task delays and scheduling. To preserve
59     * functionality, any further overrides of these methods in
60     * subclasses must invoke superclass versions, which effectively
61     * disables additional task customization. However, this class
62     * provides alternative protected extension method
63     * {@code decorateTask} (one version each for {@code Runnable} and
64     * {@code Callable}) that can be used to customize the concrete task
65     * types used to execute commands entered via {@code execute},
66     * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
67     * and {@code scheduleWithFixedDelay}. By default, a
68     * {@code ScheduledThreadPoolExecutor} uses a task type extending
69     * {@link FutureTask}. However, this may be modified or replaced using
70     * subclasses of the form:
71     *
72     * <pre> {@code
73     * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
74     *
75     * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
76     *
77     * protected <V> RunnableScheduledFuture<V> decorateTask(
78     * Runnable r, RunnableScheduledFuture<V> task) {
79     * return new CustomTask<V>(r, task);
80     * }
81     *
82     * protected <V> RunnableScheduledFuture<V> decorateTask(
83     * Callable<V> c, RunnableScheduledFuture<V> task) {
84     * return new CustomTask<V>(c, task);
85     * }
86     * // ... add constructors, etc.
87     * }}</pre>
88     *
89     * @since 1.5
90     * @author Doug Lea
91     */
92     public class ScheduledThreadPoolExecutor
93     extends ThreadPoolExecutor
94     implements ScheduledExecutorService {
95    
96     /*
97     * This class specializes ThreadPoolExecutor implementation by
98     *
99     * 1. Using a custom task type, ScheduledFutureTask for
100     * tasks, even those that don't require scheduling (i.e.,
101     * those submitted using ExecutorService execute, not
102     * ScheduledExecutorService methods) which are treated as
103     * delayed tasks with a delay of zero.
104     *
105     * 2. Using a custom queue (DelayedWorkQueue), a variant of
106     * unbounded DelayQueue. The lack of capacity constraint and
107     * the fact that corePoolSize and maximumPoolSize are
108     * effectively identical simplifies some execution mechanics
109     * (see delayedExecute) compared to ThreadPoolExecutor.
110     *
111     * 3. Supporting optional run-after-shutdown parameters, which
112     * leads to overrides of shutdown methods to remove and cancel
113     * tasks that should NOT be run after shutdown, as well as
114     * different recheck logic when task (re)submission overlaps
115     * with a shutdown.
116     *
117     * 4. Task decoration methods to allow interception and
118     * instrumentation, which are needed because subclasses cannot
119     * otherwise override submit methods to get this effect. These
120     * don't have any impact on pool control logic though.
121     */
122    
123     /**
124     * False if should cancel/suppress periodic tasks on shutdown.
125     */
126     private volatile boolean continueExistingPeriodicTasksAfterShutdown;
127    
128     /**
129     * False if should cancel non-periodic tasks on shutdown.
130     */
131     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
132    
133     /**
134     * True if ScheduledFutureTask.cancel should remove from queue
135     */
136     private volatile boolean removeOnCancel = false;
137    
138     /**
139     * Sequence number to break scheduling ties, and in turn to
140     * guarantee FIFO order among tied entries.
141     */
142     private static final AtomicLong sequencer = new AtomicLong();
143    
144     /**
145     * Returns current nanosecond time.
146     */
147     final long now() {
148     return System.nanoTime();
149     }
150    
151     private class ScheduledFutureTask<V>
152     extends FutureTask<V> implements RunnableScheduledFuture<V> {
153    
154     /** Sequence number to break ties FIFO */
155     private final long sequenceNumber;
156    
157     /** The time the task is enabled to execute in nanoTime units */
158     private long time;
159    
160     /**
161     * Period in nanoseconds for repeating tasks. A positive
162     * value indicates fixed-rate execution. A negative value
163     * indicates fixed-delay execution. A value of 0 indicates a
164     * non-repeating task.
165     */
166     private final long period;
167    
168     /** The actual task to be re-enqueued by reExecutePeriodic */
169     RunnableScheduledFuture<V> outerTask = this;
170    
171     /**
172     * Index into delay queue, to support faster cancellation.
173     */
174     int heapIndex;
175    
176     /**
177     * Creates a one-shot action with given nanoTime-based trigger time.
178     */
179     ScheduledFutureTask(Runnable r, V result, long ns) {
180     super(r, result);
181     this.time = ns;
182     this.period = 0;
183     this.sequenceNumber = sequencer.getAndIncrement();
184     }
185    
186     /**
187     * Creates a periodic action with given nano time and period.
188     */
189     ScheduledFutureTask(Runnable r, V result, long ns, long period) {
190     super(r, result);
191     this.time = ns;
192     this.period = period;
193     this.sequenceNumber = sequencer.getAndIncrement();
194     }
195    
196     /**
197     * Creates a one-shot action with given nanoTime-based trigger time.
198     */
199     ScheduledFutureTask(Callable<V> callable, long ns) {
200     super(callable);
201     this.time = ns;
202     this.period = 0;
203     this.sequenceNumber = sequencer.getAndIncrement();
204     }
205    
206     public long getDelay(TimeUnit unit) {
207     return unit.convert(time - now(), NANOSECONDS);
208     }
209    
210     public int compareTo(Delayed other) {
211     if (other == this) // compare zero if same object
212     return 0;
213     if (other instanceof ScheduledFutureTask) {
214     ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
215     long diff = time - x.time;
216     if (diff < 0)
217     return -1;
218     else if (diff > 0)
219     return 1;
220     else if (sequenceNumber < x.sequenceNumber)
221     return -1;
222     else
223     return 1;
224     }
225     long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
226     return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
227     }
228    
229     /**
230     * Returns true if this is a periodic (not a one-shot) action.
231     *
232     * @return true if periodic
233     */
234     public boolean isPeriodic() {
235     return period != 0;
236     }
237    
238     /**
239     * Sets the next time to run for a periodic task.
240     */
241     private void setNextRunTime() {
242     long p = period;
243     if (p > 0)
244     time += p;
245     else
246     time = triggerTime(-p);
247     }
248    
249     public boolean cancel(boolean mayInterruptIfRunning) {
250     boolean cancelled = super.cancel(mayInterruptIfRunning);
251     if (cancelled && removeOnCancel && heapIndex >= 0)
252     remove(this);
253     return cancelled;
254     }
255    
256     /**
257     * Overrides FutureTask version so as to reset/requeue if periodic.
258     */
259     public void run() {
260     boolean periodic = isPeriodic();
261     if (!canRunInCurrentRunState(periodic))
262     cancel(false);
263     else if (!periodic)
264     ScheduledFutureTask.super.run();
265     else if (ScheduledFutureTask.super.runAndReset()) {
266     setNextRunTime();
267     reExecutePeriodic(outerTask);
268     }
269     }
270     }
271    
272     /**
273     * Returns true if can run a task given current run state
274     * and run-after-shutdown parameters.
275     *
276     * @param periodic true if this task periodic, false if delayed
277     */
278     boolean canRunInCurrentRunState(boolean periodic) {
279     return isRunningOrShutdown(periodic ?
280     continueExistingPeriodicTasksAfterShutdown :
281     executeExistingDelayedTasksAfterShutdown);
282     }
283    
284     /**
285     * Main execution method for delayed or periodic tasks. If pool
286     * is shut down, rejects the task. Otherwise adds task to queue
287     * and starts a thread, if necessary, to run it. (We cannot
288     * prestart the thread to run the task because the task (probably)
289 jsr166 1.2 * shouldn't be run yet.) If the pool is shut down while the task
290 dl 1.1 * is being added, cancel and remove it if required by state and
291     * run-after-shutdown parameters.
292     *
293     * @param task the task
294     */
295     private void delayedExecute(RunnableScheduledFuture<?> task) {
296     if (isShutdown())
297     reject(task);
298     else {
299     super.getQueue().add(task);
300     if (isShutdown() &&
301     !canRunInCurrentRunState(task.isPeriodic()) &&
302     remove(task))
303     task.cancel(false);
304     else
305     ensurePrestart();
306     }
307     }
308    
309     /**
310     * Requeues a periodic task unless current run state precludes it.
311     * Same idea as delayedExecute except drops task rather than rejecting.
312     *
313     * @param task the task
314     */
315     void reExecutePeriodic(RunnableScheduledFuture<?> task) {
316     if (canRunInCurrentRunState(true)) {
317     super.getQueue().add(task);
318     if (!canRunInCurrentRunState(true) && remove(task))
319     task.cancel(false);
320     else
321     ensurePrestart();
322     }
323     }
324    
325     /**
326     * Cancels and clears the queue of all tasks that should not be run
327     * due to shutdown policy. Invoked within super.shutdown.
328     */
329     @Override void onShutdown() {
330     BlockingQueue<Runnable> q = super.getQueue();
331     boolean keepDelayed =
332     getExecuteExistingDelayedTasksAfterShutdownPolicy();
333     boolean keepPeriodic =
334     getContinueExistingPeriodicTasksAfterShutdownPolicy();
335     if (!keepDelayed && !keepPeriodic) {
336     for (Object e : q.toArray())
337     if (e instanceof RunnableScheduledFuture<?>)
338     ((RunnableScheduledFuture<?>) e).cancel(false);
339     q.clear();
340     }
341     else {
342     // Traverse snapshot to avoid iterator exceptions
343     for (Object e : q.toArray()) {
344     if (e instanceof RunnableScheduledFuture) {
345     RunnableScheduledFuture<?> t =
346     (RunnableScheduledFuture<?>)e;
347     if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
348     t.isCancelled()) { // also remove if already cancelled
349     if (q.remove(t))
350     t.cancel(false);
351     }
352     }
353     }
354     }
355     tryTerminate();
356     }
357    
358     /**
359     * Modifies or replaces the task used to execute a runnable.
360     * This method can be used to override the concrete
361     * class used for managing internal tasks.
362     * The default implementation simply returns the given task.
363     *
364     * @param runnable the submitted Runnable
365     * @param task the task created to execute the runnable
366     * @return a task that can execute the runnable
367     * @since 1.6
368     */
369     protected <V> RunnableScheduledFuture<V> decorateTask(
370     Runnable runnable, RunnableScheduledFuture<V> task) {
371     return task;
372     }
373    
374     /**
375     * Modifies or replaces the task used to execute a callable.
376     * This method can be used to override the concrete
377     * class used for managing internal tasks.
378     * The default implementation simply returns the given task.
379     *
380     * @param callable the submitted Callable
381     * @param task the task created to execute the callable
382     * @return a task that can execute the callable
383     * @since 1.6
384     */
385     protected <V> RunnableScheduledFuture<V> decorateTask(
386     Callable<V> callable, RunnableScheduledFuture<V> task) {
387     return task;
388     }
389    
390     /**
391     * Creates a new {@code ScheduledThreadPoolExecutor} with the
392     * given core pool size.
393     *
394     * @param corePoolSize the number of threads to keep in the pool, even
395     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
396     * @throws IllegalArgumentException if {@code corePoolSize < 0}
397     */
398     public ScheduledThreadPoolExecutor(int corePoolSize) {
399     super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
400     new DelayedWorkQueue());
401     }
402    
403     /**
404     * Creates a new {@code ScheduledThreadPoolExecutor} with the
405     * given initial parameters.
406     *
407     * @param corePoolSize the number of threads to keep in the pool, even
408     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
409     * @param threadFactory the factory to use when the executor
410     * creates a new thread
411     * @throws IllegalArgumentException if {@code corePoolSize < 0}
412     * @throws NullPointerException if {@code threadFactory} is null
413     */
414     public ScheduledThreadPoolExecutor(int corePoolSize,
415     ThreadFactory threadFactory) {
416     super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
417     new DelayedWorkQueue(), threadFactory);
418     }
419    
420     /**
421     * Creates a new ScheduledThreadPoolExecutor with the given
422     * initial parameters.
423     *
424     * @param corePoolSize the number of threads to keep in the pool, even
425     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
426     * @param handler the handler to use when execution is blocked
427     * because the thread bounds and queue capacities are reached
428     * @throws IllegalArgumentException if {@code corePoolSize < 0}
429     * @throws NullPointerException if {@code handler} is null
430     */
431     public ScheduledThreadPoolExecutor(int corePoolSize,
432     RejectedExecutionHandler handler) {
433     super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
434     new DelayedWorkQueue(), handler);
435     }
436    
437     /**
438     * Creates a new ScheduledThreadPoolExecutor with the given
439     * initial parameters.
440     *
441     * @param corePoolSize the number of threads to keep in the pool, even
442     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
443     * @param threadFactory the factory to use when the executor
444     * creates a new thread
445     * @param handler the handler to use when execution is blocked
446     * because the thread bounds and queue capacities are reached
447     * @throws IllegalArgumentException if {@code corePoolSize < 0}
448     * @throws NullPointerException if {@code threadFactory} or
449     * {@code handler} is null
450     */
451     public ScheduledThreadPoolExecutor(int corePoolSize,
452     ThreadFactory threadFactory,
453     RejectedExecutionHandler handler) {
454     super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
455     new DelayedWorkQueue(), threadFactory, handler);
456     }
457    
458     /**
459     * Returns the trigger time of a delayed action.
460     */
461     private long triggerTime(long delay, TimeUnit unit) {
462     return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
463     }
464    
465     /**
466     * Returns the trigger time of a delayed action.
467     */
468     long triggerTime(long delay) {
469     return now() +
470     ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
471     }
472    
473     /**
474     * Constrains the values of all delays in the queue to be within
475     * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
476     * This may occur if a task is eligible to be dequeued, but has
477     * not yet been, while some other task is added with a delay of
478     * Long.MAX_VALUE.
479     */
480     private long overflowFree(long delay) {
481     Delayed head = (Delayed) super.getQueue().peek();
482     if (head != null) {
483     long headDelay = head.getDelay(NANOSECONDS);
484     if (headDelay < 0 && (delay - headDelay < 0))
485     delay = Long.MAX_VALUE + headDelay;
486     }
487     return delay;
488     }
489    
490     /**
491     * @throws RejectedExecutionException {@inheritDoc}
492     * @throws NullPointerException {@inheritDoc}
493     */
494     public ScheduledFuture<?> schedule(Runnable command,
495     long delay,
496     TimeUnit unit) {
497     if (command == null || unit == null)
498     throw new NullPointerException();
499     RunnableScheduledFuture<?> t = decorateTask(command,
500     new ScheduledFutureTask<Void>(command, null,
501     triggerTime(delay, unit)));
502     delayedExecute(t);
503     return t;
504     }
505    
506     /**
507     * @throws RejectedExecutionException {@inheritDoc}
508     * @throws NullPointerException {@inheritDoc}
509     */
510     public <V> ScheduledFuture<V> schedule(Callable<V> callable,
511     long delay,
512     TimeUnit unit) {
513     if (callable == null || unit == null)
514     throw new NullPointerException();
515     RunnableScheduledFuture<V> t = decorateTask(callable,
516     new ScheduledFutureTask<V>(callable,
517     triggerTime(delay, unit)));
518     delayedExecute(t);
519     return t;
520     }
521    
522     /**
523     * @throws RejectedExecutionException {@inheritDoc}
524     * @throws NullPointerException {@inheritDoc}
525     * @throws IllegalArgumentException {@inheritDoc}
526     */
527     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
528     long initialDelay,
529     long period,
530     TimeUnit unit) {
531     if (command == null || unit == null)
532     throw new NullPointerException();
533     if (period <= 0)
534     throw new IllegalArgumentException();
535     ScheduledFutureTask<Void> sft =
536     new ScheduledFutureTask<Void>(command,
537     null,
538     triggerTime(initialDelay, unit),
539     unit.toNanos(period));
540     RunnableScheduledFuture<Void> t = decorateTask(command, sft);
541     sft.outerTask = t;
542     delayedExecute(t);
543     return t;
544     }
545    
546     /**
547     * @throws RejectedExecutionException {@inheritDoc}
548     * @throws NullPointerException {@inheritDoc}
549     * @throws IllegalArgumentException {@inheritDoc}
550     */
551     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
552     long initialDelay,
553     long delay,
554     TimeUnit unit) {
555     if (command == null || unit == null)
556     throw new NullPointerException();
557     if (delay <= 0)
558     throw new IllegalArgumentException();
559     ScheduledFutureTask<Void> sft =
560     new ScheduledFutureTask<Void>(command,
561     null,
562     triggerTime(initialDelay, unit),
563     unit.toNanos(-delay));
564     RunnableScheduledFuture<Void> t = decorateTask(command, sft);
565     sft.outerTask = t;
566     delayedExecute(t);
567     return t;
568     }
569    
570     /**
571     * Executes {@code command} with zero required delay.
572     * This has effect equivalent to
573     * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
574     * Note that inspections of the queue and of the list returned by
575     * {@code shutdownNow} will access the zero-delayed
576     * {@link ScheduledFuture}, not the {@code command} itself.
577     *
578     * <p>A consequence of the use of {@code ScheduledFuture} objects is
579     * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
580     * called with a null second {@code Throwable} argument, even if the
581     * {@code command} terminated abruptly. Instead, the {@code Throwable}
582     * thrown by such a task can be obtained via {@link Future#get}.
583     *
584     * @throws RejectedExecutionException at discretion of
585     * {@code RejectedExecutionHandler}, if the task
586     * cannot be accepted for execution because the
587     * executor has been shut down
588     * @throws NullPointerException {@inheritDoc}
589     */
590     public void execute(Runnable command) {
591     schedule(command, 0, NANOSECONDS);
592     }
593    
594     // Override AbstractExecutorService methods
595    
596     /**
597     * @throws RejectedExecutionException {@inheritDoc}
598     * @throws NullPointerException {@inheritDoc}
599     */
600     public Future<?> submit(Runnable task) {
601     return schedule(task, 0, NANOSECONDS);
602     }
603    
604     /**
605     * @throws RejectedExecutionException {@inheritDoc}
606     * @throws NullPointerException {@inheritDoc}
607     */
608     public <T> Future<T> submit(Runnable task, T result) {
609     return schedule(Executors.callable(task, result), 0, NANOSECONDS);
610     }
611    
612     /**
613     * @throws RejectedExecutionException {@inheritDoc}
614     * @throws NullPointerException {@inheritDoc}
615     */
616     public <T> Future<T> submit(Callable<T> task) {
617     return schedule(task, 0, NANOSECONDS);
618     }
619    
620     /**
621     * Sets the policy on whether to continue executing existing
622     * periodic tasks even when this executor has been {@code shutdown}.
623     * In this case, these tasks will only terminate upon
624     * {@code shutdownNow} or after setting the policy to
625     * {@code false} when already shutdown.
626     * This value is by default {@code false}.
627     *
628 jsr166 1.3 * @param value if {@code true}, continue after shutdown, else don't
629 dl 1.1 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
630     */
631     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
632     continueExistingPeriodicTasksAfterShutdown = value;
633     if (!value && isShutdown())
634     onShutdown();
635     }
636    
637     /**
638     * Gets the policy on whether to continue executing existing
639     * periodic tasks even when this executor has been {@code shutdown}.
640     * In this case, these tasks will only terminate upon
641     * {@code shutdownNow} or after setting the policy to
642     * {@code false} when already shutdown.
643     * This value is by default {@code false}.
644     *
645     * @return {@code true} if will continue after shutdown
646     * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
647     */
648     public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
649     return continueExistingPeriodicTasksAfterShutdown;
650     }
651    
652     /**
653     * Sets the policy on whether to execute existing delayed
654     * tasks even when this executor has been {@code shutdown}.
655     * In this case, these tasks will only terminate upon
656     * {@code shutdownNow}, or after setting the policy to
657     * {@code false} when already shutdown.
658     * This value is by default {@code true}.
659     *
660 jsr166 1.3 * @param value if {@code true}, execute after shutdown, else don't
661 dl 1.1 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
662     */
663     public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
664     executeExistingDelayedTasksAfterShutdown = value;
665     if (!value && isShutdown())
666     onShutdown();
667     }
668    
669     /**
670     * Gets the policy on whether to execute existing delayed
671     * tasks even when this executor has been {@code shutdown}.
672     * In this case, these tasks will only terminate upon
673     * {@code shutdownNow}, or after setting the policy to
674     * {@code false} when already shutdown.
675     * This value is by default {@code true}.
676     *
677     * @return {@code true} if will execute after shutdown
678     * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
679     */
680     public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
681     return executeExistingDelayedTasksAfterShutdown;
682     }
683    
684     /**
685     * Sets the policy on whether cancelled tasks should be immediately
686     * removed from the work queue at time of cancellation. This value is
687     * by default {@code false}.
688     *
689     * @param value if {@code true}, remove on cancellation, else don't
690     * @see #getRemoveOnCancelPolicy
691     * @since 1.7
692     */
693     public void setRemoveOnCancelPolicy(boolean value) {
694     removeOnCancel = value;
695     }
696    
697     /**
698     * Gets the policy on whether cancelled tasks should be immediately
699     * removed from the work queue at time of cancellation. This value is
700     * by default {@code false}.
701     *
702     * @return {@code true} if cancelled tasks are immediately removed
703     * from the queue
704     * @see #setRemoveOnCancelPolicy
705     * @since 1.7
706     */
707     public boolean getRemoveOnCancelPolicy() {
708     return removeOnCancel;
709     }
710    
711     /**
712     * Initiates an orderly shutdown in which previously submitted
713     * tasks are executed, but no new tasks will be accepted.
714     * Invocation has no additional effect if already shut down.
715     *
716     * <p>This method does not wait for previously submitted tasks to
717     * complete execution. Use {@link #awaitTermination awaitTermination}
718     * to do that.
719     *
720     * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
721     * has been set {@code false}, existing delayed tasks whose delays
722     * have not yet elapsed are cancelled. And unless the {@code
723     * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
724     * {@code true}, future executions of existing periodic tasks will
725     * be cancelled.
726     *
727     * @throws SecurityException {@inheritDoc}
728     */
729     public void shutdown() {
730     super.shutdown();
731     }
732    
733     /**
734     * Attempts to stop all actively executing tasks, halts the
735     * processing of waiting tasks, and returns a list of the tasks
736     * that were awaiting execution.
737     *
738     * <p>This method does not wait for actively executing tasks to
739     * terminate. Use {@link #awaitTermination awaitTermination} to
740     * do that.
741     *
742     * <p>There are no guarantees beyond best-effort attempts to stop
743     * processing actively executing tasks. This implementation
744     * cancels tasks via {@link Thread#interrupt}, so any task that
745     * fails to respond to interrupts may never terminate.
746     *
747     * @return list of tasks that never commenced execution.
748     * Each element of this list is a {@link ScheduledFuture},
749     * including those tasks submitted using {@code execute},
750     * which are for scheduling purposes used as the basis of a
751     * zero-delay {@code ScheduledFuture}.
752     * @throws SecurityException {@inheritDoc}
753     */
754     public List<Runnable> shutdownNow() {
755     return super.shutdownNow();
756     }
757    
758     /**
759     * Returns the task queue used by this executor. Each element of
760     * this queue is a {@link ScheduledFuture}, including those
761     * tasks submitted using {@code execute} which are for scheduling
762     * purposes used as the basis of a zero-delay
763     * {@code ScheduledFuture}. Iteration over this queue is
764     * <em>not</em> guaranteed to traverse tasks in the order in
765     * which they will execute.
766     *
767     * @return the task queue
768     */
769     public BlockingQueue<Runnable> getQueue() {
770     return super.getQueue();
771     }
772    
773     /**
774     * Specialized delay queue. To mesh with TPE declarations, this
775     * class must be declared as a BlockingQueue<Runnable> even though
776     * it can only hold RunnableScheduledFutures.
777     */
778     static class DelayedWorkQueue extends AbstractQueue<Runnable>
779     implements BlockingQueue<Runnable> {
780    
781     /*
782     * A DelayedWorkQueue is based on a heap-based data structure
783     * like those in DelayQueue and PriorityQueue, except that
784     * every ScheduledFutureTask also records its index into the
785     * heap array. This eliminates the need to find a task upon
786     * cancellation, greatly speeding up removal (down from O(n)
787     * to O(log n)), and reducing garbage retention that would
788     * otherwise occur by waiting for the element to rise to top
789     * before clearing. But because the queue may also hold
790     * RunnableScheduledFutures that are not ScheduledFutureTasks,
791     * we are not guaranteed to have such indices available, in
792     * which case we fall back to linear search. (We expect that
793     * most tasks will not be decorated, and that the faster cases
794     * will be much more common.)
795     *
796     * All heap operations must record index changes -- mainly
797     * within siftUp and siftDown. Upon removal, a task's
798     * heapIndex is set to -1. Note that ScheduledFutureTasks can
799     * appear at most once in the queue (this need not be true for
800     * other kinds of tasks or work queues), so are uniquely
801     * identified by heapIndex.
802     */
803    
804     private static final int INITIAL_CAPACITY = 16;
805     private RunnableScheduledFuture<?>[] queue =
806     new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
807     private final ReentrantLock lock = new ReentrantLock();
808     private int size = 0;
809    
810     /**
811     * Thread designated to wait for the task at the head of the
812     * queue. This variant of the Leader-Follower pattern
813     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
814     * minimize unnecessary timed waiting. When a thread becomes
815     * the leader, it waits only for the next delay to elapse, but
816     * other threads await indefinitely. The leader thread must
817     * signal some other thread before returning from take() or
818     * poll(...), unless some other thread becomes leader in the
819     * interim. Whenever the head of the queue is replaced with a
820     * task with an earlier expiration time, the leader field is
821     * invalidated by being reset to null, and some waiting
822     * thread, but not necessarily the current leader, is
823     * signalled. So waiting threads must be prepared to acquire
824     * and lose leadership while waiting.
825     */
826     private Thread leader = null;
827    
828     /**
829     * Condition signalled when a newer task becomes available at the
830     * head of the queue or a new thread may need to become leader.
831     */
832     private final Condition available = lock.newCondition();
833    
834     /**
835     * Sets f's heapIndex if it is a ScheduledFutureTask.
836     */
837     private void setIndex(RunnableScheduledFuture<?> f, int idx) {
838     if (f instanceof ScheduledFutureTask)
839     ((ScheduledFutureTask)f).heapIndex = idx;
840     }
841    
842     /**
843     * Sifts element added at bottom up to its heap-ordered spot.
844     * Call only when holding lock.
845     */
846     private void siftUp(int k, RunnableScheduledFuture<?> key) {
847     while (k > 0) {
848     int parent = (k - 1) >>> 1;
849     RunnableScheduledFuture<?> e = queue[parent];
850     if (key.compareTo(e) >= 0)
851     break;
852     queue[k] = e;
853     setIndex(e, k);
854     k = parent;
855     }
856     queue[k] = key;
857     setIndex(key, k);
858     }
859    
860     /**
861     * Sifts element added at top down to its heap-ordered spot.
862     * Call only when holding lock.
863     */
864     private void siftDown(int k, RunnableScheduledFuture<?> key) {
865     int half = size >>> 1;
866     while (k < half) {
867     int child = (k << 1) + 1;
868     RunnableScheduledFuture<?> c = queue[child];
869     int right = child + 1;
870     if (right < size && c.compareTo(queue[right]) > 0)
871     c = queue[child = right];
872     if (key.compareTo(c) <= 0)
873     break;
874     queue[k] = c;
875     setIndex(c, k);
876     k = child;
877     }
878     queue[k] = key;
879     setIndex(key, k);
880     }
881    
882     /**
883     * Resizes the heap array. Call only when holding lock.
884     */
885     private void grow() {
886     int oldCapacity = queue.length;
887     int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
888     if (newCapacity < 0) // overflow
889     newCapacity = Integer.MAX_VALUE;
890     queue = Arrays.copyOf(queue, newCapacity);
891     }
892    
893     /**
894     * Finds index of given object, or -1 if absent.
895     */
896     private int indexOf(Object x) {
897     if (x != null) {
898     if (x instanceof ScheduledFutureTask) {
899     int i = ((ScheduledFutureTask) x).heapIndex;
900     // Sanity check; x could conceivably be a
901     // ScheduledFutureTask from some other pool.
902     if (i >= 0 && i < size && queue[i] == x)
903     return i;
904     } else {
905     for (int i = 0; i < size; i++)
906     if (x.equals(queue[i]))
907     return i;
908     }
909     }
910     return -1;
911     }
912    
913     public boolean contains(Object x) {
914     final ReentrantLock lock = this.lock;
915     lock.lock();
916     try {
917     return indexOf(x) != -1;
918     } finally {
919     lock.unlock();
920     }
921     }
922    
923     public boolean remove(Object x) {
924     final ReentrantLock lock = this.lock;
925     lock.lock();
926     try {
927     int i = indexOf(x);
928     if (i < 0)
929     return false;
930    
931     setIndex(queue[i], -1);
932     int s = --size;
933     RunnableScheduledFuture<?> replacement = queue[s];
934     queue[s] = null;
935     if (s != i) {
936     siftDown(i, replacement);
937     if (queue[i] == replacement)
938     siftUp(i, replacement);
939     }
940     return true;
941     } finally {
942     lock.unlock();
943     }
944     }
945    
946     public int size() {
947     final ReentrantLock lock = this.lock;
948     lock.lock();
949     try {
950     return size;
951     } finally {
952     lock.unlock();
953     }
954     }
955    
956     public boolean isEmpty() {
957     return size() == 0;
958     }
959    
960     public int remainingCapacity() {
961     return Integer.MAX_VALUE;
962     }
963    
964     public RunnableScheduledFuture<?> peek() {
965     final ReentrantLock lock = this.lock;
966     lock.lock();
967     try {
968     return queue[0];
969     } finally {
970     lock.unlock();
971     }
972     }
973    
974     public boolean offer(Runnable x) {
975     if (x == null)
976     throw new NullPointerException();
977     RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
978     final ReentrantLock lock = this.lock;
979     lock.lock();
980     try {
981     int i = size;
982     if (i >= queue.length)
983     grow();
984     size = i + 1;
985     if (i == 0) {
986     queue[0] = e;
987     setIndex(e, 0);
988     } else {
989     siftUp(i, e);
990     }
991     if (queue[0] == e) {
992     leader = null;
993     available.signal();
994     }
995     } finally {
996     lock.unlock();
997     }
998     return true;
999     }
1000    
1001     public void put(Runnable e) {
1002     offer(e);
1003     }
1004    
1005     public boolean add(Runnable e) {
1006     return offer(e);
1007     }
1008    
1009     public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1010     return offer(e);
1011     }
1012    
1013     /**
1014     * Performs common bookkeeping for poll and take: Replaces
1015     * first element with last and sifts it down. Call only when
1016     * holding lock.
1017     * @param f the task to remove and return
1018     */
1019     private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1020     int s = --size;
1021     RunnableScheduledFuture<?> x = queue[s];
1022     queue[s] = null;
1023     if (s != 0)
1024     siftDown(0, x);
1025     setIndex(f, -1);
1026     return f;
1027     }
1028    
1029     public RunnableScheduledFuture<?> poll() {
1030     final ReentrantLock lock = this.lock;
1031     lock.lock();
1032     try {
1033     RunnableScheduledFuture<?> first = queue[0];
1034     if (first == null || first.getDelay(NANOSECONDS) > 0)
1035     return null;
1036     else
1037     return finishPoll(first);
1038     } finally {
1039     lock.unlock();
1040     }
1041     }
1042    
1043     public RunnableScheduledFuture<?> take() throws InterruptedException {
1044     final ReentrantLock lock = this.lock;
1045     lock.lockInterruptibly();
1046     try {
1047     for (;;) {
1048     RunnableScheduledFuture<?> first = queue[0];
1049     if (first == null)
1050     available.await();
1051     else {
1052     long delay = first.getDelay(NANOSECONDS);
1053     if (delay <= 0)
1054     return finishPoll(first);
1055     else if (leader != null)
1056     available.await();
1057     else {
1058     Thread thisThread = Thread.currentThread();
1059     leader = thisThread;
1060     try {
1061     available.awaitNanos(delay);
1062     } finally {
1063     if (leader == thisThread)
1064     leader = null;
1065     }
1066     }
1067     }
1068     }
1069     } finally {
1070     if (leader == null && queue[0] != null)
1071     available.signal();
1072     lock.unlock();
1073     }
1074     }
1075    
1076     public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1077     throws InterruptedException {
1078     long nanos = unit.toNanos(timeout);
1079     final ReentrantLock lock = this.lock;
1080     lock.lockInterruptibly();
1081     try {
1082     for (;;) {
1083     RunnableScheduledFuture<?> first = queue[0];
1084     if (first == null) {
1085     if (nanos <= 0)
1086     return null;
1087     else
1088     nanos = available.awaitNanos(nanos);
1089     } else {
1090     long delay = first.getDelay(NANOSECONDS);
1091     if (delay <= 0)
1092     return finishPoll(first);
1093     if (nanos <= 0)
1094     return null;
1095     if (nanos < delay || leader != null)
1096     nanos = available.awaitNanos(nanos);
1097     else {
1098     Thread thisThread = Thread.currentThread();
1099     leader = thisThread;
1100     try {
1101     long timeLeft = available.awaitNanos(delay);
1102     nanos -= delay - timeLeft;
1103     } finally {
1104     if (leader == thisThread)
1105     leader = null;
1106     }
1107     }
1108     }
1109     }
1110     } finally {
1111     if (leader == null && queue[0] != null)
1112     available.signal();
1113     lock.unlock();
1114     }
1115     }
1116    
1117     public void clear() {
1118     final ReentrantLock lock = this.lock;
1119     lock.lock();
1120     try {
1121     for (int i = 0; i < size; i++) {
1122     RunnableScheduledFuture<?> t = queue[i];
1123     if (t != null) {
1124     queue[i] = null;
1125     setIndex(t, -1);
1126     }
1127     }
1128     size = 0;
1129     } finally {
1130     lock.unlock();
1131     }
1132     }
1133    
1134     /**
1135     * Returns first element only if it is expired.
1136     * Used only by drainTo. Call only when holding lock.
1137     */
1138     private RunnableScheduledFuture<?> peekExpired() {
1139     // assert lock.isHeldByCurrentThread();
1140     RunnableScheduledFuture<?> first = queue[0];
1141     return (first == null || first.getDelay(NANOSECONDS) > 0) ?
1142     null : first;
1143     }
1144    
1145     public int drainTo(Collection<? super Runnable> c) {
1146     if (c == null)
1147     throw new NullPointerException();
1148     if (c == this)
1149     throw new IllegalArgumentException();
1150     final ReentrantLock lock = this.lock;
1151     lock.lock();
1152     try {
1153     RunnableScheduledFuture<?> first;
1154     int n = 0;
1155     while ((first = peekExpired()) != null) {
1156     c.add(first); // In this order, in case add() throws.
1157     finishPoll(first);
1158     ++n;
1159     }
1160     return n;
1161     } finally {
1162     lock.unlock();
1163     }
1164     }
1165    
1166     public int drainTo(Collection<? super Runnable> c, int maxElements) {
1167     if (c == null)
1168     throw new NullPointerException();
1169     if (c == this)
1170     throw new IllegalArgumentException();
1171     if (maxElements <= 0)
1172     return 0;
1173     final ReentrantLock lock = this.lock;
1174     lock.lock();
1175     try {
1176     RunnableScheduledFuture<?> first;
1177     int n = 0;
1178     while (n < maxElements && (first = peekExpired()) != null) {
1179     c.add(first); // In this order, in case add() throws.
1180     finishPoll(first);
1181     ++n;
1182     }
1183     return n;
1184     } finally {
1185     lock.unlock();
1186     }
1187     }
1188    
1189     public Object[] toArray() {
1190     final ReentrantLock lock = this.lock;
1191     lock.lock();
1192     try {
1193     return Arrays.copyOf(queue, size, Object[].class);
1194     } finally {
1195     lock.unlock();
1196     }
1197     }
1198    
1199     @SuppressWarnings("unchecked")
1200     public <T> T[] toArray(T[] a) {
1201     final ReentrantLock lock = this.lock;
1202     lock.lock();
1203     try {
1204     if (a.length < size)
1205     return (T[]) Arrays.copyOf(queue, size, a.getClass());
1206     System.arraycopy(queue, 0, a, 0, size);
1207     if (a.length > size)
1208     a[size] = null;
1209     return a;
1210     } finally {
1211     lock.unlock();
1212     }
1213     }
1214    
1215     public Iterator<Runnable> iterator() {
1216     return new Itr(Arrays.copyOf(queue, size));
1217     }
1218    
1219     /**
1220     * Snapshot iterator that works off copy of underlying q array.
1221     */
1222     private class Itr implements Iterator<Runnable> {
1223     final RunnableScheduledFuture[] array;
1224     int cursor = 0; // index of next element to return
1225     int lastRet = -1; // index of last element, or -1 if no such
1226    
1227     Itr(RunnableScheduledFuture[] array) {
1228     this.array = array;
1229     }
1230    
1231     public boolean hasNext() {
1232     return cursor < array.length;
1233     }
1234    
1235     public Runnable next() {
1236     if (cursor >= array.length)
1237     throw new NoSuchElementException();
1238     lastRet = cursor;
1239     return array[cursor++];
1240     }
1241    
1242     public void remove() {
1243     if (lastRet < 0)
1244     throw new IllegalStateException();
1245     DelayedWorkQueue.this.remove(array[lastRet]);
1246     lastRet = -1;
1247     }
1248     }
1249     }
1250     }