ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.48
Committed: Sun May 18 23:47:56 2008 UTC (16 years ago) by jsr166
Branch: MAIN
Changes since 1.47: +123 -123 lines
Log Message:
Sync with OpenJDK; untabify

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.11 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.1 */
6    
7     package java.util.concurrent;
8     import java.util.concurrent.atomic.*;
9 dl 1.40 import java.util.concurrent.locks.*;
10 dl 1.1 import java.util.*;
11    
12     /**
13 dl 1.7 * A {@link ThreadPoolExecutor} that can additionally schedule
14     * commands to run after a given delay, or to execute
15     * periodically. This class is preferable to {@link java.util.Timer}
16     * when multiple worker threads are needed, or when the additional
17     * flexibility or capabilities of {@link ThreadPoolExecutor} (which
18     * this class extends) are required.
19 dl 1.1 *
20 jsr166 1.46 * <p>Delayed tasks execute no sooner than they are enabled, but
21 dl 1.18 * without any real-time guarantees about when, after they are
22     * enabled, they will commence. Tasks scheduled for exactly the same
23     * execution time are enabled in first-in-first-out (FIFO) order of
24 jsr166 1.46 * submission.
25     *
26     * <p>When a submitted task is cancelled before it is run, execution
27     * is suppressed. By default, such a cancelled task is not
28     * automatically removed from the work queue until its delay
29     * elapses. While this enables further inspection and monitoring, it
30     * may also cause unbounded retention of cancelled tasks. To avoid
31     * this, set {@link #setRemoveOnCancelPolicy} to {@code true}, which
32     * causes tasks to be immediately removed from the work queue at
33     * time of cancellation.
34 dl 1.1 *
35     * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
36 dl 1.8 * of the inherited tuning methods are not useful for it. In
37     * particular, because it acts as a fixed-sized pool using
38 jsr166 1.39 * {@code corePoolSize} threads and an unbounded queue, adjustments
39     * to {@code maximumPoolSize} have no useful effect. Additionally, it
40     * is almost never a good idea to set {@code corePoolSize} to zero or
41     * use {@code allowCoreThreadTimeOut} because this may leave the pool
42 dl 1.37 * without threads to handle tasks once they become eligible to run.
43 dl 1.1 *
44 jsr166 1.39 * <p><b>Extension notes:</b> This class overrides the
45     * {@link ThreadPoolExecutor#execute execute} and
46     * {@link AbstractExecutorService#submit(Runnable) submit}
47     * methods to generate internal {@link ScheduledFuture} objects to
48     * control per-task delays and scheduling. To preserve
49     * functionality, any further overrides of these methods in
50 dl 1.32 * subclasses must invoke superclass versions, which effectively
51 jsr166 1.39 * disables additional task customization. However, this class
52 dl 1.32 * provides alternative protected extension method
53 jsr166 1.39 * {@code decorateTask} (one version each for {@code Runnable} and
54     * {@code Callable}) that can be used to customize the concrete task
55     * types used to execute commands entered via {@code execute},
56     * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
57     * and {@code scheduleWithFixedDelay}. By default, a
58     * {@code ScheduledThreadPoolExecutor} uses a task type extending
59 dl 1.32 * {@link FutureTask}. However, this may be modified or replaced using
60     * subclasses of the form:
61     *
62 jsr166 1.39 * <pre> {@code
63 dl 1.23 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
64     *
65 jsr166 1.39 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
66 dl 1.23 *
67 jsr166 1.39 * protected <V> RunnableScheduledFuture<V> decorateTask(
68     * Runnable r, RunnableScheduledFuture<V> task) {
69     * return new CustomTask<V>(r, task);
70 jsr166 1.29 * }
71 dl 1.23 *
72 jsr166 1.39 * protected <V> RunnableScheduledFuture<V> decorateTask(
73     * Callable<V> c, RunnableScheduledFuture<V> task) {
74     * return new CustomTask<V>(c, task);
75 jsr166 1.29 * }
76     * // ... add constructors, etc.
77 jsr166 1.39 * }}</pre>
78     *
79 dl 1.1 * @since 1.5
80     * @author Doug Lea
81     */
82 jsr166 1.21 public class ScheduledThreadPoolExecutor
83     extends ThreadPoolExecutor
84 tim 1.3 implements ScheduledExecutorService {
85 dl 1.1
86 dl 1.37 /*
87     * This class specializes ThreadPoolExecutor implementation by
88     *
89     * 1. Using a custom task type, ScheduledFutureTask for
90     * tasks, even those that don't require scheduling (i.e.,
91     * those submitted using ExecutorService execute, not
92     * ScheduledExecutorService methods) which are treated as
93     * delayed tasks with a delay of zero.
94     *
95 jsr166 1.46 * 2. Using a custom queue (DelayedWorkQueue), a variant of
96 dl 1.37 * unbounded DelayQueue. The lack of capacity constraint and
97     * the fact that corePoolSize and maximumPoolSize are
98     * effectively identical simplifies some execution mechanics
99 jsr166 1.46 * (see delayedExecute) compared to ThreadPoolExecutor.
100 dl 1.37 *
101     * 3. Supporting optional run-after-shutdown parameters, which
102     * leads to overrides of shutdown methods to remove and cancel
103     * tasks that should NOT be run after shutdown, as well as
104     * different recheck logic when task (re)submission overlaps
105     * with a shutdown.
106     *
107     * 4. Task decoration methods to allow interception and
108     * instrumentation, which are needed because subclasses cannot
109     * otherwise override submit methods to get this effect. These
110     * don't have any impact on pool control logic though.
111     */
112    
113 dl 1.1 /**
114     * False if should cancel/suppress periodic tasks on shutdown.
115     */
116     private volatile boolean continueExistingPeriodicTasksAfterShutdown;
117    
118     /**
119     * False if should cancel non-periodic tasks on shutdown.
120     */
121     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
122    
123     /**
124 dl 1.41 * True if ScheduledFutureTask.cancel should remove from queue
125     */
126     private volatile boolean removeOnCancel = false;
127    
128     /**
129 dl 1.1 * Sequence number to break scheduling ties, and in turn to
130     * guarantee FIFO order among tied entries.
131     */
132     private static final AtomicLong sequencer = new AtomicLong(0);
133 dl 1.14
134     /**
135 jsr166 1.39 * Returns current nanosecond time.
136 dl 1.14 */
137 dl 1.17 final long now() {
138 jsr166 1.39 return System.nanoTime();
139 dl 1.14 }
140    
141 jsr166 1.21 private class ScheduledFutureTask<V>
142 peierls 1.22 extends FutureTask<V> implements RunnableScheduledFuture<V> {
143 jsr166 1.21
144 dl 1.1 /** Sequence number to break ties FIFO */
145     private final long sequenceNumber;
146 jsr166 1.44
147 dl 1.1 /** The time the task is enabled to execute in nanoTime units */
148     private long time;
149 jsr166 1.44
150 dl 1.16 /**
151     * Period in nanoseconds for repeating tasks. A positive
152     * value indicates fixed-rate execution. A negative value
153     * indicates fixed-delay execution. A value of 0 indicates a
154     * non-repeating task.
155     */
156 dl 1.1 private final long period;
157    
158 jsr166 1.48 /** The actual task to be re-enqueued by reExecutePeriodic */
159     RunnableScheduledFuture<V> outerTask = this;
160 jsr166 1.44
161 dl 1.1 /**
162 dl 1.40 * Index into delay queue, to support faster cancellation.
163     */
164     int heapIndex;
165    
166     /**
167 jsr166 1.30 * Creates a one-shot action with given nanoTime-based trigger time.
168 dl 1.1 */
169     ScheduledFutureTask(Runnable r, V result, long ns) {
170     super(r, result);
171     this.time = ns;
172     this.period = 0;
173     this.sequenceNumber = sequencer.getAndIncrement();
174     }
175    
176     /**
177 jsr166 1.30 * Creates a periodic action with given nano time and period.
178 dl 1.1 */
179 jsr166 1.30 ScheduledFutureTask(Runnable r, V result, long ns, long period) {
180 dl 1.1 super(r, result);
181     this.time = ns;
182     this.period = period;
183     this.sequenceNumber = sequencer.getAndIncrement();
184     }
185    
186     /**
187 jsr166 1.30 * Creates a one-shot action with given nanoTime-based trigger.
188 dl 1.1 */
189     ScheduledFutureTask(Callable<V> callable, long ns) {
190     super(callable);
191     this.time = ns;
192     this.period = 0;
193     this.sequenceNumber = sequencer.getAndIncrement();
194     }
195    
196     public long getDelay(TimeUnit unit) {
197 dl 1.14 long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);
198 dl 1.1 return d;
199     }
200    
201 dl 1.20 public int compareTo(Delayed other) {
202 dl 1.1 if (other == this) // compare zero ONLY if same object
203     return 0;
204 dl 1.34 if (other instanceof ScheduledFutureTask) {
205     ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
206     long diff = time - x.time;
207     if (diff < 0)
208     return -1;
209     else if (diff > 0)
210     return 1;
211     else if (sequenceNumber < x.sequenceNumber)
212     return -1;
213     else
214     return 1;
215     }
216     long d = (getDelay(TimeUnit.NANOSECONDS) -
217     other.getDelay(TimeUnit.NANOSECONDS));
218 jsr166 1.38 return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
219 dl 1.1 }
220    
221     /**
222 dl 1.18 * Returns true if this is a periodic (not a one-shot) action.
223 jsr166 1.30 *
224 dl 1.1 * @return true if periodic
225     */
226 dl 1.23 public boolean isPeriodic() {
227 dl 1.16 return period != 0;
228 dl 1.1 }
229    
230     /**
231 jsr166 1.39 * Sets the next time to run for a periodic task.
232 dl 1.13 */
233 dl 1.37 private void setNextRunTime() {
234     long p = period;
235     if (p > 0)
236     time += p;
237     else
238     time = now() - p;
239 dl 1.13 }
240    
241 dl 1.40 public boolean cancel(boolean mayInterruptIfRunning) {
242 dl 1.41 boolean cancelled = super.cancel(mayInterruptIfRunning);
243     if (cancelled && removeOnCancel && heapIndex >= 0)
244 jsr166 1.42 remove(this);
245 dl 1.41 return cancelled;
246 dl 1.40 }
247    
248 dl 1.13 /**
249 dl 1.5 * Overrides FutureTask version so as to reset/requeue if periodic.
250 jsr166 1.21 */
251 dl 1.1 public void run() {
252 dl 1.37 boolean periodic = isPeriodic();
253     if (!canRunInCurrentRunState(periodic))
254     cancel(false);
255     else if (!periodic)
256 dl 1.5 ScheduledFutureTask.super.run();
257 dl 1.37 else if (ScheduledFutureTask.super.runAndReset()) {
258     setNextRunTime();
259 jsr166 1.44 reExecutePeriodic(outerTask);
260 dl 1.37 }
261 dl 1.1 }
262     }
263    
264     /**
265 dl 1.37 * Returns true if can run a task given current run state
266 jsr166 1.39 * and run-after-shutdown parameters.
267     *
268 dl 1.37 * @param periodic true if this task periodic, false if delayed
269     */
270     boolean canRunInCurrentRunState(boolean periodic) {
271 jsr166 1.38 return isRunningOrShutdown(periodic ?
272 dl 1.37 continueExistingPeriodicTasksAfterShutdown :
273     executeExistingDelayedTasksAfterShutdown);
274     }
275    
276     /**
277     * Main execution method for delayed or periodic tasks. If pool
278     * is shut down, rejects the task. Otherwise adds task to queue
279     * and starts a thread, if necessary, to run it. (We cannot
280     * prestart the thread to run the task because the task (probably)
281     * shouldn't be run yet,) If the pool is shut down while the task
282     * is being added, cancel and remove it if required by state and
283 jsr166 1.39 * run-after-shutdown parameters.
284     *
285 dl 1.37 * @param task the task
286     */
287     private void delayedExecute(RunnableScheduledFuture<?> task) {
288     if (isShutdown())
289     reject(task);
290     else {
291     super.getQueue().add(task);
292     if (isShutdown() &&
293     !canRunInCurrentRunState(task.isPeriodic()) &&
294     remove(task))
295     task.cancel(false);
296 jsr166 1.48 else
297     prestartCoreThread();
298 dl 1.37 }
299     }
300 jsr166 1.21
301 dl 1.37 /**
302 jsr166 1.39 * Requeues a periodic task unless current run state precludes it.
303     * Same idea as delayedExecute except drops task rather than rejecting.
304     *
305 dl 1.37 * @param task the task
306     */
307     void reExecutePeriodic(RunnableScheduledFuture<?> task) {
308     if (canRunInCurrentRunState(true)) {
309     super.getQueue().add(task);
310     if (!canRunInCurrentRunState(true) && remove(task))
311     task.cancel(false);
312 jsr166 1.48 else
313     prestartCoreThread();
314 dl 1.37 }
315 dl 1.13 }
316 dl 1.1
317 dl 1.13 /**
318 jsr166 1.21 * Cancels and clears the queue of all tasks that should not be run
319 jsr166 1.39 * due to shutdown policy. Invoked within super.shutdown.
320 dl 1.13 */
321 dl 1.37 @Override void onShutdown() {
322     BlockingQueue<Runnable> q = super.getQueue();
323     boolean keepDelayed =
324     getExecuteExistingDelayedTasksAfterShutdownPolicy();
325     boolean keepPeriodic =
326     getContinueExistingPeriodicTasksAfterShutdownPolicy();
327 jsr166 1.21 if (!keepDelayed && !keepPeriodic)
328 dl 1.37 q.clear();
329     else {
330     // Traverse snapshot to avoid iterator exceptions
331 jsr166 1.39 for (Object e : q.toArray()) {
332 dl 1.23 if (e instanceof RunnableScheduledFuture) {
333 dl 1.37 RunnableScheduledFuture<?> t =
334     (RunnableScheduledFuture<?>)e;
335 dl 1.41 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
336     t.isCancelled()) { // also remove if already cancelled
337     if (q.remove(t))
338     t.cancel(false);
339     }
340 dl 1.13 }
341     }
342 dl 1.1 }
343 jsr166 1.48 tryTerminate();
344 dl 1.1 }
345    
346 dl 1.23 /**
347 jsr166 1.30 * Modifies or replaces the task used to execute a runnable.
348 jsr166 1.28 * This method can be used to override the concrete
349 dl 1.23 * class used for managing internal tasks.
350 jsr166 1.30 * The default implementation simply returns the given task.
351 jsr166 1.28 *
352 dl 1.23 * @param runnable the submitted Runnable
353     * @param task the task created to execute the runnable
354     * @return a task that can execute the runnable
355     * @since 1.6
356     */
357 peierls 1.22 protected <V> RunnableScheduledFuture<V> decorateTask(
358 dl 1.23 Runnable runnable, RunnableScheduledFuture<V> task) {
359     return task;
360 peierls 1.22 }
361    
362 dl 1.23 /**
363 jsr166 1.30 * Modifies or replaces the task used to execute a callable.
364 jsr166 1.28 * This method can be used to override the concrete
365 dl 1.23 * class used for managing internal tasks.
366 jsr166 1.30 * The default implementation simply returns the given task.
367 jsr166 1.28 *
368 dl 1.23 * @param callable the submitted Callable
369     * @param task the task created to execute the callable
370     * @return a task that can execute the callable
371     * @since 1.6
372     */
373 peierls 1.22 protected <V> RunnableScheduledFuture<V> decorateTask(
374 dl 1.23 Callable<V> callable, RunnableScheduledFuture<V> task) {
375     return task;
376 dl 1.19 }
377    
378 dl 1.1 /**
379 jsr166 1.39 * Creates a new {@code ScheduledThreadPoolExecutor} with the
380     * given core pool size.
381 jsr166 1.21 *
382 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
383     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
384     * @throws IllegalArgumentException if {@code corePoolSize < 0}
385 dl 1.1 */
386     public ScheduledThreadPoolExecutor(int corePoolSize) {
387     super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
388     new DelayedWorkQueue());
389     }
390    
391     /**
392 jsr166 1.39 * Creates a new {@code ScheduledThreadPoolExecutor} with the
393     * given initial parameters.
394 jsr166 1.21 *
395 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
396     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
397 dl 1.1 * @param threadFactory the factory to use when the executor
398 jsr166 1.39 * creates a new thread
399     * @throws IllegalArgumentException if {@code corePoolSize < 0}
400     * @throws NullPointerException if {@code threadFactory} is null
401 dl 1.1 */
402     public ScheduledThreadPoolExecutor(int corePoolSize,
403     ThreadFactory threadFactory) {
404     super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
405     new DelayedWorkQueue(), threadFactory);
406     }
407    
408     /**
409 dl 1.13 * Creates a new ScheduledThreadPoolExecutor with the given
410     * initial parameters.
411 jsr166 1.21 *
412 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
413     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
414 dl 1.1 * @param handler the handler to use when execution is blocked
415 jsr166 1.39 * because the thread bounds and queue capacities are reached
416     * @throws IllegalArgumentException if {@code corePoolSize < 0}
417     * @throws NullPointerException if {@code handler} is null
418 dl 1.1 */
419     public ScheduledThreadPoolExecutor(int corePoolSize,
420     RejectedExecutionHandler handler) {
421     super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
422     new DelayedWorkQueue(), handler);
423     }
424    
425     /**
426 dl 1.13 * Creates a new ScheduledThreadPoolExecutor with the given
427     * initial parameters.
428 jsr166 1.21 *
429 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
430     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
431 dl 1.1 * @param threadFactory the factory to use when the executor
432 jsr166 1.39 * creates a new thread
433 dl 1.1 * @param handler the handler to use when execution is blocked
434 jsr166 1.39 * because the thread bounds and queue capacities are reached
435     * @throws IllegalArgumentException if {@code corePoolSize < 0}
436     * @throws NullPointerException if {@code threadFactory} or
437     * {@code handler} is null
438 dl 1.1 */
439     public ScheduledThreadPoolExecutor(int corePoolSize,
440     ThreadFactory threadFactory,
441     RejectedExecutionHandler handler) {
442     super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
443     new DelayedWorkQueue(), threadFactory, handler);
444     }
445    
446 dl 1.37 /**
447     * @throws RejectedExecutionException {@inheritDoc}
448     * @throws NullPointerException {@inheritDoc}
449     */
450 jsr166 1.21 public ScheduledFuture<?> schedule(Runnable command,
451     long delay,
452 dl 1.13 TimeUnit unit) {
453 dl 1.9 if (command == null || unit == null)
454 dl 1.1 throw new NullPointerException();
455 dl 1.37 if (delay < 0) delay = 0;
456 dl 1.14 long triggerTime = now() + unit.toNanos(delay);
457 peierls 1.22 RunnableScheduledFuture<?> t = decorateTask(command,
458 jsr166 1.42 new ScheduledFutureTask<Void>(command, null, triggerTime));
459 dl 1.1 delayedExecute(t);
460     return t;
461     }
462    
463 dl 1.37 /**
464     * @throws RejectedExecutionException {@inheritDoc}
465     * @throws NullPointerException {@inheritDoc}
466     */
467 jsr166 1.21 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
468     long delay,
469 dl 1.13 TimeUnit unit) {
470 dl 1.9 if (callable == null || unit == null)
471 dl 1.1 throw new NullPointerException();
472 dl 1.16 if (delay < 0) delay = 0;
473 dl 1.14 long triggerTime = now() + unit.toNanos(delay);
474 peierls 1.22 RunnableScheduledFuture<V> t = decorateTask(callable,
475     new ScheduledFutureTask<V>(callable, triggerTime));
476 dl 1.1 delayedExecute(t);
477     return t;
478     }
479    
480 dl 1.37 /**
481     * @throws RejectedExecutionException {@inheritDoc}
482     * @throws NullPointerException {@inheritDoc}
483     * @throws IllegalArgumentException {@inheritDoc}
484     */
485 jsr166 1.21 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
486     long initialDelay,
487     long period,
488 dl 1.13 TimeUnit unit) {
489 dl 1.9 if (command == null || unit == null)
490 dl 1.1 throw new NullPointerException();
491     if (period <= 0)
492     throw new IllegalArgumentException();
493 dl 1.16 if (initialDelay < 0) initialDelay = 0;
494 dl 1.14 long triggerTime = now() + unit.toNanos(initialDelay);
495 jsr166 1.48 ScheduledFutureTask<Void> sft =
496     new ScheduledFutureTask<Void>(command,
497     null,
498     triggerTime,
499     unit.toNanos(period));
500 jsr166 1.44 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
501 jsr166 1.48 sft.outerTask = t;
502 dl 1.1 delayedExecute(t);
503     return t;
504     }
505 jsr166 1.21
506 dl 1.37 /**
507     * @throws RejectedExecutionException {@inheritDoc}
508     * @throws NullPointerException {@inheritDoc}
509     * @throws IllegalArgumentException {@inheritDoc}
510     */
511 jsr166 1.21 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
512     long initialDelay,
513     long delay,
514 dl 1.13 TimeUnit unit) {
515 dl 1.9 if (command == null || unit == null)
516 dl 1.1 throw new NullPointerException();
517     if (delay <= 0)
518     throw new IllegalArgumentException();
519 dl 1.16 if (initialDelay < 0) initialDelay = 0;
520 dl 1.14 long triggerTime = now() + unit.toNanos(initialDelay);
521 jsr166 1.48 ScheduledFutureTask<Void> sft =
522     new ScheduledFutureTask<Void>(command,
523     null,
524     triggerTime,
525     unit.toNanos(-delay));
526 jsr166 1.44 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
527 jsr166 1.48 sft.outerTask = t;
528 dl 1.1 delayedExecute(t);
529     return t;
530     }
531 jsr166 1.21
532 dl 1.1 /**
533 jsr166 1.39 * Executes {@code command} with zero required delay.
534     * This has effect equivalent to
535     * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
536     * Note that inspections of the queue and of the list returned by
537     * {@code shutdownNow} will access the zero-delayed
538     * {@link ScheduledFuture}, not the {@code command} itself.
539     *
540     * <p>A consequence of the use of {@code ScheduledFuture} objects is
541     * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
542     * called with a null second {@code Throwable} argument, even if the
543     * {@code command} terminated abruptly. Instead, the {@code Throwable}
544     * thrown by such a task can be obtained via {@link Future#get}.
545 dl 1.1 *
546     * @throws RejectedExecutionException at discretion of
547 jsr166 1.39 * {@code RejectedExecutionHandler}, if the task
548     * cannot be accepted for execution because the
549     * executor has been shut down
550     * @throws NullPointerException {@inheritDoc}
551 dl 1.1 */
552     public void execute(Runnable command) {
553     schedule(command, 0, TimeUnit.NANOSECONDS);
554     }
555    
556 dl 1.13 // Override AbstractExecutorService methods
557    
558 dl 1.37 /**
559     * @throws RejectedExecutionException {@inheritDoc}
560     * @throws NullPointerException {@inheritDoc}
561     */
562 dl 1.7 public Future<?> submit(Runnable task) {
563     return schedule(task, 0, TimeUnit.NANOSECONDS);
564     }
565    
566 dl 1.37 /**
567     * @throws RejectedExecutionException {@inheritDoc}
568     * @throws NullPointerException {@inheritDoc}
569     */
570 dl 1.7 public <T> Future<T> submit(Runnable task, T result) {
571 jsr166 1.21 return schedule(Executors.callable(task, result),
572 dl 1.13 0, TimeUnit.NANOSECONDS);
573 dl 1.7 }
574    
575 dl 1.37 /**
576     * @throws RejectedExecutionException {@inheritDoc}
577     * @throws NullPointerException {@inheritDoc}
578     */
579 dl 1.7 public <T> Future<T> submit(Callable<T> task) {
580     return schedule(task, 0, TimeUnit.NANOSECONDS);
581     }
582 dl 1.1
583     /**
584 dl 1.37 * Sets the policy on whether to continue executing existing
585 jsr166 1.39 * periodic tasks even when this executor has been {@code shutdown}.
586     * In this case, these tasks will only terminate upon
587     * {@code shutdownNow} or after setting the policy to
588     * {@code false} when already shutdown.
589     * This value is by default {@code false}.
590 jsr166 1.30 *
591 jsr166 1.39 * @param value if {@code true}, continue after shutdown, else don't.
592 jsr166 1.25 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
593 dl 1.1 */
594     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
595     continueExistingPeriodicTasksAfterShutdown = value;
596 jsr166 1.39 if (!value && isShutdown())
597 dl 1.37 onShutdown();
598 dl 1.1 }
599    
600     /**
601 jsr166 1.21 * Gets the policy on whether to continue executing existing
602 jsr166 1.39 * periodic tasks even when this executor has been {@code shutdown}.
603     * In this case, these tasks will only terminate upon
604     * {@code shutdownNow} or after setting the policy to
605     * {@code false} when already shutdown.
606     * This value is by default {@code false}.
607 jsr166 1.30 *
608 jsr166 1.39 * @return {@code true} if will continue after shutdown
609 dl 1.16 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
610 dl 1.1 */
611     public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
612     return continueExistingPeriodicTasksAfterShutdown;
613     }
614    
615     /**
616 jsr166 1.21 * Sets the policy on whether to execute existing delayed
617 jsr166 1.39 * tasks even when this executor has been {@code shutdown}.
618     * In this case, these tasks will only terminate upon
619     * {@code shutdownNow}, or after setting the policy to
620     * {@code false} when already shutdown.
621     * This value is by default {@code true}.
622 jsr166 1.30 *
623 jsr166 1.39 * @param value if {@code true}, execute after shutdown, else don't.
624 dl 1.16 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
625 dl 1.1 */
626     public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
627     executeExistingDelayedTasksAfterShutdown = value;
628 jsr166 1.39 if (!value && isShutdown())
629 dl 1.37 onShutdown();
630 dl 1.1 }
631    
632     /**
633 jsr166 1.21 * Gets the policy on whether to execute existing delayed
634 jsr166 1.39 * tasks even when this executor has been {@code shutdown}.
635     * In this case, these tasks will only terminate upon
636     * {@code shutdownNow}, or after setting the policy to
637     * {@code false} when already shutdown.
638     * This value is by default {@code true}.
639 jsr166 1.30 *
640 jsr166 1.39 * @return {@code true} if will execute after shutdown
641 dl 1.16 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
642 dl 1.1 */
643     public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
644     return executeExistingDelayedTasksAfterShutdown;
645     }
646    
647     /**
648 jsr166 1.46 * Sets the policy on whether cancelled tasks should be immediately
649     * removed from the work queue at time of cancellation. This value is
650     * by default {@code false}.
651 dl 1.41 *
652 jsr166 1.42 * @param value if {@code true}, remove on cancellation, else don't
653 dl 1.41 * @see #getRemoveOnCancelPolicy
654 jsr166 1.43 * @since 1.7
655 dl 1.41 */
656     public void setRemoveOnCancelPolicy(boolean value) {
657     removeOnCancel = value;
658     }
659    
660     /**
661 jsr166 1.46 * Gets the policy on whether cancelled tasks should be immediately
662     * removed from the work queue at time of cancellation. This value is
663     * by default {@code false}.
664 dl 1.41 *
665 jsr166 1.46 * @return {@code true} if cancelled tasks are immediately removed
666     * from the queue
667 dl 1.41 * @see #setRemoveOnCancelPolicy
668 jsr166 1.43 * @since 1.7
669 dl 1.41 */
670     public boolean getRemoveOnCancelPolicy() {
671     return removeOnCancel;
672     }
673    
674     /**
675 dl 1.1 * Initiates an orderly shutdown in which previously submitted
676 jsr166 1.39 * tasks are executed, but no new tasks will be accepted. If the
677     * {@code ExecuteExistingDelayedTasksAfterShutdownPolicy} has
678     * been set {@code false}, existing delayed tasks whose delays
679     * have not yet elapsed are cancelled. And unless the
680     * {@code ContinueExistingPeriodicTasksAfterShutdownPolicy} has
681     * been set {@code true}, future executions of existing periodic
682 dl 1.1 * tasks will be cancelled.
683 jsr166 1.39 *
684     * @throws SecurityException {@inheritDoc}
685 dl 1.1 */
686     public void shutdown() {
687     super.shutdown();
688     }
689    
690     /**
691     * Attempts to stop all actively executing tasks, halts the
692 jsr166 1.30 * processing of waiting tasks, and returns a list of the tasks
693     * that were awaiting execution.
694 jsr166 1.21 *
695 dl 1.1 * <p>There are no guarantees beyond best-effort attempts to stop
696 dl 1.18 * processing actively executing tasks. This implementation
697 jsr166 1.31 * cancels tasks via {@link Thread#interrupt}, so any task that
698     * fails to respond to interrupts may never terminate.
699 dl 1.1 *
700 jsr166 1.39 * @return list of tasks that never commenced execution.
701     * Each element of this list is a {@link ScheduledFuture},
702     * including those tasks submitted using {@code execute},
703     * which are for scheduling purposes used as the basis of a
704     * zero-delay {@code ScheduledFuture}.
705 jsr166 1.31 * @throws SecurityException {@inheritDoc}
706 dl 1.1 */
707 tim 1.4 public List<Runnable> shutdownNow() {
708 dl 1.1 return super.shutdownNow();
709     }
710    
711     /**
712     * Returns the task queue used by this executor. Each element of
713     * this queue is a {@link ScheduledFuture}, including those
714 jsr166 1.39 * tasks submitted using {@code execute} which are for scheduling
715 dl 1.1 * purposes used as the basis of a zero-delay
716 jsr166 1.39 * {@code ScheduledFuture}. Iteration over this queue is
717 dl 1.15 * <em>not</em> guaranteed to traverse tasks in the order in
718 dl 1.1 * which they will execute.
719     *
720     * @return the task queue
721     */
722     public BlockingQueue<Runnable> getQueue() {
723     return super.getQueue();
724     }
725    
726 dl 1.13 /**
727 dl 1.40 * Specialized delay queue. To mesh with TPE declarations, this
728     * class must be declared as a BlockingQueue<Runnable> even though
729 jsr166 1.42 * it can only hold RunnableScheduledFutures.
730 jsr166 1.21 */
731 dl 1.40 static class DelayedWorkQueue extends AbstractQueue<Runnable>
732 dl 1.13 implements BlockingQueue<Runnable> {
733 jsr166 1.21
734 dl 1.40 /*
735     * A DelayedWorkQueue is based on a heap-based data structure
736     * like those in DelayQueue and PriorityQueue, except that
737     * every ScheduledFutureTask also records its index into the
738     * heap array. This eliminates the need to find a task upon
739     * cancellation, greatly speeding up removal (down from O(n)
740     * to O(log n)), and reducing garbage retention that would
741     * otherwise occur by waiting for the element to rise to top
742     * before clearing. But because the queue may also hold
743     * RunnableScheduledFutures that are not ScheduledFutureTasks,
744     * we are not guaranteed to have such indices available, in
745     * which case we fall back to linear search. (We expect that
746     * most tasks will not be decorated, and that the faster cases
747     * will be much more common.)
748     *
749     * All heap operations must record index changes -- mainly
750     * within siftUp and siftDown. Upon removal, a task's
751     * heapIndex is set to -1. Note that ScheduledFutureTasks can
752     * appear at most once in the queue (this need not be true for
753     * other kinds of tasks or work queues), so are uniquely
754     * identified by heapIndex.
755     */
756    
757 jsr166 1.46 private static final int INITIAL_CAPACITY = 16;
758     private RunnableScheduledFuture[] queue =
759 dl 1.40 new RunnableScheduledFuture[INITIAL_CAPACITY];
760 jsr166 1.46 private final ReentrantLock lock = new ReentrantLock();
761 dl 1.40 private int size = 0;
762    
763 jsr166 1.48 /**
764     * Thread designated to wait for the task at the head of the
765     * queue. This variant of the Leader-Follower pattern
766     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
767     * minimize unnecessary timed waiting. When a thread becomes
768     * the leader, it waits only for the next delay to elapse, but
769     * other threads await indefinitely. The leader thread must
770     * signal some other thread before returning from take() or
771     * poll(...), unless some other thread becomes leader in the
772     * interim. Whenever the head of the queue is replaced with a
773     * task with an earlier expiration time, the leader field is
774     * invalidated by being reset to null, and some waiting
775     * thread, but not necessarily the current leader, is
776     * signalled. So waiting threads must be prepared to acquire
777     * and lose leadership while waiting.
778     */
779     private Thread leader = null;
780    
781     /**
782     * Condition signalled when a newer task becomes available at the
783     * head of the queue or a new thread may need to become leader.
784     */
785     private final Condition available = lock.newCondition();
786 dl 1.40
787     /**
788 jsr166 1.42 * Set f's heapIndex if it is a ScheduledFutureTask.
789 dl 1.40 */
790 jsr166 1.45 private void setIndex(RunnableScheduledFuture f, int idx) {
791 dl 1.40 if (f instanceof ScheduledFutureTask)
792     ((ScheduledFutureTask)f).heapIndex = idx;
793     }
794    
795     /**
796 jsr166 1.42 * Sift element added at bottom up to its heap-ordered spot.
797 dl 1.40 * Call only when holding lock.
798     */
799     private void siftUp(int k, RunnableScheduledFuture key) {
800     while (k > 0) {
801     int parent = (k - 1) >>> 1;
802     RunnableScheduledFuture e = queue[parent];
803     if (key.compareTo(e) >= 0)
804     break;
805     queue[k] = e;
806     setIndex(e, k);
807     k = parent;
808     }
809     queue[k] = key;
810     setIndex(key, k);
811     }
812    
813     /**
814 jsr166 1.42 * Sift element added at top down to its heap-ordered spot.
815 dl 1.40 * Call only when holding lock.
816     */
817     private void siftDown(int k, RunnableScheduledFuture key) {
818 jsr166 1.42 int half = size >>> 1;
819 dl 1.40 while (k < half) {
820 jsr166 1.42 int child = (k << 1) + 1;
821 dl 1.40 RunnableScheduledFuture c = queue[child];
822     int right = child + 1;
823     if (right < size && c.compareTo(queue[right]) > 0)
824     c = queue[child = right];
825     if (key.compareTo(c) <= 0)
826     break;
827     queue[k] = c;
828     setIndex(c, k);
829     k = child;
830     }
831     queue[k] = key;
832     setIndex(key, k);
833     }
834    
835     /**
836     * Resize the heap array. Call only when holding lock.
837     */
838     private void grow() {
839     int oldCapacity = queue.length;
840     int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
841     if (newCapacity < 0) // overflow
842     newCapacity = Integer.MAX_VALUE;
843     queue = Arrays.copyOf(queue, newCapacity);
844     }
845    
846     /**
847     * Find index of given object, or -1 if absent
848     */
849     private int indexOf(Object x) {
850     if (x != null) {
851 jsr166 1.48 if (x instanceof ScheduledFutureTask) {
852     int i = ((ScheduledFutureTask) x).heapIndex;
853     // Sanity check; x could conceivably be a
854     // ScheduledFutureTask from some other pool.
855     if (i >= 0 && i < size && queue[i] == x)
856     return i;
857     } else {
858     for (int i = 0; i < size; i++)
859     if (x.equals(queue[i]))
860     return i;
861     }
862 dl 1.40 }
863     return -1;
864     }
865    
866 jsr166 1.48 public boolean contains(Object x) {
867     final ReentrantLock lock = this.lock;
868 jsr166 1.45 lock.lock();
869     try {
870 jsr166 1.48 return indexOf(x) != -1;
871 jsr166 1.45 } finally {
872     lock.unlock();
873     }
874 jsr166 1.48 }
875 jsr166 1.45
876 dl 1.40 public boolean remove(Object x) {
877     final ReentrantLock lock = this.lock;
878     lock.lock();
879     try {
880 jsr166 1.45 int i = indexOf(x);
881 jsr166 1.48 if (i < 0)
882     return false;
883 jsr166 1.45
884 jsr166 1.48 setIndex(queue[i], -1);
885     int s = --size;
886     RunnableScheduledFuture replacement = queue[s];
887     queue[s] = null;
888     if (s != i) {
889     siftDown(i, replacement);
890     if (queue[i] == replacement)
891     siftUp(i, replacement);
892     }
893     return true;
894 dl 1.40 } finally {
895     lock.unlock();
896     }
897     }
898    
899     public int size() {
900     final ReentrantLock lock = this.lock;
901     lock.lock();
902     try {
903 jsr166 1.45 return size;
904 dl 1.40 } finally {
905     lock.unlock();
906     }
907     }
908    
909 jsr166 1.42 public boolean isEmpty() {
910     return size() == 0;
911 dl 1.40 }
912    
913     public int remainingCapacity() {
914     return Integer.MAX_VALUE;
915     }
916    
917     public RunnableScheduledFuture peek() {
918     final ReentrantLock lock = this.lock;
919     lock.lock();
920     try {
921     return queue[0];
922     } finally {
923     lock.unlock();
924     }
925 dl 1.13 }
926    
927 dl 1.40 public boolean offer(Runnable x) {
928     if (x == null)
929     throw new NullPointerException();
930     RunnableScheduledFuture e = (RunnableScheduledFuture)x;
931     final ReentrantLock lock = this.lock;
932     lock.lock();
933     try {
934     int i = size;
935     if (i >= queue.length)
936     grow();
937     size = i + 1;
938     if (i == 0) {
939     queue[0] = e;
940     setIndex(e, 0);
941 jsr166 1.45 } else {
942 dl 1.40 siftUp(i, e);
943     }
944 jsr166 1.46 if (queue[0] == e) {
945 jsr166 1.48 leader = null;
946 jsr166 1.46 available.signal();
947 jsr166 1.48 }
948 dl 1.40 } finally {
949     lock.unlock();
950     }
951     return true;
952 jsr166 1.48 }
953 dl 1.40
954     public void put(Runnable e) {
955     offer(e);
956     }
957    
958     public boolean add(Runnable e) {
959 jsr166 1.48 return offer(e);
960     }
961 dl 1.40
962     public boolean offer(Runnable e, long timeout, TimeUnit unit) {
963     return offer(e);
964     }
965 jsr166 1.42
966 jsr166 1.46 /**
967     * Performs common bookkeeping for poll and take: Replaces
968 jsr166 1.47 * first element with last and sifts it down. Call only when
969     * holding lock.
970 jsr166 1.46 * @param f the task to remove and return
971     */
972     private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
973     int s = --size;
974     RunnableScheduledFuture x = queue[s];
975     queue[s] = null;
976     if (s != 0)
977     siftDown(0, x);
978     setIndex(f, -1);
979     return f;
980     }
981    
982 dl 1.40 public RunnableScheduledFuture poll() {
983     final ReentrantLock lock = this.lock;
984     lock.lock();
985     try {
986     RunnableScheduledFuture first = queue[0];
987     if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
988     return null;
989 jsr166 1.42 else
990 dl 1.40 return finishPoll(first);
991     } finally {
992     lock.unlock();
993     }
994     }
995    
996     public RunnableScheduledFuture take() throws InterruptedException {
997     final ReentrantLock lock = this.lock;
998     lock.lockInterruptibly();
999     try {
1000     for (;;) {
1001     RunnableScheduledFuture first = queue[0];
1002 jsr166 1.42 if (first == null)
1003 dl 1.40 available.await();
1004     else {
1005 jsr166 1.48 long delay = first.getDelay(TimeUnit.NANOSECONDS);
1006     if (delay <= 0)
1007     return finishPoll(first);
1008     else if (leader != null)
1009     available.await();
1010     else {
1011     Thread thisThread = Thread.currentThread();
1012     leader = thisThread;
1013     try {
1014     available.awaitNanos(delay);
1015     } finally {
1016     if (leader == thisThread)
1017     leader = null;
1018     }
1019     }
1020 dl 1.40 }
1021     }
1022     } finally {
1023 jsr166 1.48 if (leader == null && queue[0] != null)
1024     available.signal();
1025 dl 1.40 lock.unlock();
1026     }
1027     }
1028    
1029 jsr166 1.42 public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
1030 dl 1.40 throws InterruptedException {
1031     long nanos = unit.toNanos(timeout);
1032     final ReentrantLock lock = this.lock;
1033     lock.lockInterruptibly();
1034     try {
1035     for (;;) {
1036     RunnableScheduledFuture first = queue[0];
1037     if (first == null) {
1038     if (nanos <= 0)
1039     return null;
1040     else
1041     nanos = available.awaitNanos(nanos);
1042     } else {
1043     long delay = first.getDelay(TimeUnit.NANOSECONDS);
1044 jsr166 1.48 if (delay <= 0)
1045 dl 1.40 return finishPoll(first);
1046 jsr166 1.48 if (nanos <= 0)
1047     return null;
1048     if (nanos < delay || leader != null)
1049     nanos = available.awaitNanos(nanos);
1050     else {
1051     Thread thisThread = Thread.currentThread();
1052     leader = thisThread;
1053     try {
1054     long timeLeft = available.awaitNanos(delay);
1055     nanos -= delay - timeLeft;
1056     } finally {
1057     if (leader == thisThread)
1058     leader = null;
1059     }
1060     }
1061     }
1062     }
1063 dl 1.40 } finally {
1064 jsr166 1.48 if (leader == null && queue[0] != null)
1065     available.signal();
1066 dl 1.40 lock.unlock();
1067     }
1068     }
1069    
1070     public void clear() {
1071     final ReentrantLock lock = this.lock;
1072     lock.lock();
1073     try {
1074     for (int i = 0; i < size; i++) {
1075     RunnableScheduledFuture t = queue[i];
1076     if (t != null) {
1077     queue[i] = null;
1078     setIndex(t, -1);
1079     }
1080     }
1081     size = 0;
1082     } finally {
1083     lock.unlock();
1084     }
1085 dl 1.13 }
1086 dl 1.40
1087     /**
1088     * Return and remove first element only if it is expired.
1089     * Used only by drainTo. Call only when holding lock.
1090     */
1091     private RunnableScheduledFuture pollExpired() {
1092     RunnableScheduledFuture first = queue[0];
1093 jsr166 1.42 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
1094 dl 1.40 return null;
1095 jsr166 1.48 return finishPoll(first);
1096 dl 1.40 }
1097    
1098     public int drainTo(Collection<? super Runnable> c) {
1099     if (c == null)
1100     throw new NullPointerException();
1101     if (c == this)
1102     throw new IllegalArgumentException();
1103     final ReentrantLock lock = this.lock;
1104     lock.lock();
1105     try {
1106 jsr166 1.48 RunnableScheduledFuture first;
1107 dl 1.40 int n = 0;
1108 jsr166 1.45 while ((first = pollExpired()) != null) {
1109 jsr166 1.48 c.add(first);
1110     ++n;
1111     }
1112 dl 1.40 return n;
1113     } finally {
1114     lock.unlock();
1115     }
1116 dl 1.13 }
1117    
1118 jsr166 1.21 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1119 dl 1.40 if (c == null)
1120     throw new NullPointerException();
1121     if (c == this)
1122     throw new IllegalArgumentException();
1123     if (maxElements <= 0)
1124     return 0;
1125     final ReentrantLock lock = this.lock;
1126     lock.lock();
1127     try {
1128 jsr166 1.48 RunnableScheduledFuture first;
1129 dl 1.40 int n = 0;
1130 jsr166 1.45 while (n < maxElements && (first = pollExpired()) != null) {
1131 jsr166 1.48 c.add(first);
1132     ++n;
1133     }
1134 dl 1.40 return n;
1135     } finally {
1136     lock.unlock();
1137     }
1138     }
1139    
1140     public Object[] toArray() {
1141     final ReentrantLock lock = this.lock;
1142     lock.lock();
1143     try {
1144 jsr166 1.45 return Arrays.copyOf(queue, size, Object[].class);
1145 dl 1.40 } finally {
1146     lock.unlock();
1147     }
1148     }
1149    
1150 jsr166 1.48 @SuppressWarnings("unchecked")
1151 dl 1.40 public <T> T[] toArray(T[] a) {
1152     final ReentrantLock lock = this.lock;
1153     lock.lock();
1154     try {
1155     if (a.length < size)
1156     return (T[]) Arrays.copyOf(queue, size, a.getClass());
1157     System.arraycopy(queue, 0, a, 0, size);
1158     if (a.length > size)
1159     a[size] = null;
1160     return a;
1161     } finally {
1162     lock.unlock();
1163     }
1164 dl 1.13 }
1165    
1166 jsr166 1.21 public Iterator<Runnable> iterator() {
1167 jsr166 1.45 return new Itr(Arrays.copyOf(queue, size));
1168 dl 1.40 }
1169 jsr166 1.42
1170 dl 1.40 /**
1171     * Snapshot iterator that works off copy of underlying q array.
1172     */
1173     private class Itr implements Iterator<Runnable> {
1174 jsr166 1.45 final RunnableScheduledFuture[] array;
1175 jsr166 1.48 int cursor = 0; // index of next element to return
1176     int lastRet = -1; // index of last element, or -1 if no such
1177 jsr166 1.42
1178 jsr166 1.45 Itr(RunnableScheduledFuture[] array) {
1179 dl 1.40 this.array = array;
1180     }
1181 jsr166 1.42
1182 dl 1.40 public boolean hasNext() {
1183     return cursor < array.length;
1184     }
1185 jsr166 1.42
1186 dl 1.40 public Runnable next() {
1187     if (cursor >= array.length)
1188     throw new NoSuchElementException();
1189     lastRet = cursor;
1190 jsr166 1.45 return array[cursor++];
1191 dl 1.40 }
1192 jsr166 1.42
1193 dl 1.40 public void remove() {
1194     if (lastRet < 0)
1195     throw new IllegalStateException();
1196     DelayedWorkQueue.this.remove(array[lastRet]);
1197     lastRet = -1;
1198     }
1199 dl 1.13 }
1200     }
1201 dl 1.1 }