ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jdk7/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.8
Committed: Sun Jan 18 20:17:32 2015 UTC (9 years, 4 months ago) by jsr166
Branch: MAIN
Changes since 1.7: +1 -0 lines
Log Message:
exactly one blank line before and after package statements

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8 jsr166 1.8
9 dl 1.1 import static java.util.concurrent.TimeUnit.NANOSECONDS;
10 jsr166 1.7 import static java.util.concurrent.TimeUnit.MILLISECONDS;
11 dl 1.1 import java.util.concurrent.atomic.AtomicLong;
12     import java.util.concurrent.locks.Condition;
13     import java.util.concurrent.locks.ReentrantLock;
14     import java.util.*;
15    
16     /**
17     * A {@link ThreadPoolExecutor} that can additionally schedule
18 jsr166 1.7 * commands to run after a given delay, or to execute periodically.
19     * This class is preferable to {@link java.util.Timer} when multiple
20     * worker threads are needed, or when the additional flexibility or
21     * capabilities of {@link ThreadPoolExecutor} (which this class
22     * extends) are required.
23 dl 1.1 *
24     * <p>Delayed tasks execute no sooner than they are enabled, but
25     * without any real-time guarantees about when, after they are
26     * enabled, they will commence. Tasks scheduled for exactly the same
27     * execution time are enabled in first-in-first-out (FIFO) order of
28     * submission.
29     *
30     * <p>When a submitted task is cancelled before it is run, execution
31 jsr166 1.7 * is suppressed. By default, such a cancelled task is not
32     * automatically removed from the work queue until its delay elapses.
33     * While this enables further inspection and monitoring, it may also
34     * cause unbounded retention of cancelled tasks. To avoid this, use
35     * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately
36     * removed from the work queue at time of cancellation.
37 dl 1.1 *
38 jsr166 1.7 * <p>Successive executions of a periodic task scheduled via
39     * {@link #scheduleAtFixedRate} or
40     * {@link #scheduleWithFixedDelay} do not overlap. While different
41 dl 1.1 * executions may be performed by different threads, the effects of
42     * prior executions <a
43     * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
44     * those of subsequent ones.
45     *
46     * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
47     * of the inherited tuning methods are not useful for it. In
48     * particular, because it acts as a fixed-sized pool using
49     * {@code corePoolSize} threads and an unbounded queue, adjustments
50     * to {@code maximumPoolSize} have no useful effect. Additionally, it
51     * is almost never a good idea to set {@code corePoolSize} to zero or
52     * use {@code allowCoreThreadTimeOut} because this may leave the pool
53     * without threads to handle tasks once they become eligible to run.
54     *
55     * <p><b>Extension notes:</b> This class overrides the
56 jsr166 1.4 * {@link ThreadPoolExecutor#execute(Runnable) execute} and
57 dl 1.1 * {@link AbstractExecutorService#submit(Runnable) submit}
58     * methods to generate internal {@link ScheduledFuture} objects to
59     * control per-task delays and scheduling. To preserve
60     * functionality, any further overrides of these methods in
61     * subclasses must invoke superclass versions, which effectively
62     * disables additional task customization. However, this class
63     * provides an alternative protected extension method
64     * {@code decorateTask} (one version each for {@code Runnable} and
65     * {@code Callable}) that can be used to customize the concrete task
66     * types used to execute commands entered via {@code execute},
67     * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
68     * and {@code scheduleWithFixedDelay}. By default, a
69     * {@code ScheduledThreadPoolExecutor} uses a task type extending
70     * {@link FutureTask}. However, this may be modified or replaced using
71     * subclasses of the form:
72     *
73     * <pre> {@code
74     * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
75     *
76     * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
77     *
78     * protected <V> RunnableScheduledFuture<V> decorateTask(
79     * Runnable r, RunnableScheduledFuture<V> task) {
80     * return new CustomTask<V>(r, task);
81     * }
82     *
83     * protected <V> RunnableScheduledFuture<V> decorateTask(
84     * Callable<V> c, RunnableScheduledFuture<V> task) {
85     * return new CustomTask<V>(c, task);
86     * }
87     * // ... add constructors, etc.
88     * }}</pre>
89     *
90     * @since 1.5
91     * @author Doug Lea
92     */
93     public class ScheduledThreadPoolExecutor
94     extends ThreadPoolExecutor
95     implements ScheduledExecutorService {
96    
97     /*
98     * This class specializes ThreadPoolExecutor implementation by
99     *
100     * 1. Using a custom task type, ScheduledFutureTask for
101     * tasks, even those that don't require scheduling (i.e.,
102     * those submitted using ExecutorService execute, not
103     * ScheduledExecutorService methods) which are treated as
104     * delayed tasks with a delay of zero.
105     *
106     * 2. Using a custom queue (DelayedWorkQueue), a variant of
107     * unbounded DelayQueue. The lack of capacity constraint and
108     * the fact that corePoolSize and maximumPoolSize are
109     * effectively identical simplifies some execution mechanics
110     * (see delayedExecute) compared to ThreadPoolExecutor.
111     *
112     * 3. Supporting optional run-after-shutdown parameters, which
113     * leads to overrides of shutdown methods to remove and cancel
114     * tasks that should NOT be run after shutdown, as well as
115     * different recheck logic when task (re)submission overlaps
116     * with a shutdown.
117     *
118     * 4. Task decoration methods to allow interception and
119     * instrumentation, which are needed because subclasses cannot
120     * otherwise override submit methods to get this effect. These
121     * don't have any impact on pool control logic though.
122     */
123    
/**
 * Shutdown policy for periodic tasks: {@code false} if they should be
 * cancelled/suppressed on shutdown. Left at the Java default
 * ({@code false}); consulted by {@code canRunInCurrentRunState}.
 */
private volatile boolean continueExistingPeriodicTasksAfterShutdown;

/**
 * Shutdown policy for non-periodic (one-shot delayed) tasks:
 * {@code false} if they should be cancelled on shutdown.
 * Initialized to {@code true}.
 */
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;

/**
 * True if {@code ScheduledFutureTask.cancel} should also remove the
 * cancelled task from the work queue.
 */
private volatile boolean removeOnCancel = false;

/**
 * Sequence number to break scheduling ties, and in turn to
 * guarantee FIFO order among tied entries. Declared static, so the
 * counter is shared by every executor instance.
 */
private static final AtomicLong sequencer = new AtomicLong();
144    
145     /**
146     * Returns current nanosecond time.
147     */
148     final long now() {
149     return System.nanoTime();
150     }
151    
152     private class ScheduledFutureTask<V>
153     extends FutureTask<V> implements RunnableScheduledFuture<V> {
154    
155     /** Sequence number to break ties FIFO */
156     private final long sequenceNumber;
157    
158     /** The time the task is enabled to execute in nanoTime units */
159     private long time;
160    
161     /**
162 jsr166 1.7 * Period in nanoseconds for repeating tasks.
163     * A positive value indicates fixed-rate execution.
164     * A negative value indicates fixed-delay execution.
165     * A value of 0 indicates a non-repeating (one-shot) task.
166 dl 1.1 */
167     private final long period;
168    
169     /** The actual task to be re-enqueued by reExecutePeriodic */
170     RunnableScheduledFuture<V> outerTask = this;
171    
172     /**
173     * Index into delay queue, to support faster cancellation.
174     */
175     int heapIndex;
176    
177     /**
178     * Creates a one-shot action with given nanoTime-based trigger time.
179     */
180 jsr166 1.7 ScheduledFutureTask(Runnable r, V result, long triggerTime) {
181 dl 1.1 super(r, result);
182 jsr166 1.7 this.time = triggerTime;
183 dl 1.1 this.period = 0;
184     this.sequenceNumber = sequencer.getAndIncrement();
185     }
186    
187     /**
188 jsr166 1.7 * Creates a periodic action with given nanoTime-based initial
189     * trigger time and period.
190 dl 1.1 */
191 jsr166 1.7 ScheduledFutureTask(Runnable r, V result, long triggerTime,
192     long period) {
193 dl 1.1 super(r, result);
194 jsr166 1.7 this.time = triggerTime;
195 dl 1.1 this.period = period;
196     this.sequenceNumber = sequencer.getAndIncrement();
197     }
198    
199     /**
200     * Creates a one-shot action with given nanoTime-based trigger time.
201     */
202 jsr166 1.7 ScheduledFutureTask(Callable<V> callable, long triggerTime) {
203 dl 1.1 super(callable);
204 jsr166 1.7 this.time = triggerTime;
205 dl 1.1 this.period = 0;
206     this.sequenceNumber = sequencer.getAndIncrement();
207     }
208    
209     public long getDelay(TimeUnit unit) {
210     return unit.convert(time - now(), NANOSECONDS);
211     }
212    
213     public int compareTo(Delayed other) {
214     if (other == this) // compare zero if same object
215     return 0;
216     if (other instanceof ScheduledFutureTask) {
217     ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
218     long diff = time - x.time;
219     if (diff < 0)
220     return -1;
221     else if (diff > 0)
222     return 1;
223     else if (sequenceNumber < x.sequenceNumber)
224     return -1;
225     else
226     return 1;
227     }
228     long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
229     return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
230     }
231    
232     /**
233 jsr166 1.4 * Returns {@code true} if this is a periodic (not a one-shot) action.
234 dl 1.1 *
235 jsr166 1.4 * @return {@code true} if periodic
236 dl 1.1 */
237     public boolean isPeriodic() {
238     return period != 0;
239     }
240    
241     /**
242     * Sets the next time to run for a periodic task.
243     */
244     private void setNextRunTime() {
245     long p = period;
246     if (p > 0)
247     time += p;
248     else
249     time = triggerTime(-p);
250     }
251    
252     public boolean cancel(boolean mayInterruptIfRunning) {
253     boolean cancelled = super.cancel(mayInterruptIfRunning);
254     if (cancelled && removeOnCancel && heapIndex >= 0)
255     remove(this);
256     return cancelled;
257     }
258    
259     /**
260     * Overrides FutureTask version so as to reset/requeue if periodic.
261     */
262     public void run() {
263     boolean periodic = isPeriodic();
264     if (!canRunInCurrentRunState(periodic))
265     cancel(false);
266     else if (!periodic)
267     ScheduledFutureTask.super.run();
268     else if (ScheduledFutureTask.super.runAndReset()) {
269     setNextRunTime();
270     reExecutePeriodic(outerTask);
271     }
272     }
273     }
274    
275     /**
276     * Returns true if can run a task given current run state
277     * and run-after-shutdown parameters.
278     *
279     * @param periodic true if this task periodic, false if delayed
280     */
281     boolean canRunInCurrentRunState(boolean periodic) {
282     return isRunningOrShutdown(periodic ?
283     continueExistingPeriodicTasksAfterShutdown :
284     executeExistingDelayedTasksAfterShutdown);
285     }
286    
287     /**
288     * Main execution method for delayed or periodic tasks. If pool
289     * is shut down, rejects the task. Otherwise adds task to queue
290     * and starts a thread, if necessary, to run it. (We cannot
291     * prestart the thread to run the task because the task (probably)
292 jsr166 1.2 * shouldn't be run yet.) If the pool is shut down while the task
293 dl 1.1 * is being added, cancel and remove it if required by state and
294     * run-after-shutdown parameters.
295     *
296     * @param task the task
297     */
298     private void delayedExecute(RunnableScheduledFuture<?> task) {
299     if (isShutdown())
300     reject(task);
301     else {
302     super.getQueue().add(task);
303     if (isShutdown() &&
304     !canRunInCurrentRunState(task.isPeriodic()) &&
305     remove(task))
306     task.cancel(false);
307     else
308     ensurePrestart();
309     }
310     }
311    
312     /**
313     * Requeues a periodic task unless current run state precludes it.
314     * Same idea as delayedExecute except drops task rather than rejecting.
315     *
316     * @param task the task
317     */
318     void reExecutePeriodic(RunnableScheduledFuture<?> task) {
319     if (canRunInCurrentRunState(true)) {
320     super.getQueue().add(task);
321     if (!canRunInCurrentRunState(true) && remove(task))
322     task.cancel(false);
323     else
324     ensurePrestart();
325     }
326     }
327    
328     /**
329     * Cancels and clears the queue of all tasks that should not be run
330     * due to shutdown policy. Invoked within super.shutdown.
331     */
332     @Override void onShutdown() {
333     BlockingQueue<Runnable> q = super.getQueue();
334     boolean keepDelayed =
335     getExecuteExistingDelayedTasksAfterShutdownPolicy();
336     boolean keepPeriodic =
337     getContinueExistingPeriodicTasksAfterShutdownPolicy();
338     if (!keepDelayed && !keepPeriodic) {
339     for (Object e : q.toArray())
340     if (e instanceof RunnableScheduledFuture<?>)
341     ((RunnableScheduledFuture<?>) e).cancel(false);
342     q.clear();
343     }
344     else {
345     // Traverse snapshot to avoid iterator exceptions
346     for (Object e : q.toArray()) {
347     if (e instanceof RunnableScheduledFuture) {
348     RunnableScheduledFuture<?> t =
349     (RunnableScheduledFuture<?>)e;
350     if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
351     t.isCancelled()) { // also remove if already cancelled
352     if (q.remove(t))
353     t.cancel(false);
354     }
355     }
356     }
357     }
358     tryTerminate();
359     }
360    
361     /**
362     * Modifies or replaces the task used to execute a runnable.
363     * This method can be used to override the concrete
364     * class used for managing internal tasks.
365     * The default implementation simply returns the given task.
366     *
367     * @param runnable the submitted Runnable
368     * @param task the task created to execute the runnable
369 jsr166 1.5 * @param <V> the type of the task's result
370 dl 1.1 * @return a task that can execute the runnable
371     * @since 1.6
372     */
373     protected <V> RunnableScheduledFuture<V> decorateTask(
374     Runnable runnable, RunnableScheduledFuture<V> task) {
375     return task;
376     }
377    
378     /**
379     * Modifies or replaces the task used to execute a callable.
380     * This method can be used to override the concrete
381     * class used for managing internal tasks.
382     * The default implementation simply returns the given task.
383     *
384     * @param callable the submitted Callable
385     * @param task the task created to execute the callable
386 jsr166 1.5 * @param <V> the type of the task's result
387 dl 1.1 * @return a task that can execute the callable
388     * @since 1.6
389     */
390     protected <V> RunnableScheduledFuture<V> decorateTask(
391     Callable<V> callable, RunnableScheduledFuture<V> task) {
392     return task;
393     }
394    
/**
 * The default keep-alive time for pool threads, in milliseconds; every
 * constructor of this class passes it to the superclass constructor.
 *
 * Normally, this value is unused because all pool threads will be
 * core threads, but if a user creates a pool with a corePoolSize
 * of zero (against our advice), we keep a thread alive as long as
 * there are queued tasks.  If the keep alive time is zero (the
 * historic value), we end up hot-spinning in getTask, wasting a
 * CPU.  But on the other hand, if we set the value too high, and
 * users create a one-shot pool which they don't cleanly shutdown,
 * the pool's non-daemon threads will prevent JVM termination.  A
 * small but non-zero value (relative to a JVM's lifetime) seems
 * best.
 */
private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
410    
/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the
 * given core pool size.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    // Maximum pool size is Integer.MAX_VALUE and the work queue is a
    // fresh DelayedWorkQueue; keep-alive is DEFAULT_KEEPALIVE_MILLIS.
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue());
}
424    
/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the
 * given core pool size and thread factory.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code threadFactory} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    // Maximum pool size is Integer.MAX_VALUE and the work queue is a
    // fresh DelayedWorkQueue; keep-alive is DEFAULT_KEEPALIVE_MILLIS.
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue(), threadFactory);
}
442    
/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the
 * given core pool size and rejected-execution handler.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code handler} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    // Maximum pool size is Integer.MAX_VALUE and the work queue is a
    // fresh DelayedWorkQueue; keep-alive is DEFAULT_KEEPALIVE_MILLIS.
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue(), handler);
}
460    
/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the
 * given core pool size, thread factory and rejected-execution handler.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code threadFactory} or
 *         {@code handler} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    // Maximum pool size is Integer.MAX_VALUE and the work queue is a
    // fresh DelayedWorkQueue; keep-alive is DEFAULT_KEEPALIVE_MILLIS.
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}
482    
483     /**
484 jsr166 1.7 * Returns the nanoTime-based trigger time of a delayed action.
485 dl 1.1 */
486     private long triggerTime(long delay, TimeUnit unit) {
487     return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
488     }
489    
490     /**
491 jsr166 1.7 * Returns the nanoTime-based trigger time of a delayed action.
492 dl 1.1 */
493     long triggerTime(long delay) {
494     return now() +
495     ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
496     }
497    
498     /**
499     * Constrains the values of all delays in the queue to be within
500     * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
501     * This may occur if a task is eligible to be dequeued, but has
502     * not yet been, while some other task is added with a delay of
503     * Long.MAX_VALUE.
504     */
505     private long overflowFree(long delay) {
506     Delayed head = (Delayed) super.getQueue().peek();
507     if (head != null) {
508     long headDelay = head.getDelay(NANOSECONDS);
509     if (headDelay < 0 && (delay - headDelay < 0))
510     delay = Long.MAX_VALUE + headDelay;
511     }
512     return delay;
513     }
514    
515     /**
516     * @throws RejectedExecutionException {@inheritDoc}
517     * @throws NullPointerException {@inheritDoc}
518     */
519     public ScheduledFuture<?> schedule(Runnable command,
520     long delay,
521     TimeUnit unit) {
522     if (command == null || unit == null)
523     throw new NullPointerException();
524 jsr166 1.7 RunnableScheduledFuture<Void> t = decorateTask(command,
525 dl 1.1 new ScheduledFutureTask<Void>(command, null,
526     triggerTime(delay, unit)));
527     delayedExecute(t);
528     return t;
529     }
530    
531     /**
532     * @throws RejectedExecutionException {@inheritDoc}
533     * @throws NullPointerException {@inheritDoc}
534     */
535     public <V> ScheduledFuture<V> schedule(Callable<V> callable,
536     long delay,
537     TimeUnit unit) {
538     if (callable == null || unit == null)
539     throw new NullPointerException();
540     RunnableScheduledFuture<V> t = decorateTask(callable,
541     new ScheduledFutureTask<V>(callable,
542     triggerTime(delay, unit)));
543     delayedExecute(t);
544     return t;
545     }
546    
547     /**
548     * @throws RejectedExecutionException {@inheritDoc}
549     * @throws NullPointerException {@inheritDoc}
550     * @throws IllegalArgumentException {@inheritDoc}
551     */
552     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
553     long initialDelay,
554     long period,
555     TimeUnit unit) {
556     if (command == null || unit == null)
557     throw new NullPointerException();
558     if (period <= 0)
559     throw new IllegalArgumentException();
560     ScheduledFutureTask<Void> sft =
561     new ScheduledFutureTask<Void>(command,
562     null,
563     triggerTime(initialDelay, unit),
564     unit.toNanos(period));
565     RunnableScheduledFuture<Void> t = decorateTask(command, sft);
566     sft.outerTask = t;
567     delayedExecute(t);
568     return t;
569     }
570    
571     /**
572     * @throws RejectedExecutionException {@inheritDoc}
573     * @throws NullPointerException {@inheritDoc}
574     * @throws IllegalArgumentException {@inheritDoc}
575     */
576     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
577     long initialDelay,
578     long delay,
579     TimeUnit unit) {
580     if (command == null || unit == null)
581     throw new NullPointerException();
582     if (delay <= 0)
583     throw new IllegalArgumentException();
584     ScheduledFutureTask<Void> sft =
585     new ScheduledFutureTask<Void>(command,
586     null,
587     triggerTime(initialDelay, unit),
588     unit.toNanos(-delay));
589     RunnableScheduledFuture<Void> t = decorateTask(command, sft);
590     sft.outerTask = t;
591     delayedExecute(t);
592     return t;
593     }
594    
595     /**
596     * Executes {@code command} with zero required delay.
597     * This has effect equivalent to
598     * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
599     * Note that inspections of the queue and of the list returned by
600     * {@code shutdownNow} will access the zero-delayed
601     * {@link ScheduledFuture}, not the {@code command} itself.
602     *
603     * <p>A consequence of the use of {@code ScheduledFuture} objects is
604     * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
605     * called with a null second {@code Throwable} argument, even if the
606     * {@code command} terminated abruptly. Instead, the {@code Throwable}
607     * thrown by such a task can be obtained via {@link Future#get}.
608     *
609     * @throws RejectedExecutionException at discretion of
610     * {@code RejectedExecutionHandler}, if the task
611     * cannot be accepted for execution because the
612     * executor has been shut down
613     * @throws NullPointerException {@inheritDoc}
614     */
615     public void execute(Runnable command) {
616     schedule(command, 0, NANOSECONDS);
617     }
618    
619     // Override AbstractExecutorService methods
620    
621     /**
622     * @throws RejectedExecutionException {@inheritDoc}
623     * @throws NullPointerException {@inheritDoc}
624     */
625     public Future<?> submit(Runnable task) {
626     return schedule(task, 0, NANOSECONDS);
627     }
628    
629     /**
630     * @throws RejectedExecutionException {@inheritDoc}
631     * @throws NullPointerException {@inheritDoc}
632     */
633     public <T> Future<T> submit(Runnable task, T result) {
634     return schedule(Executors.callable(task, result), 0, NANOSECONDS);
635     }
636    
637     /**
638     * @throws RejectedExecutionException {@inheritDoc}
639     * @throws NullPointerException {@inheritDoc}
640     */
641     public <T> Future<T> submit(Callable<T> task) {
642     return schedule(task, 0, NANOSECONDS);
643     }
644    
645     /**
646     * Sets the policy on whether to continue executing existing
647     * periodic tasks even when this executor has been {@code shutdown}.
648     * In this case, these tasks will only terminate upon
649     * {@code shutdownNow} or after setting the policy to
650     * {@code false} when already shutdown.
651     * This value is by default {@code false}.
652     *
653 jsr166 1.3 * @param value if {@code true}, continue after shutdown, else don't
654 dl 1.1 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
655     */
656     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
657     continueExistingPeriodicTasksAfterShutdown = value;
658     if (!value && isShutdown())
659     onShutdown();
660     }
661    
662     /**
663     * Gets the policy on whether to continue executing existing
664     * periodic tasks even when this executor has been {@code shutdown}.
665     * In this case, these tasks will only terminate upon
666     * {@code shutdownNow} or after setting the policy to
667     * {@code false} when already shutdown.
668     * This value is by default {@code false}.
669     *
670     * @return {@code true} if will continue after shutdown
671     * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
672     */
673     public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
674     return continueExistingPeriodicTasksAfterShutdown;
675     }
676    
677     /**
678     * Sets the policy on whether to execute existing delayed
679     * tasks even when this executor has been {@code shutdown}.
680     * In this case, these tasks will only terminate upon
681     * {@code shutdownNow}, or after setting the policy to
682     * {@code false} when already shutdown.
683     * This value is by default {@code true}.
684     *
685 jsr166 1.3 * @param value if {@code true}, execute after shutdown, else don't
686 dl 1.1 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
687     */
688     public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
689     executeExistingDelayedTasksAfterShutdown = value;
690     if (!value && isShutdown())
691     onShutdown();
692     }
693    
694     /**
695     * Gets the policy on whether to execute existing delayed
696     * tasks even when this executor has been {@code shutdown}.
697     * In this case, these tasks will only terminate upon
698     * {@code shutdownNow}, or after setting the policy to
699     * {@code false} when already shutdown.
700     * This value is by default {@code true}.
701     *
702     * @return {@code true} if will execute after shutdown
703     * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
704     */
705     public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
706     return executeExistingDelayedTasksAfterShutdown;
707     }
708    
709     /**
710     * Sets the policy on whether cancelled tasks should be immediately
711     * removed from the work queue at time of cancellation. This value is
712     * by default {@code false}.
713     *
714     * @param value if {@code true}, remove on cancellation, else don't
715     * @see #getRemoveOnCancelPolicy
716     * @since 1.7
717     */
718     public void setRemoveOnCancelPolicy(boolean value) {
719     removeOnCancel = value;
720     }
721    
722     /**
723     * Gets the policy on whether cancelled tasks should be immediately
724     * removed from the work queue at time of cancellation. This value is
725     * by default {@code false}.
726     *
727     * @return {@code true} if cancelled tasks are immediately removed
728     * from the queue
729     * @see #setRemoveOnCancelPolicy
730     * @since 1.7
731     */
732     public boolean getRemoveOnCancelPolicy() {
733     return removeOnCancel;
734     }
735    
736     /**
737     * Initiates an orderly shutdown in which previously submitted
738     * tasks are executed, but no new tasks will be accepted.
739     * Invocation has no additional effect if already shut down.
740     *
741     * <p>This method does not wait for previously submitted tasks to
742     * complete execution. Use {@link #awaitTermination awaitTermination}
743     * to do that.
744     *
745     * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
746     * has been set {@code false}, existing delayed tasks whose delays
747     * have not yet elapsed are cancelled. And unless the {@code
748     * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
749     * {@code true}, future executions of existing periodic tasks will
750     * be cancelled.
751     *
752     * @throws SecurityException {@inheritDoc}
753     */
754     public void shutdown() {
755     super.shutdown();
756     }
757    
758     /**
759     * Attempts to stop all actively executing tasks, halts the
760     * processing of waiting tasks, and returns a list of the tasks
761     * that were awaiting execution.
762     *
763     * <p>This method does not wait for actively executing tasks to
764     * terminate. Use {@link #awaitTermination awaitTermination} to
765     * do that.
766     *
767     * <p>There are no guarantees beyond best-effort attempts to stop
768     * processing actively executing tasks. This implementation
769     * cancels tasks via {@link Thread#interrupt}, so any task that
770     * fails to respond to interrupts may never terminate.
771     *
772     * @return list of tasks that never commenced execution.
773 jsr166 1.7 * Each element of this list is a {@link ScheduledFuture}.
774     * For tasks submitted via one of the {@code schedule}
775     * methods, the element will be identical to the returned
776     * {@code ScheduledFuture}. For tasks submitted using
777     * {@link #execute}, the element will be a zero-delay {@code
778     * ScheduledFuture}.
779 dl 1.1 * @throws SecurityException {@inheritDoc}
780     */
781     public List<Runnable> shutdownNow() {
782     return super.shutdownNow();
783     }
784    
785     /**
786 jsr166 1.7 * Returns the task queue used by this executor.
787     * Each element of this list is a {@link ScheduledFuture}.
788     * For tasks submitted via one of the {@code schedule} methods, the
789     * element will be identical to the returned {@code ScheduledFuture}.
790     * For tasks submitted using {@link #execute}, the element will be a
791     * zero-delay {@code ScheduledFuture}.
792     *
793     * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
794     * tasks in the order in which they will execute.
795 dl 1.1 *
796     * @return the task queue
797     */
798     public BlockingQueue<Runnable> getQueue() {
799     return super.getQueue();
800     }
801    
802     /**
803     * Specialized delay queue. To mesh with TPE declarations, this
804     * class must be declared as a BlockingQueue<Runnable> even though
805     * it can only hold RunnableScheduledFutures.
806     */
807     static class DelayedWorkQueue extends AbstractQueue<Runnable>
808     implements BlockingQueue<Runnable> {
809    
810     /*
811     * A DelayedWorkQueue is based on a heap-based data structure
812     * like those in DelayQueue and PriorityQueue, except that
813     * every ScheduledFutureTask also records its index into the
814     * heap array. This eliminates the need to find a task upon
815     * cancellation, greatly speeding up removal (down from O(n)
816     * to O(log n)), and reducing garbage retention that would
817     * otherwise occur by waiting for the element to rise to top
818     * before clearing. But because the queue may also hold
819     * RunnableScheduledFutures that are not ScheduledFutureTasks,
820     * we are not guaranteed to have such indices available, in
821     * which case we fall back to linear search. (We expect that
822     * most tasks will not be decorated, and that the faster cases
823     * will be much more common.)
824     *
825     * All heap operations must record index changes -- mainly
826     * within siftUp and siftDown. Upon removal, a task's
827     * heapIndex is set to -1. Note that ScheduledFutureTasks can
828     * appear at most once in the queue (this need not be true for
829     * other kinds of tasks or work queues), so are uniquely
830     * identified by heapIndex.
831     */
832    
833     private static final int INITIAL_CAPACITY = 16;
834     private RunnableScheduledFuture<?>[] queue =
835     new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
836     private final ReentrantLock lock = new ReentrantLock();
837 jsr166 1.6 private int size;
838 dl 1.1
839     /**
840     * Thread designated to wait for the task at the head of the
841     * queue. This variant of the Leader-Follower pattern
842     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
843     * minimize unnecessary timed waiting. When a thread becomes
844     * the leader, it waits only for the next delay to elapse, but
845     * other threads await indefinitely. The leader thread must
846     * signal some other thread before returning from take() or
847     * poll(...), unless some other thread becomes leader in the
848     * interim. Whenever the head of the queue is replaced with a
849     * task with an earlier expiration time, the leader field is
850     * invalidated by being reset to null, and some waiting
851     * thread, but not necessarily the current leader, is
852     * signalled. So waiting threads must be prepared to acquire
853     * and lose leadership while waiting.
854     */
855 jsr166 1.6 private Thread leader;
856 dl 1.1
857     /**
858     * Condition signalled when a newer task becomes available at the
859     * head of the queue or a new thread may need to become leader.
860     */
861     private final Condition available = lock.newCondition();
862    
863     /**
864     * Sets f's heapIndex if it is a ScheduledFutureTask.
865     */
866     private void setIndex(RunnableScheduledFuture<?> f, int idx) {
867     if (f instanceof ScheduledFutureTask)
868     ((ScheduledFutureTask)f).heapIndex = idx;
869     }
870    
871     /**
872     * Sifts element added at bottom up to its heap-ordered spot.
873     * Call only when holding lock.
874     */
875     private void siftUp(int k, RunnableScheduledFuture<?> key) {
876     while (k > 0) {
877     int parent = (k - 1) >>> 1;
878     RunnableScheduledFuture<?> e = queue[parent];
879     if (key.compareTo(e) >= 0)
880     break;
881     queue[k] = e;
882     setIndex(e, k);
883     k = parent;
884     }
885     queue[k] = key;
886     setIndex(key, k);
887     }
888    
889     /**
890     * Sifts element added at top down to its heap-ordered spot.
891     * Call only when holding lock.
892     */
893     private void siftDown(int k, RunnableScheduledFuture<?> key) {
894     int half = size >>> 1;
895     while (k < half) {
896     int child = (k << 1) + 1;
897     RunnableScheduledFuture<?> c = queue[child];
898     int right = child + 1;
899     if (right < size && c.compareTo(queue[right]) > 0)
900     c = queue[child = right];
901     if (key.compareTo(c) <= 0)
902     break;
903     queue[k] = c;
904     setIndex(c, k);
905     k = child;
906     }
907     queue[k] = key;
908     setIndex(key, k);
909     }
910    
911     /**
912     * Resizes the heap array. Call only when holding lock.
913     */
914     private void grow() {
915     int oldCapacity = queue.length;
916     int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
917     if (newCapacity < 0) // overflow
918     newCapacity = Integer.MAX_VALUE;
919     queue = Arrays.copyOf(queue, newCapacity);
920     }
921    
922     /**
923     * Finds index of given object, or -1 if absent.
924     */
925     private int indexOf(Object x) {
926     if (x != null) {
927     if (x instanceof ScheduledFutureTask) {
928     int i = ((ScheduledFutureTask) x).heapIndex;
929     // Sanity check; x could conceivably be a
930     // ScheduledFutureTask from some other pool.
931     if (i >= 0 && i < size && queue[i] == x)
932     return i;
933     } else {
934     for (int i = 0; i < size; i++)
935     if (x.equals(queue[i]))
936     return i;
937     }
938     }
939     return -1;
940     }
941    
942     public boolean contains(Object x) {
943     final ReentrantLock lock = this.lock;
944     lock.lock();
945     try {
946     return indexOf(x) != -1;
947     } finally {
948     lock.unlock();
949     }
950     }
951    
952     public boolean remove(Object x) {
953     final ReentrantLock lock = this.lock;
954     lock.lock();
955     try {
956     int i = indexOf(x);
957     if (i < 0)
958     return false;
959    
960     setIndex(queue[i], -1);
961     int s = --size;
962     RunnableScheduledFuture<?> replacement = queue[s];
963     queue[s] = null;
964     if (s != i) {
965     siftDown(i, replacement);
966     if (queue[i] == replacement)
967     siftUp(i, replacement);
968     }
969     return true;
970     } finally {
971     lock.unlock();
972     }
973     }
974    
975     public int size() {
976     final ReentrantLock lock = this.lock;
977     lock.lock();
978     try {
979     return size;
980     } finally {
981     lock.unlock();
982     }
983     }
984    
985     public boolean isEmpty() {
986     return size() == 0;
987     }
988    
989     public int remainingCapacity() {
990     return Integer.MAX_VALUE;
991     }
992    
993     public RunnableScheduledFuture<?> peek() {
994     final ReentrantLock lock = this.lock;
995     lock.lock();
996     try {
997     return queue[0];
998     } finally {
999     lock.unlock();
1000     }
1001     }
1002    
1003     public boolean offer(Runnable x) {
1004     if (x == null)
1005     throw new NullPointerException();
1006     RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
1007     final ReentrantLock lock = this.lock;
1008     lock.lock();
1009     try {
1010     int i = size;
1011     if (i >= queue.length)
1012     grow();
1013     size = i + 1;
1014     if (i == 0) {
1015     queue[0] = e;
1016     setIndex(e, 0);
1017     } else {
1018     siftUp(i, e);
1019     }
1020     if (queue[0] == e) {
1021     leader = null;
1022     available.signal();
1023     }
1024     } finally {
1025     lock.unlock();
1026     }
1027     return true;
1028     }
1029    
1030     public void put(Runnable e) {
1031     offer(e);
1032     }
1033    
1034     public boolean add(Runnable e) {
1035     return offer(e);
1036     }
1037    
1038     public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1039     return offer(e);
1040     }
1041    
1042     /**
1043     * Performs common bookkeeping for poll and take: Replaces
1044     * first element with last and sifts it down. Call only when
1045     * holding lock.
1046     * @param f the task to remove and return
1047     */
1048     private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1049     int s = --size;
1050     RunnableScheduledFuture<?> x = queue[s];
1051     queue[s] = null;
1052     if (s != 0)
1053     siftDown(0, x);
1054     setIndex(f, -1);
1055     return f;
1056     }
1057    
1058     public RunnableScheduledFuture<?> poll() {
1059     final ReentrantLock lock = this.lock;
1060     lock.lock();
1061     try {
1062     RunnableScheduledFuture<?> first = queue[0];
1063     if (first == null || first.getDelay(NANOSECONDS) > 0)
1064     return null;
1065     else
1066     return finishPoll(first);
1067     } finally {
1068     lock.unlock();
1069     }
1070     }
1071    
1072     public RunnableScheduledFuture<?> take() throws InterruptedException {
1073     final ReentrantLock lock = this.lock;
1074     lock.lockInterruptibly();
1075     try {
1076     for (;;) {
1077     RunnableScheduledFuture<?> first = queue[0];
1078     if (first == null)
1079     available.await();
1080     else {
1081     long delay = first.getDelay(NANOSECONDS);
1082     if (delay <= 0)
1083     return finishPoll(first);
1084 jsr166 1.4 first = null; // don't retain ref while waiting
1085     if (leader != null)
1086 dl 1.1 available.await();
1087     else {
1088     Thread thisThread = Thread.currentThread();
1089     leader = thisThread;
1090     try {
1091     available.awaitNanos(delay);
1092     } finally {
1093     if (leader == thisThread)
1094     leader = null;
1095     }
1096     }
1097     }
1098     }
1099     } finally {
1100     if (leader == null && queue[0] != null)
1101     available.signal();
1102     lock.unlock();
1103     }
1104     }
1105    
1106     public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1107     throws InterruptedException {
1108     long nanos = unit.toNanos(timeout);
1109     final ReentrantLock lock = this.lock;
1110     lock.lockInterruptibly();
1111     try {
1112     for (;;) {
1113     RunnableScheduledFuture<?> first = queue[0];
1114     if (first == null) {
1115     if (nanos <= 0)
1116     return null;
1117     else
1118     nanos = available.awaitNanos(nanos);
1119     } else {
1120     long delay = first.getDelay(NANOSECONDS);
1121     if (delay <= 0)
1122     return finishPoll(first);
1123     if (nanos <= 0)
1124     return null;
1125 jsr166 1.4 first = null; // don't retain ref while waiting
1126 dl 1.1 if (nanos < delay || leader != null)
1127     nanos = available.awaitNanos(nanos);
1128     else {
1129     Thread thisThread = Thread.currentThread();
1130     leader = thisThread;
1131     try {
1132     long timeLeft = available.awaitNanos(delay);
1133     nanos -= delay - timeLeft;
1134     } finally {
1135     if (leader == thisThread)
1136     leader = null;
1137     }
1138     }
1139     }
1140     }
1141     } finally {
1142     if (leader == null && queue[0] != null)
1143     available.signal();
1144     lock.unlock();
1145     }
1146     }
1147    
1148     public void clear() {
1149     final ReentrantLock lock = this.lock;
1150     lock.lock();
1151     try {
1152     for (int i = 0; i < size; i++) {
1153     RunnableScheduledFuture<?> t = queue[i];
1154     if (t != null) {
1155     queue[i] = null;
1156     setIndex(t, -1);
1157     }
1158     }
1159     size = 0;
1160     } finally {
1161     lock.unlock();
1162     }
1163     }
1164    
1165     /**
1166     * Returns first element only if it is expired.
1167     * Used only by drainTo. Call only when holding lock.
1168     */
1169     private RunnableScheduledFuture<?> peekExpired() {
1170     // assert lock.isHeldByCurrentThread();
1171     RunnableScheduledFuture<?> first = queue[0];
1172     return (first == null || first.getDelay(NANOSECONDS) > 0) ?
1173     null : first;
1174     }
1175    
1176     public int drainTo(Collection<? super Runnable> c) {
1177     if (c == null)
1178     throw new NullPointerException();
1179     if (c == this)
1180     throw new IllegalArgumentException();
1181     final ReentrantLock lock = this.lock;
1182     lock.lock();
1183     try {
1184     RunnableScheduledFuture<?> first;
1185     int n = 0;
1186     while ((first = peekExpired()) != null) {
1187     c.add(first); // In this order, in case add() throws.
1188     finishPoll(first);
1189     ++n;
1190     }
1191     return n;
1192     } finally {
1193     lock.unlock();
1194     }
1195     }
1196    
1197     public int drainTo(Collection<? super Runnable> c, int maxElements) {
1198     if (c == null)
1199     throw new NullPointerException();
1200     if (c == this)
1201     throw new IllegalArgumentException();
1202     if (maxElements <= 0)
1203     return 0;
1204     final ReentrantLock lock = this.lock;
1205     lock.lock();
1206     try {
1207     RunnableScheduledFuture<?> first;
1208     int n = 0;
1209     while (n < maxElements && (first = peekExpired()) != null) {
1210     c.add(first); // In this order, in case add() throws.
1211     finishPoll(first);
1212     ++n;
1213     }
1214     return n;
1215     } finally {
1216     lock.unlock();
1217     }
1218     }
1219    
1220     public Object[] toArray() {
1221     final ReentrantLock lock = this.lock;
1222     lock.lock();
1223     try {
1224     return Arrays.copyOf(queue, size, Object[].class);
1225     } finally {
1226     lock.unlock();
1227     }
1228     }
1229    
1230     @SuppressWarnings("unchecked")
1231     public <T> T[] toArray(T[] a) {
1232     final ReentrantLock lock = this.lock;
1233     lock.lock();
1234     try {
1235     if (a.length < size)
1236     return (T[]) Arrays.copyOf(queue, size, a.getClass());
1237     System.arraycopy(queue, 0, a, 0, size);
1238     if (a.length > size)
1239     a[size] = null;
1240     return a;
1241     } finally {
1242     lock.unlock();
1243     }
1244     }
1245    
1246     public Iterator<Runnable> iterator() {
1247     return new Itr(Arrays.copyOf(queue, size));
1248     }
1249    
1250     /**
1251     * Snapshot iterator that works off copy of underlying q array.
1252     */
1253     private class Itr implements Iterator<Runnable> {
1254 jsr166 1.7 final RunnableScheduledFuture<?>[] array;
1255 dl 1.1 int cursor = 0; // index of next element to return
1256     int lastRet = -1; // index of last element, or -1 if no such
1257    
1258 jsr166 1.7 Itr(RunnableScheduledFuture<?>[] array) {
1259 dl 1.1 this.array = array;
1260     }
1261    
1262     public boolean hasNext() {
1263     return cursor < array.length;
1264     }
1265    
1266     public Runnable next() {
1267     if (cursor >= array.length)
1268     throw new NoSuchElementException();
1269     lastRet = cursor;
1270     return array[cursor++];
1271     }
1272    
1273     public void remove() {
1274     if (lastRet < 0)
1275     throw new IllegalStateException();
1276     DelayedWorkQueue.this.remove(array[lastRet]);
1277     lastRet = -1;
1278     }
1279     }
1280     }
1281     }