ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jdk7/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.9
Committed: Thu Nov 5 16:45:06 2015 UTC (8 years, 7 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.8: +60 -51 lines
Log Message:
sync from main to fix tck failure

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8 jsr166 1.8
9 jsr166 1.9 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 dl 1.1 import static java.util.concurrent.TimeUnit.NANOSECONDS;
11 jsr166 1.9
12     import java.util.AbstractQueue;
13     import java.util.Arrays;
14     import java.util.Collection;
15     import java.util.Iterator;
16     import java.util.List;
17     import java.util.NoSuchElementException;
18 dl 1.1 import java.util.concurrent.atomic.AtomicLong;
19     import java.util.concurrent.locks.Condition;
20     import java.util.concurrent.locks.ReentrantLock;
21    
22     /**
23     * A {@link ThreadPoolExecutor} that can additionally schedule
24 jsr166 1.7 * commands to run after a given delay, or to execute periodically.
25     * This class is preferable to {@link java.util.Timer} when multiple
26     * worker threads are needed, or when the additional flexibility or
27     * capabilities of {@link ThreadPoolExecutor} (which this class
28     * extends) are required.
29 dl 1.1 *
30     * <p>Delayed tasks execute no sooner than they are enabled, but
31     * without any real-time guarantees about when, after they are
32     * enabled, they will commence. Tasks scheduled for exactly the same
33     * execution time are enabled in first-in-first-out (FIFO) order of
34     * submission.
35     *
36     * <p>When a submitted task is cancelled before it is run, execution
37 jsr166 1.7 * is suppressed. By default, such a cancelled task is not
38     * automatically removed from the work queue until its delay elapses.
39     * While this enables further inspection and monitoring, it may also
40     * cause unbounded retention of cancelled tasks. To avoid this, use
41     * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately
42     * removed from the work queue at time of cancellation.
43 dl 1.1 *
44 jsr166 1.7 * <p>Successive executions of a periodic task scheduled via
45 jsr166 1.9 * {@link #scheduleAtFixedRate scheduleAtFixedRate} or
46     * {@link #scheduleWithFixedDelay scheduleWithFixedDelay}
47     * do not overlap. While different executions may be performed by
48     * different threads, the effects of prior executions
49     * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
50 dl 1.1 * those of subsequent ones.
51     *
52     * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
53     * of the inherited tuning methods are not useful for it. In
54     * particular, because it acts as a fixed-sized pool using
55     * {@code corePoolSize} threads and an unbounded queue, adjustments
56     * to {@code maximumPoolSize} have no useful effect. Additionally, it
57     * is almost never a good idea to set {@code corePoolSize} to zero or
58     * use {@code allowCoreThreadTimeOut} because this may leave the pool
59     * without threads to handle tasks once they become eligible to run.
60     *
61     * <p><b>Extension notes:</b> This class overrides the
62 jsr166 1.4 * {@link ThreadPoolExecutor#execute(Runnable) execute} and
63 dl 1.1 * {@link AbstractExecutorService#submit(Runnable) submit}
64     * methods to generate internal {@link ScheduledFuture} objects to
65     * control per-task delays and scheduling. To preserve
66     * functionality, any further overrides of these methods in
67     * subclasses must invoke superclass versions, which effectively
68     * disables additional task customization. However, this class
69     * provides an alternative protected extension method
70     * {@code decorateTask} (one version each for {@code Runnable} and
71     * {@code Callable}) that can be used to customize the concrete task
72     * types used to execute commands entered via {@code execute},
73     * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
74     * and {@code scheduleWithFixedDelay}. By default, a
75     * {@code ScheduledThreadPoolExecutor} uses a task type extending
76     * {@link FutureTask}. However, this may be modified or replaced using
77     * subclasses of the form:
78     *
79 jsr166 1.9 * <pre> {@code
80 dl 1.1 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
81     *
82     * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
83     *
84     * protected <V> RunnableScheduledFuture<V> decorateTask(
85     * Runnable r, RunnableScheduledFuture<V> task) {
86     * return new CustomTask<V>(r, task);
87     * }
88     *
89     * protected <V> RunnableScheduledFuture<V> decorateTask(
90     * Callable<V> c, RunnableScheduledFuture<V> task) {
91     * return new CustomTask<V>(c, task);
92     * }
93     * // ... add constructors, etc.
94     * }}</pre>
95     *
96     * @since 1.5
97     * @author Doug Lea
98     */
99     public class ScheduledThreadPoolExecutor
100     extends ThreadPoolExecutor
101     implements ScheduledExecutorService {
102    
103     /*
104     * This class specializes ThreadPoolExecutor implementation by
105     *
106     * 1. Using a custom task type, ScheduledFutureTask for
107     * tasks, even those that don't require scheduling (i.e.,
108     * those submitted using ExecutorService execute, not
109     * ScheduledExecutorService methods) which are treated as
110     * delayed tasks with a delay of zero.
111     *
112     * 2. Using a custom queue (DelayedWorkQueue), a variant of
113     * unbounded DelayQueue. The lack of capacity constraint and
114     * the fact that corePoolSize and maximumPoolSize are
115     * effectively identical simplifies some execution mechanics
116     * (see delayedExecute) compared to ThreadPoolExecutor.
117     *
118     * 3. Supporting optional run-after-shutdown parameters, which
119     * leads to overrides of shutdown methods to remove and cancel
120     * tasks that should NOT be run after shutdown, as well as
121     * different recheck logic when task (re)submission overlaps
122     * with a shutdown.
123     *
124     * 4. Task decoration methods to allow interception and
125     * instrumentation, which are needed because subclasses cannot
126     * otherwise override submit methods to get this effect. These
127     * don't have any impact on pool control logic though.
128     */
129    
130     /**
131     * False if periodic tasks should be cancelled/suppressed on shutdown.
132     */
133     private volatile boolean continueExistingPeriodicTasksAfterShutdown;
134    
135     /**
136     * False if non-periodic (delayed) tasks should be cancelled on shutdown.
137     */
138     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
139    
140     /**
141 jsr166 1.7 * True if ScheduledFutureTask.cancel should also remove the task from the queue.
142 dl 1.1 */
143 jsr166 1.9 volatile boolean removeOnCancel;
144 dl 1.1
145     /**
146     * Sequence number source used to break scheduling ties, and in turn
147     * to guarantee FIFO order among entries with equal trigger times.
148     */
149     private static final AtomicLong sequencer = new AtomicLong();
150    
151     private class ScheduledFutureTask<V>
152     extends FutureTask<V> implements RunnableScheduledFuture<V> {
153    
154     /** Sequence number to break ties FIFO */
155     private final long sequenceNumber;
156    
157     /** The time the task is enabled to execute in nanoTime units */
158 jsr166 1.9 private volatile long time;
159 dl 1.1
160     /**
161 jsr166 1.7 * Period in nanoseconds for repeating tasks.
162     * A positive value indicates fixed-rate execution.
163     * A negative value indicates fixed-delay execution.
164     * A value of 0 indicates a non-repeating (one-shot) task.
165 dl 1.1 */
166     private final long period;
167    
168     /** The actual task to be re-enqueued by reExecutePeriodic */
169     RunnableScheduledFuture<V> outerTask = this;
170    
171     /**
172     * Index into delay queue, to support faster cancellation.
173     */
174     int heapIndex;
175    
176     /**
177     * Creates a one-shot action with given nanoTime-based trigger time.
178     */
179 jsr166 1.9 ScheduledFutureTask(Runnable r, V result, long triggerTime,
180     long sequenceNumber) {
181 dl 1.1 super(r, result);
182 jsr166 1.7 this.time = triggerTime;
183 dl 1.1 this.period = 0;
184 jsr166 1.9 this.sequenceNumber = sequenceNumber;
185 dl 1.1 }
186    
187     /**
188 jsr166 1.7 * Creates a periodic action with given nanoTime-based initial
189     * trigger time and period.
190 dl 1.1 */
191 jsr166 1.7 ScheduledFutureTask(Runnable r, V result, long triggerTime,
192 jsr166 1.9 long period, long sequenceNumber) {
193 dl 1.1 super(r, result);
194 jsr166 1.7 this.time = triggerTime;
195 dl 1.1 this.period = period;
196 jsr166 1.9 this.sequenceNumber = sequenceNumber;
197 dl 1.1 }
198    
199     /**
200     * Creates a one-shot action with given nanoTime-based trigger time.
201     */
202 jsr166 1.9 ScheduledFutureTask(Callable<V> callable, long triggerTime,
203     long sequenceNumber) {
204 dl 1.1 super(callable);
205 jsr166 1.7 this.time = triggerTime;
206 dl 1.1 this.period = 0;
207 jsr166 1.9 this.sequenceNumber = sequenceNumber;
208 dl 1.1 }
209    
210     public long getDelay(TimeUnit unit) {
211 jsr166 1.9 return unit.convert(time - System.nanoTime(), NANOSECONDS);
212 dl 1.1 }
213    
214     public int compareTo(Delayed other) {
215     if (other == this) // compare zero if same object
216     return 0;
217     if (other instanceof ScheduledFutureTask) {
218     ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
219     long diff = time - x.time;
220     if (diff < 0)
221     return -1;
222     else if (diff > 0)
223     return 1;
224     else if (sequenceNumber < x.sequenceNumber)
225     return -1;
226     else
227     return 1;
228     }
229     long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
230     return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
231     }
232    
233     /**
234 jsr166 1.4 * Returns {@code true} if this is a periodic (not a one-shot) action.
235 dl 1.1 *
236 jsr166 1.4 * @return {@code true} if periodic
237 dl 1.1 */
238     public boolean isPeriodic() {
239     return period != 0;
240     }
241    
242     /**
243     * Sets the next time to run for a periodic task.
244     */
245     private void setNextRunTime() {
246     long p = period;
247     if (p > 0)
248     time += p;
249     else
250     time = triggerTime(-p);
251     }
252    
253     public boolean cancel(boolean mayInterruptIfRunning) {
254     boolean cancelled = super.cancel(mayInterruptIfRunning);
255     if (cancelled && removeOnCancel && heapIndex >= 0)
256     remove(this);
257     return cancelled;
258     }
259    
260     /**
261     * Overrides FutureTask version so as to reset/requeue if periodic.
262     */
263     public void run() {
264     boolean periodic = isPeriodic();
265     if (!canRunInCurrentRunState(periodic))
266     cancel(false);
267     else if (!periodic)
268 jsr166 1.9 super.run();
269     else if (super.runAndReset()) {
270 dl 1.1 setNextRunTime();
271     reExecutePeriodic(outerTask);
272     }
273     }
274     }
275    
276     /**
277     * Returns true if can run a task given current run state
278     * and run-after-shutdown parameters.
279     *
280     * @param periodic true if this task is periodic, false if delayed
281     */
282     boolean canRunInCurrentRunState(boolean periodic) {
283     return isRunningOrShutdown(periodic ?
284     continueExistingPeriodicTasksAfterShutdown :
285     executeExistingDelayedTasksAfterShutdown);
286     }
287    
288     /**
289     * Main execution method for delayed or periodic tasks. If pool
290     * is shut down, rejects the task. Otherwise adds task to queue
291     * and starts a thread, if necessary, to run it. (We cannot
292     * prestart the thread to run the task because the task (probably)
293 jsr166 1.2 * shouldn't be run yet.) If the pool is shut down while the task
294 dl 1.1 * is being added, cancel and remove it if required by state and
295     * run-after-shutdown parameters.
296     *
297     * @param task the task
298     */
299     private void delayedExecute(RunnableScheduledFuture<?> task) {
300     if (isShutdown())
301     reject(task);
302     else {
303     super.getQueue().add(task);
304     if (isShutdown() &&
305     !canRunInCurrentRunState(task.isPeriodic()) &&
306     remove(task))
307     task.cancel(false);
308     else
309     ensurePrestart();
310     }
311     }
312    
313     /**
314     * Requeues a periodic task unless current run state precludes it.
315     * Same idea as delayedExecute except drops task rather than rejecting.
316     *
317     * @param task the task
318     */
319     void reExecutePeriodic(RunnableScheduledFuture<?> task) {
320     if (canRunInCurrentRunState(true)) {
321     super.getQueue().add(task);
322     if (!canRunInCurrentRunState(true) && remove(task))
323     task.cancel(false);
324     else
325     ensurePrestart();
326     }
327     }
328    
329     /**
330     * Cancels and clears the queue of all tasks that should not be run
331     * due to shutdown policy. Invoked within super.shutdown.
332     */
333     @Override void onShutdown() {
334     BlockingQueue<Runnable> q = super.getQueue();
335     boolean keepDelayed =
336     getExecuteExistingDelayedTasksAfterShutdownPolicy();
337     boolean keepPeriodic =
338     getContinueExistingPeriodicTasksAfterShutdownPolicy();
339     if (!keepDelayed && !keepPeriodic) {
340     for (Object e : q.toArray())
341     if (e instanceof RunnableScheduledFuture<?>)
342     ((RunnableScheduledFuture<?>) e).cancel(false);
343     q.clear();
344     }
345     else {
346     // Traverse snapshot to avoid iterator exceptions
347     for (Object e : q.toArray()) {
348     if (e instanceof RunnableScheduledFuture) {
349     RunnableScheduledFuture<?> t =
350     (RunnableScheduledFuture<?>)e;
351     if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
352     t.isCancelled()) { // also remove if already cancelled
353     if (q.remove(t))
354     t.cancel(false);
355     }
356     }
357     }
358     }
359     tryTerminate();
360     }
361    
362     /**
363     * Modifies or replaces the task used to execute a runnable.
364     * This method can be used to override the concrete
365     * class used for managing internal tasks.
366     * The default implementation simply returns the given task.
367     *
368     * @param runnable the submitted Runnable
369     * @param task the task created to execute the runnable
370 jsr166 1.5 * @param <V> the type of the task's result
371 dl 1.1 * @return a task that can execute the runnable
372     * @since 1.6
373     */
374     protected <V> RunnableScheduledFuture<V> decorateTask(
375     Runnable runnable, RunnableScheduledFuture<V> task) {
376     return task;
377     }
378    
379     /**
380     * Modifies or replaces the task used to execute a callable.
381     * This method can be used to override the concrete
382     * class used for managing internal tasks.
383     * The default implementation simply returns the given task.
384     *
385     * @param callable the submitted Callable
386     * @param task the task created to execute the callable
387 jsr166 1.5 * @param <V> the type of the task's result
388 dl 1.1 * @return a task that can execute the callable
389     * @since 1.6
390     */
391     protected <V> RunnableScheduledFuture<V> decorateTask(
392     Callable<V> callable, RunnableScheduledFuture<V> task) {
393     return task;
394     }
395    
396     /**
397 jsr166 1.7 * The default keep-alive time for pool threads, in milliseconds.
398     *
399     * Normally, this value is unused because all pool threads will be
400     * core threads, but if a user creates a pool with a corePoolSize
401     * of zero (against our advice), we keep a thread alive as long as
402     * there are queued tasks. If the keep-alive time is zero (the
403     * historic value), we end up hot-spinning in getTask, wasting a
404     * CPU. But on the other hand, if we set the value too high, and
405     * users create a one-shot pool which they don't cleanly shut down,
406     * the pool's non-daemon threads will prevent JVM termination. A
407     * small but non-zero value (relative to a JVM's lifetime) seems
408     * best.
409     */
410     private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
411    
412     /**
413 dl 1.1 * Creates a new {@code ScheduledThreadPoolExecutor} with the
414     * given core pool size.
415     *
416     * @param corePoolSize the number of threads to keep in the pool, even
417     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
418     * @throws IllegalArgumentException if {@code corePoolSize < 0}
419     */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    // maximumPoolSize is Integer.MAX_VALUE; the work queue is a fresh
    // DelayedWorkQueue; keep-alive uses the class default.
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue());
}
425    
426     /**
427     * Creates a new {@code ScheduledThreadPoolExecutor} with the
428     * given initial parameters.
429     *
430     * @param corePoolSize the number of threads to keep in the pool, even
431     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
432     * @param threadFactory the factory to use when the executor
433     * creates a new thread
434     * @throws IllegalArgumentException if {@code corePoolSize < 0}
435     * @throws NullPointerException if {@code threadFactory} is null
436     */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    // Same fixed settings as the single-argument form, plus a
    // caller-supplied thread factory.
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue(),
          threadFactory);
}
443    
444     /**
445 jsr166 1.7 * Creates a new {@code ScheduledThreadPoolExecutor} with the
446     * given initial parameters.
447 dl 1.1 *
448     * @param corePoolSize the number of threads to keep in the pool, even
449     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
450     * @param handler the handler to use when execution is blocked
451     * because the thread bounds and queue capacities are reached
452     * @throws IllegalArgumentException if {@code corePoolSize < 0}
453     * @throws NullPointerException if {@code handler} is null
454     */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    // Same fixed settings as the single-argument form, plus a
    // caller-supplied rejection handler.
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue(),
          handler);
}
461    
462     /**
463 jsr166 1.7 * Creates a new {@code ScheduledThreadPoolExecutor} with the
464     * given initial parameters.
465 dl 1.1 *
466     * @param corePoolSize the number of threads to keep in the pool, even
467     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
468     * @param threadFactory the factory to use when the executor
469     * creates a new thread
470     * @param handler the handler to use when execution is blocked
471     * because the thread bounds and queue capacities are reached
472     * @throws IllegalArgumentException if {@code corePoolSize < 0}
473     * @throws NullPointerException if {@code threadFactory} or
474     * {@code handler} is null
475     */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    // Same fixed settings as the single-argument form, plus a
    // caller-supplied thread factory and rejection handler.
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue(),
          threadFactory,
          handler);
}
483    
484     /**
485 jsr166 1.7 * Returns the nanoTime-based trigger time of a delayed action.
486 dl 1.1 */
487     private long triggerTime(long delay, TimeUnit unit) {
488     return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
489     }
490    
491     /**
492 jsr166 1.7 * Returns the nanoTime-based trigger time of a delayed action.
493 dl 1.1 */
494     long triggerTime(long delay) {
495 jsr166 1.9 return System.nanoTime() +
496 dl 1.1 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
497     }
498    
499     /**
500     * Constrains the values of all delays in the queue to be within
501     * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
502     * This may occur if a task is eligible to be dequeued, but has
503     * not yet been, while some other task is added with a delay of
504     * Long.MAX_VALUE.
505     */
506     private long overflowFree(long delay) {
507     Delayed head = (Delayed) super.getQueue().peek();
508     if (head != null) {
509     long headDelay = head.getDelay(NANOSECONDS);
510     if (headDelay < 0 && (delay - headDelay < 0))
511     delay = Long.MAX_VALUE + headDelay;
512     }
513     return delay;
514     }
515    
516     /**
517     * @throws RejectedExecutionException {@inheritDoc}
518     * @throws NullPointerException {@inheritDoc}
519     */
520     public ScheduledFuture<?> schedule(Runnable command,
521     long delay,
522     TimeUnit unit) {
523     if (command == null || unit == null)
524     throw new NullPointerException();
525 jsr166 1.7 RunnableScheduledFuture<Void> t = decorateTask(command,
526 dl 1.1 new ScheduledFutureTask<Void>(command, null,
527 jsr166 1.9 triggerTime(delay, unit),
528     sequencer.getAndIncrement()));
529 dl 1.1 delayedExecute(t);
530     return t;
531     }
532    
533     /**
534     * @throws RejectedExecutionException {@inheritDoc}
535     * @throws NullPointerException {@inheritDoc}
536     */
537     public <V> ScheduledFuture<V> schedule(Callable<V> callable,
538     long delay,
539     TimeUnit unit) {
540     if (callable == null || unit == null)
541     throw new NullPointerException();
542     RunnableScheduledFuture<V> t = decorateTask(callable,
543     new ScheduledFutureTask<V>(callable,
544 jsr166 1.9 triggerTime(delay, unit),
545     sequencer.getAndIncrement()));
546 dl 1.1 delayedExecute(t);
547     return t;
548     }
549    
550     /**
551     * @throws RejectedExecutionException {@inheritDoc}
552     * @throws NullPointerException {@inheritDoc}
553     * @throws IllegalArgumentException {@inheritDoc}
554     */
555     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
556     long initialDelay,
557     long period,
558     TimeUnit unit) {
559     if (command == null || unit == null)
560     throw new NullPointerException();
561 jsr166 1.9 if (period <= 0L)
562 dl 1.1 throw new IllegalArgumentException();
563     ScheduledFutureTask<Void> sft =
564     new ScheduledFutureTask<Void>(command,
565     null,
566     triggerTime(initialDelay, unit),
567 jsr166 1.9 unit.toNanos(period),
568     sequencer.getAndIncrement());
569 dl 1.1 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
570     sft.outerTask = t;
571     delayedExecute(t);
572     return t;
573     }
574    
575     /**
576     * @throws RejectedExecutionException {@inheritDoc}
577     * @throws NullPointerException {@inheritDoc}
578     * @throws IllegalArgumentException {@inheritDoc}
579     */
580     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
581     long initialDelay,
582     long delay,
583     TimeUnit unit) {
584     if (command == null || unit == null)
585     throw new NullPointerException();
586 jsr166 1.9 if (delay <= 0L)
587 dl 1.1 throw new IllegalArgumentException();
588     ScheduledFutureTask<Void> sft =
589     new ScheduledFutureTask<Void>(command,
590     null,
591     triggerTime(initialDelay, unit),
592 jsr166 1.9 -unit.toNanos(delay),
593     sequencer.getAndIncrement());
594 dl 1.1 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
595     sft.outerTask = t;
596     delayedExecute(t);
597     return t;
598     }
599    
600     /**
601     * Executes {@code command} with zero required delay.
602     * This has effect equivalent to
603     * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
604     * Note that inspections of the queue and of the list returned by
605     * {@code shutdownNow} will access the zero-delayed
606     * {@link ScheduledFuture}, not the {@code command} itself.
607     *
608     * <p>A consequence of the use of {@code ScheduledFuture} objects is
609     * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
610     * called with a null second {@code Throwable} argument, even if the
611     * {@code command} terminated abruptly. Instead, the {@code Throwable}
612     * thrown by such a task can be obtained via {@link Future#get}.
613     *
614     * @throws RejectedExecutionException at discretion of
615     * {@code RejectedExecutionHandler}, if the task
616     * cannot be accepted for execution because the
617     * executor has been shut down
618     * @throws NullPointerException {@inheritDoc}
619     */
620     public void execute(Runnable command) {
621     schedule(command, 0, NANOSECONDS);
622     }
623    
624     // Override AbstractExecutorService methods
625    
626     /**
627     * @throws RejectedExecutionException {@inheritDoc}
628     * @throws NullPointerException {@inheritDoc}
629     */
630     public Future<?> submit(Runnable task) {
631     return schedule(task, 0, NANOSECONDS);
632     }
633    
634     /**
635     * @throws RejectedExecutionException {@inheritDoc}
636     * @throws NullPointerException {@inheritDoc}
637     */
638     public <T> Future<T> submit(Runnable task, T result) {
639     return schedule(Executors.callable(task, result), 0, NANOSECONDS);
640     }
641    
642     /**
643     * @throws RejectedExecutionException {@inheritDoc}
644     * @throws NullPointerException {@inheritDoc}
645     */
646     public <T> Future<T> submit(Callable<T> task) {
647     return schedule(task, 0, NANOSECONDS);
648     }
649    
650     /**
651     * Sets the policy on whether to continue executing existing
652     * periodic tasks even when this executor has been {@code shutdown}.
653     * In this case, these tasks will only terminate upon
654     * {@code shutdownNow} or after setting the policy to
655     * {@code false} when already shutdown.
656     * This value is by default {@code false}.
657     *
658 jsr166 1.3 * @param value if {@code true}, continue after shutdown, else don't
659 dl 1.1 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
660     */
661     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
662     continueExistingPeriodicTasksAfterShutdown = value;
663     if (!value && isShutdown())
664     onShutdown();
665     }
666    
667     /**
668     * Gets the policy on whether to continue executing existing
669     * periodic tasks even when this executor has been {@code shutdown}.
670     * In this case, these tasks will only terminate upon
671     * {@code shutdownNow} or after setting the policy to
672     * {@code false} when already shutdown.
673     * This value is by default {@code false}.
674     *
675     * @return {@code true} if will continue after shutdown
676     * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
677     */
678     public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
679     return continueExistingPeriodicTasksAfterShutdown;
680     }
681    
682     /**
683     * Sets the policy on whether to execute existing delayed
684     * tasks even when this executor has been {@code shutdown}.
685     * In this case, these tasks will only terminate upon
686     * {@code shutdownNow}, or after setting the policy to
687     * {@code false} when already shutdown.
688     * This value is by default {@code true}.
689     *
690 jsr166 1.3 * @param value if {@code true}, execute after shutdown, else don't
691 dl 1.1 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
692     */
693     public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
694     executeExistingDelayedTasksAfterShutdown = value;
695     if (!value && isShutdown())
696     onShutdown();
697     }
698    
699     /**
700     * Gets the policy on whether to execute existing delayed
701     * tasks even when this executor has been {@code shutdown}.
702     * In this case, these tasks will only terminate upon
703     * {@code shutdownNow}, or after setting the policy to
704     * {@code false} when already shutdown.
705     * This value is by default {@code true}.
706     *
707     * @return {@code true} if will execute after shutdown
708     * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
709     */
710     public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
711     return executeExistingDelayedTasksAfterShutdown;
712     }
713    
714     /**
715     * Sets the policy on whether cancelled tasks should be immediately
716     * removed from the work queue at time of cancellation. This value is
717     * by default {@code false}.
718     *
719     * @param value if {@code true}, remove on cancellation, else don't
720     * @see #getRemoveOnCancelPolicy
721     * @since 1.7
722     */
723     public void setRemoveOnCancelPolicy(boolean value) {
724     removeOnCancel = value;
725     }
726    
727     /**
728     * Gets the policy on whether cancelled tasks should be immediately
729     * removed from the work queue at time of cancellation. This value is
730     * by default {@code false}.
731     *
732     * @return {@code true} if cancelled tasks are immediately removed
733     * from the queue
734     * @see #setRemoveOnCancelPolicy
735     * @since 1.7
736     */
737     public boolean getRemoveOnCancelPolicy() {
738     return removeOnCancel;
739     }
740    
741     /**
742     * Initiates an orderly shutdown in which previously submitted
743     * tasks are executed, but no new tasks will be accepted.
744     * Invocation has no additional effect if already shut down.
745     *
746     * <p>This method does not wait for previously submitted tasks to
747     * complete execution. Use {@link #awaitTermination awaitTermination}
748     * to do that.
749     *
750     * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
751     * has been set {@code false}, existing delayed tasks whose delays
752     * have not yet elapsed are cancelled. And unless the {@code
753     * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
754     * {@code true}, future executions of existing periodic tasks will
755     * be cancelled.
756     *
757     * @throws SecurityException {@inheritDoc}
758     */
759     public void shutdown() {
760     super.shutdown();
761     }
762    
763     /**
764     * Attempts to stop all actively executing tasks, halts the
765     * processing of waiting tasks, and returns a list of the tasks
766 jsr166 1.9 * that were awaiting execution. These tasks are drained (removed)
767     * from the task queue upon return from this method.
768 dl 1.1 *
769     * <p>This method does not wait for actively executing tasks to
770     * terminate. Use {@link #awaitTermination awaitTermination} to
771     * do that.
772     *
773     * <p>There are no guarantees beyond best-effort attempts to stop
774     * processing actively executing tasks. This implementation
775 jsr166 1.9 * interrupts tasks via {@link Thread#interrupt}; any task that
776 dl 1.1 * fails to respond to interrupts may never terminate.
777     *
778     * @return list of tasks that never commenced execution.
779 jsr166 1.7 * Each element of this list is a {@link ScheduledFuture}.
780     * For tasks submitted via one of the {@code schedule}
781     * methods, the element will be identical to the returned
782     * {@code ScheduledFuture}. For tasks submitted using
783 jsr166 1.9 * {@link #execute execute}, the element will be a
784     * zero-delay {@code ScheduledFuture}.
785 dl 1.1 * @throws SecurityException {@inheritDoc}
786     */
787     public List<Runnable> shutdownNow() {
788     return super.shutdownNow();
789     }
790    
791     /**
792 jsr166 1.9 * Returns the task queue used by this executor. Access to the
793     * task queue is intended primarily for debugging and monitoring.
794     * This queue may be in active use. Retrieving the task queue
795     * does not prevent queued tasks from executing.
796     *
797     * <p>Each element of this queue is a {@link ScheduledFuture}.
798 jsr166 1.7 * For tasks submitted via one of the {@code schedule} methods, the
799     * element will be identical to the returned {@code ScheduledFuture}.
800 jsr166 1.9 * For tasks submitted using {@link #execute execute}, the element
801     * will be a zero-delay {@code ScheduledFuture}.
802 jsr166 1.7 *
803     * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
804     * tasks in the order in which they will execute.
805 dl 1.1 *
806     * @return the task queue
807     */
808     public BlockingQueue<Runnable> getQueue() {
809     return super.getQueue();
810     }
811    
812     /**
813     * Specialized delay queue. To mesh with TPE declarations, this
814     * class must be declared as a {@code BlockingQueue<Runnable>} even
815     * though it can only hold RunnableScheduledFutures.
816     */
817     static class DelayedWorkQueue extends AbstractQueue<Runnable>
818     implements BlockingQueue<Runnable> {
819    
820     /*
821     * A DelayedWorkQueue is based on a heap-based data structure
822     * like those in DelayQueue and PriorityQueue, except that
823     * every ScheduledFutureTask also records its index into the
824     * heap array. This eliminates the need to find a task upon
825     * cancellation, greatly speeding up removal (down from O(n)
826     * to O(log n)), and reducing garbage retention that would
827     * otherwise occur by waiting for the element to rise to top
828     * before clearing. But because the queue may also hold
829     * RunnableScheduledFutures that are not ScheduledFutureTasks,
830     * we are not guaranteed to have such indices available, in
831     * which case we fall back to linear search. (We expect that
832     * most tasks will not be decorated, and that the faster cases
833     * will be much more common.)
834     *
835     * All heap operations must record index changes -- mainly
836     * within siftUp and siftDown. Upon removal, a task's
837     * heapIndex is set to -1. Note that ScheduledFutureTasks can
838     * appear at most once in the queue (this need not be true for
839     * other kinds of tasks or work queues), so are uniquely
840     * identified by heapIndex.
841     */
842    
843     private static final int INITIAL_CAPACITY = 16;
844     private RunnableScheduledFuture<?>[] queue =
845     new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
846     private final ReentrantLock lock = new ReentrantLock();
847 jsr166 1.6 private int size;
848 dl 1.1
849     /**
850     * Thread designated to wait for the task at the head of the
851     * queue. This variant of the Leader-Follower pattern
852     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
853     * minimize unnecessary timed waiting. When a thread becomes
854     * the leader, it waits only for the next delay to elapse, but
855     * other threads await indefinitely. The leader thread must
856     * signal some other thread before returning from take() or
857     * poll(...), unless some other thread becomes leader in the
858     * interim. Whenever the head of the queue is replaced with a
859     * task with an earlier expiration time, the leader field is
860     * invalidated by being reset to null, and some waiting
861     * thread, but not necessarily the current leader, is
862     * signalled. So waiting threads must be prepared to acquire
863     * and lose leadership while waiting.
864     */
865 jsr166 1.6 private Thread leader;
866 dl 1.1
867     /**
868     * Condition signalled when a newer task becomes available at the
869     * head of the queue or a new thread may need to become leader.
870     */
871     private final Condition available = lock.newCondition();
872    
873     /**
874     * Sets f's heapIndex if it is a ScheduledFutureTask.
875     */
876     private void setIndex(RunnableScheduledFuture<?> f, int idx) {
877     if (f instanceof ScheduledFutureTask)
878     ((ScheduledFutureTask)f).heapIndex = idx;
879     }
880    
881     /**
882     * Sifts element added at bottom up to its heap-ordered spot.
883     * Call only when holding lock.
884     */
885     private void siftUp(int k, RunnableScheduledFuture<?> key) {
886     while (k > 0) {
887     int parent = (k - 1) >>> 1;
888     RunnableScheduledFuture<?> e = queue[parent];
889     if (key.compareTo(e) >= 0)
890     break;
891     queue[k] = e;
892     setIndex(e, k);
893     k = parent;
894     }
895     queue[k] = key;
896     setIndex(key, k);
897     }
898    
899     /**
900     * Sifts element added at top down to its heap-ordered spot.
901     * Call only when holding lock.
902     */
903     private void siftDown(int k, RunnableScheduledFuture<?> key) {
904     int half = size >>> 1;
905     while (k < half) {
906     int child = (k << 1) + 1;
907     RunnableScheduledFuture<?> c = queue[child];
908     int right = child + 1;
909     if (right < size && c.compareTo(queue[right]) > 0)
910     c = queue[child = right];
911     if (key.compareTo(c) <= 0)
912     break;
913     queue[k] = c;
914     setIndex(c, k);
915     k = child;
916     }
917     queue[k] = key;
918     setIndex(key, k);
919     }
920    
921     /**
922     * Resizes the heap array. Call only when holding lock.
923     */
924     private void grow() {
925     int oldCapacity = queue.length;
926     int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
927     if (newCapacity < 0) // overflow
928     newCapacity = Integer.MAX_VALUE;
929     queue = Arrays.copyOf(queue, newCapacity);
930     }
931    
932     /**
933     * Finds index of given object, or -1 if absent.
934     */
935     private int indexOf(Object x) {
936     if (x != null) {
937     if (x instanceof ScheduledFutureTask) {
938     int i = ((ScheduledFutureTask) x).heapIndex;
939     // Sanity check; x could conceivably be a
940     // ScheduledFutureTask from some other pool.
941     if (i >= 0 && i < size && queue[i] == x)
942     return i;
943     } else {
944     for (int i = 0; i < size; i++)
945     if (x.equals(queue[i]))
946     return i;
947     }
948     }
949     return -1;
950     }
951    
952     public boolean contains(Object x) {
953     final ReentrantLock lock = this.lock;
954     lock.lock();
955     try {
956     return indexOf(x) != -1;
957     } finally {
958     lock.unlock();
959     }
960     }
961    
962     public boolean remove(Object x) {
963     final ReentrantLock lock = this.lock;
964     lock.lock();
965     try {
966     int i = indexOf(x);
967     if (i < 0)
968     return false;
969    
970     setIndex(queue[i], -1);
971     int s = --size;
972     RunnableScheduledFuture<?> replacement = queue[s];
973     queue[s] = null;
974     if (s != i) {
975     siftDown(i, replacement);
976     if (queue[i] == replacement)
977     siftUp(i, replacement);
978     }
979     return true;
980     } finally {
981     lock.unlock();
982     }
983     }
984    
985     public int size() {
986     final ReentrantLock lock = this.lock;
987     lock.lock();
988     try {
989     return size;
990     } finally {
991     lock.unlock();
992     }
993     }
994    
995     public boolean isEmpty() {
996     return size() == 0;
997     }
998    
999     public int remainingCapacity() {
1000     return Integer.MAX_VALUE;
1001     }
1002    
1003     public RunnableScheduledFuture<?> peek() {
1004     final ReentrantLock lock = this.lock;
1005     lock.lock();
1006     try {
1007     return queue[0];
1008     } finally {
1009     lock.unlock();
1010     }
1011     }
1012    
1013     public boolean offer(Runnable x) {
1014     if (x == null)
1015     throw new NullPointerException();
1016     RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
1017     final ReentrantLock lock = this.lock;
1018     lock.lock();
1019     try {
1020     int i = size;
1021     if (i >= queue.length)
1022     grow();
1023     size = i + 1;
1024     if (i == 0) {
1025     queue[0] = e;
1026     setIndex(e, 0);
1027     } else {
1028     siftUp(i, e);
1029     }
1030     if (queue[0] == e) {
1031     leader = null;
1032     available.signal();
1033     }
1034     } finally {
1035     lock.unlock();
1036     }
1037     return true;
1038     }
1039    
1040     public void put(Runnable e) {
1041     offer(e);
1042     }
1043    
1044     public boolean add(Runnable e) {
1045     return offer(e);
1046     }
1047    
1048     public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1049     return offer(e);
1050     }
1051    
1052     /**
1053     * Performs common bookkeeping for poll and take: Replaces
1054     * first element with last and sifts it down. Call only when
1055     * holding lock.
1056     * @param f the task to remove and return
1057     */
1058     private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1059     int s = --size;
1060     RunnableScheduledFuture<?> x = queue[s];
1061     queue[s] = null;
1062     if (s != 0)
1063     siftDown(0, x);
1064     setIndex(f, -1);
1065     return f;
1066     }
1067    
1068     public RunnableScheduledFuture<?> poll() {
1069     final ReentrantLock lock = this.lock;
1070     lock.lock();
1071     try {
1072     RunnableScheduledFuture<?> first = queue[0];
1073 jsr166 1.9 return (first == null || first.getDelay(NANOSECONDS) > 0)
1074     ? null
1075     : finishPoll(first);
1076 dl 1.1 } finally {
1077     lock.unlock();
1078     }
1079     }
1080    
1081     public RunnableScheduledFuture<?> take() throws InterruptedException {
1082     final ReentrantLock lock = this.lock;
1083     lock.lockInterruptibly();
1084     try {
1085     for (;;) {
1086     RunnableScheduledFuture<?> first = queue[0];
1087     if (first == null)
1088     available.await();
1089     else {
1090     long delay = first.getDelay(NANOSECONDS);
1091 jsr166 1.9 if (delay <= 0L)
1092 dl 1.1 return finishPoll(first);
1093 jsr166 1.4 first = null; // don't retain ref while waiting
1094     if (leader != null)
1095 dl 1.1 available.await();
1096     else {
1097     Thread thisThread = Thread.currentThread();
1098     leader = thisThread;
1099     try {
1100     available.awaitNanos(delay);
1101     } finally {
1102     if (leader == thisThread)
1103     leader = null;
1104     }
1105     }
1106     }
1107     }
1108     } finally {
1109     if (leader == null && queue[0] != null)
1110     available.signal();
1111     lock.unlock();
1112     }
1113     }
1114    
1115     public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1116     throws InterruptedException {
1117     long nanos = unit.toNanos(timeout);
1118     final ReentrantLock lock = this.lock;
1119     lock.lockInterruptibly();
1120     try {
1121     for (;;) {
1122     RunnableScheduledFuture<?> first = queue[0];
1123     if (first == null) {
1124 jsr166 1.9 if (nanos <= 0L)
1125 dl 1.1 return null;
1126     else
1127     nanos = available.awaitNanos(nanos);
1128     } else {
1129     long delay = first.getDelay(NANOSECONDS);
1130 jsr166 1.9 if (delay <= 0L)
1131 dl 1.1 return finishPoll(first);
1132 jsr166 1.9 if (nanos <= 0L)
1133 dl 1.1 return null;
1134 jsr166 1.4 first = null; // don't retain ref while waiting
1135 dl 1.1 if (nanos < delay || leader != null)
1136     nanos = available.awaitNanos(nanos);
1137     else {
1138     Thread thisThread = Thread.currentThread();
1139     leader = thisThread;
1140     try {
1141     long timeLeft = available.awaitNanos(delay);
1142     nanos -= delay - timeLeft;
1143     } finally {
1144     if (leader == thisThread)
1145     leader = null;
1146     }
1147     }
1148     }
1149     }
1150     } finally {
1151     if (leader == null && queue[0] != null)
1152     available.signal();
1153     lock.unlock();
1154     }
1155     }
1156    
1157     public void clear() {
1158     final ReentrantLock lock = this.lock;
1159     lock.lock();
1160     try {
1161     for (int i = 0; i < size; i++) {
1162     RunnableScheduledFuture<?> t = queue[i];
1163     if (t != null) {
1164     queue[i] = null;
1165     setIndex(t, -1);
1166     }
1167     }
1168     size = 0;
1169     } finally {
1170     lock.unlock();
1171     }
1172     }
1173    
1174     /**
1175     * Returns first element only if it is expired.
1176     * Used only by drainTo. Call only when holding lock.
1177     */
1178     private RunnableScheduledFuture<?> peekExpired() {
1179     // assert lock.isHeldByCurrentThread();
1180     RunnableScheduledFuture<?> first = queue[0];
1181     return (first == null || first.getDelay(NANOSECONDS) > 0) ?
1182     null : first;
1183     }
1184    
1185     public int drainTo(Collection<? super Runnable> c) {
1186     if (c == null)
1187     throw new NullPointerException();
1188     if (c == this)
1189     throw new IllegalArgumentException();
1190     final ReentrantLock lock = this.lock;
1191     lock.lock();
1192     try {
1193     RunnableScheduledFuture<?> first;
1194     int n = 0;
1195     while ((first = peekExpired()) != null) {
1196     c.add(first); // In this order, in case add() throws.
1197     finishPoll(first);
1198     ++n;
1199     }
1200     return n;
1201     } finally {
1202     lock.unlock();
1203     }
1204     }
1205    
1206     public int drainTo(Collection<? super Runnable> c, int maxElements) {
1207     if (c == null)
1208     throw new NullPointerException();
1209     if (c == this)
1210     throw new IllegalArgumentException();
1211     if (maxElements <= 0)
1212     return 0;
1213     final ReentrantLock lock = this.lock;
1214     lock.lock();
1215     try {
1216     RunnableScheduledFuture<?> first;
1217     int n = 0;
1218     while (n < maxElements && (first = peekExpired()) != null) {
1219     c.add(first); // In this order, in case add() throws.
1220     finishPoll(first);
1221     ++n;
1222     }
1223     return n;
1224     } finally {
1225     lock.unlock();
1226     }
1227     }
1228    
1229     public Object[] toArray() {
1230     final ReentrantLock lock = this.lock;
1231     lock.lock();
1232     try {
1233     return Arrays.copyOf(queue, size, Object[].class);
1234     } finally {
1235     lock.unlock();
1236     }
1237     }
1238    
1239     @SuppressWarnings("unchecked")
1240     public <T> T[] toArray(T[] a) {
1241     final ReentrantLock lock = this.lock;
1242     lock.lock();
1243     try {
1244     if (a.length < size)
1245     return (T[]) Arrays.copyOf(queue, size, a.getClass());
1246     System.arraycopy(queue, 0, a, 0, size);
1247     if (a.length > size)
1248     a[size] = null;
1249     return a;
1250     } finally {
1251     lock.unlock();
1252     }
1253     }
1254    
1255     public Iterator<Runnable> iterator() {
1256     return new Itr(Arrays.copyOf(queue, size));
1257     }
1258    
1259     /**
1260     * Snapshot iterator that works off copy of underlying q array.
1261     */
1262     private class Itr implements Iterator<Runnable> {
1263 jsr166 1.7 final RunnableScheduledFuture<?>[] array;
1264 jsr166 1.9 int cursor; // index of next element to return; initially 0
1265     int lastRet = -1; // index of last element returned; -1 if no such
1266 dl 1.1
1267 jsr166 1.7 Itr(RunnableScheduledFuture<?>[] array) {
1268 dl 1.1 this.array = array;
1269     }
1270    
1271     public boolean hasNext() {
1272     return cursor < array.length;
1273     }
1274    
1275     public Runnable next() {
1276     if (cursor >= array.length)
1277     throw new NoSuchElementException();
1278     lastRet = cursor;
1279     return array[cursor++];
1280     }
1281    
1282     public void remove() {
1283     if (lastRet < 0)
1284     throw new IllegalStateException();
1285     DelayedWorkQueue.this.remove(array[lastRet]);
1286     lastRet = -1;
1287     }
1288     }
1289     }
1290     }