ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.45
Committed: Tue Sep 18 01:04:35 2007 UTC (16 years, 8 months ago) by jsr166
Branch: MAIN
Changes since 1.44: +58 -62 lines
Log Message:
6602600: Fast removal of cancelled scheduled thread pool tasks

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.11 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.1 */
6    
7     package java.util.concurrent;
8     import java.util.concurrent.atomic.*;
9 dl 1.40 import java.util.concurrent.locks.*;
10 dl 1.1 import java.util.*;
11    
12     /**
13 dl 1.7 * A {@link ThreadPoolExecutor} that can additionally schedule
14     * commands to run after a given delay, or to execute
15     * periodically. This class is preferable to {@link java.util.Timer}
16     * when multiple worker threads are needed, or when the additional
17     * flexibility or capabilities of {@link ThreadPoolExecutor} (which
18     * this class extends) are required.
19 dl 1.1 *
20     * <p> Delayed tasks execute no sooner than they are enabled, but
21 dl 1.18 * without any real-time guarantees about when, after they are
22     * enabled, they will commence. Tasks scheduled for exactly the same
23     * execution time are enabled in first-in-first-out (FIFO) order of
24 jsr166 1.43 * submission. If {@link #setRemoveOnCancelPolicy} is set {@code true},
25 dl 1.41 * cancelled tasks are automatically removed from the work queue.
26 dl 1.1 *
27     * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
28 dl 1.8 * of the inherited tuning methods are not useful for it. In
29     * particular, because it acts as a fixed-sized pool using
30 jsr166 1.39 * {@code corePoolSize} threads and an unbounded queue, adjustments
31     * to {@code maximumPoolSize} have no useful effect. Additionally, it
32     * is almost never a good idea to set {@code corePoolSize} to zero or
33     * use {@code allowCoreThreadTimeOut} because this may leave the pool
34 dl 1.37 * without threads to handle tasks once they become eligible to run.
35 dl 1.1 *
36 jsr166 1.39 * <p><b>Extension notes:</b> This class overrides the
37     * {@link ThreadPoolExecutor#execute execute} and
38     * {@link AbstractExecutorService#submit(Runnable) submit}
39     * methods to generate internal {@link ScheduledFuture} objects to
40     * control per-task delays and scheduling. To preserve
41     * functionality, any further overrides of these methods in
42 dl 1.32 * subclasses must invoke superclass versions, which effectively
43 jsr166 1.39 * disables additional task customization. However, this class
44 dl 1.32 * provides alternative protected extension method
45 jsr166 1.39 * {@code decorateTask} (one version each for {@code Runnable} and
46     * {@code Callable}) that can be used to customize the concrete task
47     * types used to execute commands entered via {@code execute},
48     * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
49     * and {@code scheduleWithFixedDelay}. By default, a
50     * {@code ScheduledThreadPoolExecutor} uses a task type extending
51 dl 1.32 * {@link FutureTask}. However, this may be modified or replaced using
52     * subclasses of the form:
53     *
54 jsr166 1.39 * <pre> {@code
55 dl 1.23 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
56     *
57 jsr166 1.39 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
58 dl 1.23 *
59 jsr166 1.39 * protected <V> RunnableScheduledFuture<V> decorateTask(
60     * Runnable r, RunnableScheduledFuture<V> task) {
61     * return new CustomTask<V>(r, task);
62 jsr166 1.29 * }
63 dl 1.23 *
64 jsr166 1.39 * protected <V> RunnableScheduledFuture<V> decorateTask(
65     * Callable<V> c, RunnableScheduledFuture<V> task) {
66     * return new CustomTask<V>(c, task);
67 jsr166 1.29 * }
68     * // ... add constructors, etc.
69 jsr166 1.39 * }}</pre>
70     *
71 dl 1.1 * @since 1.5
72     * @author Doug Lea
73     */
74 jsr166 1.21 public class ScheduledThreadPoolExecutor
75     extends ThreadPoolExecutor
76 tim 1.3 implements ScheduledExecutorService {
77 dl 1.1
78 dl 1.37 /*
79     * This class specializes ThreadPoolExecutor implementation by
80     *
81     * 1. Using a custom task type, ScheduledFutureTask for
82     * tasks, even those that don't require scheduling (i.e.,
83     * those submitted using ExecutorService execute, not
84     * ScheduledExecutorService methods) which are treated as
85     * delayed tasks with a delay of zero.
86     *
87     * 2. Using a custom queue (DelayedWorkQueue) based on an
88     * unbounded DelayQueue. The lack of capacity constraint and
89     * the fact that corePoolSize and maximumPoolSize are
90     * effectively identical simplifies some execution mechanics
91     * (see delayedExecute) compared to ThreadPoolExecutor
92     * version.
93     *
94     * The DelayedWorkQueue class is defined below for the sake of
95     * ensuring that all elements are instances of
96     * RunnableScheduledFuture. Since DelayQueue otherwise
97     * requires type be Delayed, but not necessarily Runnable, and
98     * the workQueue requires the opposite, we need to explicitly
99     * define a class that requires both to ensure that users don't
100     * add objects that aren't RunnableScheduledFutures via
101     * getQueue().add() etc.
102     *
103     * 3. Supporting optional run-after-shutdown parameters, which
104     * leads to overrides of shutdown methods to remove and cancel
105     * tasks that should NOT be run after shutdown, as well as
106     * different recheck logic when task (re)submission overlaps
107     * with a shutdown.
108     *
109     * 4. Task decoration methods to allow interception and
110     * instrumentation, which are needed because subclasses cannot
111     * otherwise override submit methods to get this effect. These
112     * don't have any impact on pool control logic though.
113     */
114    
115 dl 1.1 /**
116     * False if should cancel/suppress periodic tasks on shutdown.
117     */
118     private volatile boolean continueExistingPeriodicTasksAfterShutdown;
119    
120     /**
121     * False if should cancel non-periodic tasks on shutdown.
122     */
123     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
124    
125     /**
126 dl 1.41 * True if ScheduledFutureTask.cancel should remove from queue
127     */
128     private volatile boolean removeOnCancel = false;
129    
130     /**
131 dl 1.1 * Sequence number to break scheduling ties, and in turn to
132     * guarantee FIFO order among tied entries.
133     */
134     private static final AtomicLong sequencer = new AtomicLong(0);
135 dl 1.14
136     /**
137 jsr166 1.39 * Returns current nanosecond time.
138 dl 1.14 */
139 dl 1.17 final long now() {
140 jsr166 1.39 return System.nanoTime();
141 dl 1.14 }
142    
143 jsr166 1.21 private class ScheduledFutureTask<V>
144 peierls 1.22 extends FutureTask<V> implements RunnableScheduledFuture<V> {
145 jsr166 1.21
146 dl 1.1 /** Sequence number to break ties FIFO */
147     private final long sequenceNumber;
148 jsr166 1.44
149 dl 1.1 /** The time the task is enabled to execute in nanoTime units */
150     private long time;
151 jsr166 1.44
152 dl 1.16 /**
153     * Period in nanoseconds for repeating tasks. A positive
154     * value indicates fixed-rate execution. A negative value
155     * indicates fixed-delay execution. A value of 0 indicates a
156     * non-repeating task.
157     */
158 dl 1.1 private final long period;
159    
160 jsr166 1.44 /** The actual task to be re-enqueued by reExecutePeriodic */
161     RunnableScheduledFuture<V> outerTask = this;
162    
163 dl 1.1 /**
164 dl 1.40 * Index into delay queue, to support faster cancellation.
165     */
166     int heapIndex;
167    
168     /**
169 jsr166 1.30 * Creates a one-shot action with given nanoTime-based trigger time.
170 dl 1.1 */
171     ScheduledFutureTask(Runnable r, V result, long ns) {
172     super(r, result);
173     this.time = ns;
174     this.period = 0;
175     this.sequenceNumber = sequencer.getAndIncrement();
176     }
177    
178     /**
179 jsr166 1.30 * Creates a periodic action with given nano time and period.
180 dl 1.1 */
181 jsr166 1.30 ScheduledFutureTask(Runnable r, V result, long ns, long period) {
182 dl 1.1 super(r, result);
183     this.time = ns;
184     this.period = period;
185     this.sequenceNumber = sequencer.getAndIncrement();
186     }
187    
188     /**
189 jsr166 1.30 * Creates a one-shot action with given nanoTime-based trigger.
190 dl 1.1 */
191     ScheduledFutureTask(Callable<V> callable, long ns) {
192     super(callable);
193     this.time = ns;
194     this.period = 0;
195     this.sequenceNumber = sequencer.getAndIncrement();
196     }
197    
198     public long getDelay(TimeUnit unit) {
199 dl 1.14 long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);
200 dl 1.1 return d;
201     }
202    
203 dl 1.20 public int compareTo(Delayed other) {
204 dl 1.1 if (other == this) // compare zero ONLY if same object
205     return 0;
206 dl 1.34 if (other instanceof ScheduledFutureTask) {
207     ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
208     long diff = time - x.time;
209     if (diff < 0)
210     return -1;
211     else if (diff > 0)
212     return 1;
213     else if (sequenceNumber < x.sequenceNumber)
214     return -1;
215     else
216     return 1;
217     }
218     long d = (getDelay(TimeUnit.NANOSECONDS) -
219     other.getDelay(TimeUnit.NANOSECONDS));
220 jsr166 1.38 return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
221 dl 1.1 }
222    
223     /**
224 dl 1.18 * Returns true if this is a periodic (not a one-shot) action.
225 jsr166 1.30 *
226 dl 1.1 * @return true if periodic
227     */
228 dl 1.23 public boolean isPeriodic() {
229 dl 1.16 return period != 0;
230 dl 1.1 }
231    
232     /**
233 jsr166 1.39 * Sets the next time to run for a periodic task.
234 dl 1.13 */
235 dl 1.37 private void setNextRunTime() {
236     long p = period;
237     if (p > 0)
238     time += p;
239     else
240     time = now() - p;
241 dl 1.13 }
242    
243 dl 1.40 public boolean cancel(boolean mayInterruptIfRunning) {
244 dl 1.41 boolean cancelled = super.cancel(mayInterruptIfRunning);
245     if (cancelled && removeOnCancel && heapIndex >= 0)
246 jsr166 1.42 remove(this);
247 dl 1.41 return cancelled;
248 dl 1.40 }
249    
250 dl 1.13 /**
251 dl 1.5 * Overrides FutureTask version so as to reset/requeue if periodic.
252 jsr166 1.21 */
253 dl 1.1 public void run() {
254 dl 1.37 boolean periodic = isPeriodic();
255     if (!canRunInCurrentRunState(periodic))
256     cancel(false);
257     else if (!periodic)
258 dl 1.5 ScheduledFutureTask.super.run();
259 dl 1.37 else if (ScheduledFutureTask.super.runAndReset()) {
260     setNextRunTime();
261 jsr166 1.44 reExecutePeriodic(outerTask);
262 dl 1.37 }
263 dl 1.1 }
264     }
265    
266     /**
267 dl 1.37 * Returns true if can run a task given current run state
268 jsr166 1.39 * and run-after-shutdown parameters.
269     *
270 dl 1.37 * @param periodic true if this task periodic, false if delayed
271     */
272     boolean canRunInCurrentRunState(boolean periodic) {
273 jsr166 1.38 return isRunningOrShutdown(periodic ?
274 dl 1.37 continueExistingPeriodicTasksAfterShutdown :
275     executeExistingDelayedTasksAfterShutdown);
276     }
277    
278     /**
279     * Main execution method for delayed or periodic tasks. If pool
280     * is shut down, rejects the task. Otherwise adds task to queue
281     * and starts a thread, if necessary, to run it. (We cannot
282     * prestart the thread to run the task because the task (probably)
283     * shouldn't be run yet,) If the pool is shut down while the task
284     * is being added, cancel and remove it if required by state and
285 jsr166 1.39 * run-after-shutdown parameters.
286     *
287 dl 1.37 * @param task the task
288     */
289     private void delayedExecute(RunnableScheduledFuture<?> task) {
290     if (isShutdown())
291     reject(task);
292     else {
293     super.getQueue().add(task);
294     if (isShutdown() &&
295     !canRunInCurrentRunState(task.isPeriodic()) &&
296     remove(task))
297     task.cancel(false);
298 jsr166 1.39 else
299     prestartCoreThread();
300 dl 1.37 }
301     }
302 jsr166 1.21
303 dl 1.37 /**
304 jsr166 1.39 * Requeues a periodic task unless current run state precludes it.
305     * Same idea as delayedExecute except drops task rather than rejecting.
306     *
307 dl 1.37 * @param task the task
308     */
309     void reExecutePeriodic(RunnableScheduledFuture<?> task) {
310     if (canRunInCurrentRunState(true)) {
311     super.getQueue().add(task);
312     if (!canRunInCurrentRunState(true) && remove(task))
313     task.cancel(false);
314 jsr166 1.39 else
315     prestartCoreThread();
316 dl 1.37 }
317 dl 1.13 }
318 dl 1.1
319 dl 1.13 /**
320 jsr166 1.21 * Cancels and clears the queue of all tasks that should not be run
321 jsr166 1.39 * due to shutdown policy. Invoked within super.shutdown.
322 dl 1.13 */
323 dl 1.37 @Override void onShutdown() {
324     BlockingQueue<Runnable> q = super.getQueue();
325     boolean keepDelayed =
326     getExecuteExistingDelayedTasksAfterShutdownPolicy();
327     boolean keepPeriodic =
328     getContinueExistingPeriodicTasksAfterShutdownPolicy();
329 jsr166 1.21 if (!keepDelayed && !keepPeriodic)
330 dl 1.37 q.clear();
331     else {
332     // Traverse snapshot to avoid iterator exceptions
333 jsr166 1.39 for (Object e : q.toArray()) {
334 dl 1.23 if (e instanceof RunnableScheduledFuture) {
335 dl 1.37 RunnableScheduledFuture<?> t =
336     (RunnableScheduledFuture<?>)e;
337 dl 1.41 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
338     t.isCancelled()) { // also remove if already cancelled
339     if (q.remove(t))
340     t.cancel(false);
341     }
342 dl 1.13 }
343     }
344 dl 1.1 }
345 jsr166 1.39 tryTerminate();
346 dl 1.1 }
347    
348 dl 1.23 /**
349 jsr166 1.30 * Modifies or replaces the task used to execute a runnable.
350 jsr166 1.28 * This method can be used to override the concrete
351 dl 1.23 * class used for managing internal tasks.
352 jsr166 1.30 * The default implementation simply returns the given task.
353 jsr166 1.28 *
354 dl 1.23 * @param runnable the submitted Runnable
355     * @param task the task created to execute the runnable
356     * @return a task that can execute the runnable
357     * @since 1.6
358     */
359 peierls 1.22 protected <V> RunnableScheduledFuture<V> decorateTask(
360 dl 1.23 Runnable runnable, RunnableScheduledFuture<V> task) {
361     return task;
362 peierls 1.22 }
363    
364 dl 1.23 /**
365 jsr166 1.30 * Modifies or replaces the task used to execute a callable.
366 jsr166 1.28 * This method can be used to override the concrete
367 dl 1.23 * class used for managing internal tasks.
368 jsr166 1.30 * The default implementation simply returns the given task.
369 jsr166 1.28 *
370 dl 1.23 * @param callable the submitted Callable
371     * @param task the task created to execute the callable
372     * @return a task that can execute the callable
373     * @since 1.6
374     */
375 peierls 1.22 protected <V> RunnableScheduledFuture<V> decorateTask(
376 dl 1.23 Callable<V> callable, RunnableScheduledFuture<V> task) {
377     return task;
378 dl 1.19 }
379    
380 dl 1.1 /**
381 jsr166 1.39 * Creates a new {@code ScheduledThreadPoolExecutor} with the
382     * given core pool size.
383 jsr166 1.21 *
384 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
385     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
386     * @throws IllegalArgumentException if {@code corePoolSize < 0}
387 dl 1.1 */
388     public ScheduledThreadPoolExecutor(int corePoolSize) {
389     super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
390     new DelayedWorkQueue());
391     }
392    
393     /**
394 jsr166 1.39 * Creates a new {@code ScheduledThreadPoolExecutor} with the
395     * given initial parameters.
396 jsr166 1.21 *
397 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
398     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
399 dl 1.1 * @param threadFactory the factory to use when the executor
400 jsr166 1.39 * creates a new thread
401     * @throws IllegalArgumentException if {@code corePoolSize < 0}
402     * @throws NullPointerException if {@code threadFactory} is null
403 dl 1.1 */
404     public ScheduledThreadPoolExecutor(int corePoolSize,
405     ThreadFactory threadFactory) {
406     super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
407     new DelayedWorkQueue(), threadFactory);
408     }
409    
410     /**
411 dl 1.13 * Creates a new ScheduledThreadPoolExecutor with the given
412     * initial parameters.
413 jsr166 1.21 *
414 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
415     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
416 dl 1.1 * @param handler the handler to use when execution is blocked
417 jsr166 1.39 * because the thread bounds and queue capacities are reached
418     * @throws IllegalArgumentException if {@code corePoolSize < 0}
419     * @throws NullPointerException if {@code handler} is null
420 dl 1.1 */
421     public ScheduledThreadPoolExecutor(int corePoolSize,
422     RejectedExecutionHandler handler) {
423     super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
424     new DelayedWorkQueue(), handler);
425     }
426    
427     /**
428 dl 1.13 * Creates a new ScheduledThreadPoolExecutor with the given
429     * initial parameters.
430 jsr166 1.21 *
431 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
432     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
433 dl 1.1 * @param threadFactory the factory to use when the executor
434 jsr166 1.39 * creates a new thread
435 dl 1.1 * @param handler the handler to use when execution is blocked
436 jsr166 1.39 * because the thread bounds and queue capacities are reached
437     * @throws IllegalArgumentException if {@code corePoolSize < 0}
438     * @throws NullPointerException if {@code threadFactory} or
439     * {@code handler} is null
440 dl 1.1 */
441     public ScheduledThreadPoolExecutor(int corePoolSize,
442     ThreadFactory threadFactory,
443     RejectedExecutionHandler handler) {
444     super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
445     new DelayedWorkQueue(), threadFactory, handler);
446     }
447    
448 dl 1.37 /**
449     * @throws RejectedExecutionException {@inheritDoc}
450     * @throws NullPointerException {@inheritDoc}
451     */
452 jsr166 1.21 public ScheduledFuture<?> schedule(Runnable command,
453     long delay,
454 dl 1.13 TimeUnit unit) {
455 dl 1.9 if (command == null || unit == null)
456 dl 1.1 throw new NullPointerException();
457 dl 1.37 if (delay < 0) delay = 0;
458 dl 1.14 long triggerTime = now() + unit.toNanos(delay);
459 peierls 1.22 RunnableScheduledFuture<?> t = decorateTask(command,
460 jsr166 1.42 new ScheduledFutureTask<Void>(command, null, triggerTime));
461 dl 1.1 delayedExecute(t);
462     return t;
463     }
464    
465 dl 1.37 /**
466     * @throws RejectedExecutionException {@inheritDoc}
467     * @throws NullPointerException {@inheritDoc}
468     */
469 jsr166 1.21 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
470     long delay,
471 dl 1.13 TimeUnit unit) {
472 dl 1.9 if (callable == null || unit == null)
473 dl 1.1 throw new NullPointerException();
474 dl 1.16 if (delay < 0) delay = 0;
475 dl 1.14 long triggerTime = now() + unit.toNanos(delay);
476 peierls 1.22 RunnableScheduledFuture<V> t = decorateTask(callable,
477     new ScheduledFutureTask<V>(callable, triggerTime));
478 dl 1.1 delayedExecute(t);
479     return t;
480     }
481    
482 dl 1.37 /**
483     * @throws RejectedExecutionException {@inheritDoc}
484     * @throws NullPointerException {@inheritDoc}
485     * @throws IllegalArgumentException {@inheritDoc}
486     */
487 jsr166 1.21 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
488     long initialDelay,
489     long period,
490 dl 1.13 TimeUnit unit) {
491 dl 1.9 if (command == null || unit == null)
492 dl 1.1 throw new NullPointerException();
493     if (period <= 0)
494     throw new IllegalArgumentException();
495 dl 1.16 if (initialDelay < 0) initialDelay = 0;
496 dl 1.14 long triggerTime = now() + unit.toNanos(initialDelay);
497 jsr166 1.44 ScheduledFutureTask<Void> sft =
498     new ScheduledFutureTask<Void>(command,
499     null,
500     triggerTime,
501     unit.toNanos(period));
502     RunnableScheduledFuture<Void> t = decorateTask(command, sft);
503     sft.outerTask = t;
504 dl 1.1 delayedExecute(t);
505     return t;
506     }
507 jsr166 1.21
508 dl 1.37 /**
509     * @throws RejectedExecutionException {@inheritDoc}
510     * @throws NullPointerException {@inheritDoc}
511     * @throws IllegalArgumentException {@inheritDoc}
512     */
513 jsr166 1.21 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
514     long initialDelay,
515     long delay,
516 dl 1.13 TimeUnit unit) {
517 dl 1.9 if (command == null || unit == null)
518 dl 1.1 throw new NullPointerException();
519     if (delay <= 0)
520     throw new IllegalArgumentException();
521 dl 1.16 if (initialDelay < 0) initialDelay = 0;
522 dl 1.14 long triggerTime = now() + unit.toNanos(initialDelay);
523 jsr166 1.44 ScheduledFutureTask<Void> sft =
524     new ScheduledFutureTask<Void>(command,
525     null,
526     triggerTime,
527     unit.toNanos(-delay));
528     RunnableScheduledFuture<Void> t = decorateTask(command, sft);
529     sft.outerTask = t;
530 dl 1.1 delayedExecute(t);
531     return t;
532     }
533 jsr166 1.21
534 dl 1.1 /**
535 jsr166 1.39 * Executes {@code command} with zero required delay.
536     * This has effect equivalent to
537     * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
538     * Note that inspections of the queue and of the list returned by
539     * {@code shutdownNow} will access the zero-delayed
540     * {@link ScheduledFuture}, not the {@code command} itself.
541     *
542     * <p>A consequence of the use of {@code ScheduledFuture} objects is
543     * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
544     * called with a null second {@code Throwable} argument, even if the
545     * {@code command} terminated abruptly. Instead, the {@code Throwable}
546     * thrown by such a task can be obtained via {@link Future#get}.
547 dl 1.1 *
548     * @throws RejectedExecutionException at discretion of
549 jsr166 1.39 * {@code RejectedExecutionHandler}, if the task
550     * cannot be accepted for execution because the
551     * executor has been shut down
552     * @throws NullPointerException {@inheritDoc}
553 dl 1.1 */
554     public void execute(Runnable command) {
555     schedule(command, 0, TimeUnit.NANOSECONDS);
556     }
557    
558 dl 1.13 // Override AbstractExecutorService methods
559    
560 dl 1.37 /**
561     * @throws RejectedExecutionException {@inheritDoc}
562     * @throws NullPointerException {@inheritDoc}
563     */
564 dl 1.7 public Future<?> submit(Runnable task) {
565     return schedule(task, 0, TimeUnit.NANOSECONDS);
566     }
567    
568 dl 1.37 /**
569     * @throws RejectedExecutionException {@inheritDoc}
570     * @throws NullPointerException {@inheritDoc}
571     */
572 dl 1.7 public <T> Future<T> submit(Runnable task, T result) {
573 jsr166 1.21 return schedule(Executors.callable(task, result),
574 dl 1.13 0, TimeUnit.NANOSECONDS);
575 dl 1.7 }
576    
577 dl 1.37 /**
578     * @throws RejectedExecutionException {@inheritDoc}
579     * @throws NullPointerException {@inheritDoc}
580     */
581 dl 1.7 public <T> Future<T> submit(Callable<T> task) {
582     return schedule(task, 0, TimeUnit.NANOSECONDS);
583     }
584 dl 1.1
585     /**
586 dl 1.37 * Sets the policy on whether to continue executing existing
587 jsr166 1.39 * periodic tasks even when this executor has been {@code shutdown}.
588     * In this case, these tasks will only terminate upon
589     * {@code shutdownNow} or after setting the policy to
590     * {@code false} when already shutdown.
591     * This value is by default {@code false}.
592 jsr166 1.30 *
593 jsr166 1.39 * @param value if {@code true}, continue after shutdown, else don't.
594 jsr166 1.25 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
595 dl 1.1 */
596     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
597     continueExistingPeriodicTasksAfterShutdown = value;
598 jsr166 1.39 if (!value && isShutdown())
599 dl 1.37 onShutdown();
600 dl 1.1 }
601    
602     /**
603 jsr166 1.21 * Gets the policy on whether to continue executing existing
604 jsr166 1.39 * periodic tasks even when this executor has been {@code shutdown}.
605     * In this case, these tasks will only terminate upon
606     * {@code shutdownNow} or after setting the policy to
607     * {@code false} when already shutdown.
608     * This value is by default {@code false}.
609 jsr166 1.30 *
610 jsr166 1.39 * @return {@code true} if will continue after shutdown
611 dl 1.16 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
612 dl 1.1 */
613     public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
614     return continueExistingPeriodicTasksAfterShutdown;
615     }
616    
617     /**
618 jsr166 1.21 * Sets the policy on whether to execute existing delayed
619 jsr166 1.39 * tasks even when this executor has been {@code shutdown}.
620     * In this case, these tasks will only terminate upon
621     * {@code shutdownNow}, or after setting the policy to
622     * {@code false} when already shutdown.
623     * This value is by default {@code true}.
624 jsr166 1.30 *
625 jsr166 1.39 * @param value if {@code true}, execute after shutdown, else don't.
626 dl 1.16 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
627 dl 1.1 */
628     public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
629     executeExistingDelayedTasksAfterShutdown = value;
630 jsr166 1.39 if (!value && isShutdown())
631 dl 1.37 onShutdown();
632 dl 1.1 }
633    
634     /**
635 jsr166 1.21 * Gets the policy on whether to execute existing delayed
636 jsr166 1.39 * tasks even when this executor has been {@code shutdown}.
637     * In this case, these tasks will only terminate upon
638     * {@code shutdownNow}, or after setting the policy to
639     * {@code false} when already shutdown.
640     * This value is by default {@code true}.
641 jsr166 1.30 *
642 jsr166 1.39 * @return {@code true} if will execute after shutdown
643 dl 1.16 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
644 dl 1.1 */
645     public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
646     return executeExistingDelayedTasksAfterShutdown;
647     }
648    
649     /**
650 jsr166 1.42 * Sets the policy on whether cancellation of a task should remove
651     * it from the work queue. This value is by default {@code false}.
652 dl 1.41 *
653 jsr166 1.42 * @param value if {@code true}, remove on cancellation, else don't
654 dl 1.41 * @see #getRemoveOnCancelPolicy
655 jsr166 1.43 * @since 1.7
656 dl 1.41 */
657     public void setRemoveOnCancelPolicy(boolean value) {
658     removeOnCancel = value;
659     }
660    
661     /**
662 jsr166 1.42 * Gets the policy on whether cancellation of a task should remove
663     * it from the work queue. This value is by default {@code false}.
664 dl 1.41 *
665 jsr166 1.42 * @return {@code true} if cancelled tasks are removed from the queue
666 dl 1.41 * @see #setRemoveOnCancelPolicy
667 jsr166 1.43 * @since 1.7
668 dl 1.41 */
669     public boolean getRemoveOnCancelPolicy() {
670     return removeOnCancel;
671     }
672    
673     /**
674 dl 1.1 * Initiates an orderly shutdown in which previously submitted
675 jsr166 1.39 * tasks are executed, but no new tasks will be accepted. If the
676     * {@code ExecuteExistingDelayedTasksAfterShutdownPolicy} has
677     * been set {@code false}, existing delayed tasks whose delays
678     * have not yet elapsed are cancelled. And unless the
679     * {@code ContinueExistingPeriodicTasksAfterShutdownPolicy} has
680     * been set {@code true}, future executions of existing periodic
681 dl 1.1 * tasks will be cancelled.
682 jsr166 1.39 *
683     * @throws SecurityException {@inheritDoc}
684 dl 1.1 */
685     public void shutdown() {
686     super.shutdown();
687     }
688    
689     /**
690     * Attempts to stop all actively executing tasks, halts the
691 jsr166 1.30 * processing of waiting tasks, and returns a list of the tasks
692     * that were awaiting execution.
693 jsr166 1.21 *
694 dl 1.1 * <p>There are no guarantees beyond best-effort attempts to stop
695 dl 1.18 * processing actively executing tasks. This implementation
696 jsr166 1.31 * cancels tasks via {@link Thread#interrupt}, so any task that
697     * fails to respond to interrupts may never terminate.
698 dl 1.1 *
699 jsr166 1.39 * @return list of tasks that never commenced execution.
700     * Each element of this list is a {@link ScheduledFuture},
701     * including those tasks submitted using {@code execute},
702     * which are for scheduling purposes used as the basis of a
703     * zero-delay {@code ScheduledFuture}.
704 jsr166 1.31 * @throws SecurityException {@inheritDoc}
705 dl 1.1 */
706 tim 1.4 public List<Runnable> shutdownNow() {
707 dl 1.1 return super.shutdownNow();
708     }
709    
710     /**
711     * Returns the task queue used by this executor. Each element of
712     * this queue is a {@link ScheduledFuture}, including those
713 jsr166 1.39 * tasks submitted using {@code execute} which are for scheduling
714 dl 1.1 * purposes used as the basis of a zero-delay
715 jsr166 1.39 * {@code ScheduledFuture}. Iteration over this queue is
716 dl 1.15 * <em>not</em> guaranteed to traverse tasks in the order in
717 dl 1.1 * which they will execute.
718     *
719     * @return the task queue
720     */
721     public BlockingQueue<Runnable> getQueue() {
722     return super.getQueue();
723     }
724    
725 dl 1.13 /**
726 dl 1.40 * Specialized delay queue. To mesh with TPE declarations, this
727     * class must be declared as a BlockingQueue<Runnable> even though
728 jsr166 1.42 * it can only hold RunnableScheduledFutures.
729 jsr166 1.21 */
730 dl 1.40 static class DelayedWorkQueue extends AbstractQueue<Runnable>
731 dl 1.13 implements BlockingQueue<Runnable> {
732 jsr166 1.21
733 dl 1.40 /*
734     * A DelayedWorkQueue is based on a heap-based data structure
735     * like those in DelayQueue and PriorityQueue, except that
736     * every ScheduledFutureTask also records its index into the
737     * heap array. This eliminates the need to find a task upon
738     * cancellation, greatly speeding up removal (down from O(n)
739     * to O(log n)), and reducing garbage retention that would
740     * otherwise occur by waiting for the element to rise to top
741     * before clearing. But because the queue may also hold
742     * RunnableScheduledFutures that are not ScheduledFutureTasks,
743     * we are not guaranteed to have such indices available, in
744     * which case we fall back to linear search. (We expect that
745     * most tasks will not be decorated, and that the faster cases
746     * will be much more common.)
747     *
748     * All heap operations must record index changes -- mainly
749     * within siftUp and siftDown. Upon removal, a task's
750     * heapIndex is set to -1. Note that ScheduledFutureTasks can
751     * appear at most once in the queue (this need not be true for
752     * other kinds of tasks or work queues), so are uniquely
753     * identified by heapIndex.
754     */
755    
756     private static final int INITIAL_CAPACITY = 64;
757     private transient RunnableScheduledFuture[] queue =
758     new RunnableScheduledFuture[INITIAL_CAPACITY];
759     private transient final ReentrantLock lock = new ReentrantLock();
760     private transient final Condition available = lock.newCondition();
761     private int size = 0;
762    
763    
764     /**
765 jsr166 1.42 * Set f's heapIndex if it is a ScheduledFutureTask.
766 dl 1.40 */
767 jsr166 1.45 private void setIndex(RunnableScheduledFuture f, int idx) {
768 dl 1.40 if (f instanceof ScheduledFutureTask)
769     ((ScheduledFutureTask)f).heapIndex = idx;
770     }
771    
772     /**
773 jsr166 1.42 * Sift element added at bottom up to its heap-ordered spot.
774 dl 1.40 * Call only when holding lock.
775     */
776     private void siftUp(int k, RunnableScheduledFuture key) {
777     while (k > 0) {
778     int parent = (k - 1) >>> 1;
779     RunnableScheduledFuture e = queue[parent];
780     if (key.compareTo(e) >= 0)
781     break;
782     queue[k] = e;
783     setIndex(e, k);
784     k = parent;
785     }
786     queue[k] = key;
787     setIndex(key, k);
788     }
789    
790     /**
791 jsr166 1.42 * Sift element added at top down to its heap-ordered spot.
792 dl 1.40 * Call only when holding lock.
793     */
794     private void siftDown(int k, RunnableScheduledFuture key) {
795 jsr166 1.42 int half = size >>> 1;
796 dl 1.40 while (k < half) {
797 jsr166 1.42 int child = (k << 1) + 1;
798 dl 1.40 RunnableScheduledFuture c = queue[child];
799     int right = child + 1;
800     if (right < size && c.compareTo(queue[right]) > 0)
801     c = queue[child = right];
802     if (key.compareTo(c) <= 0)
803     break;
804     queue[k] = c;
805     setIndex(c, k);
806     k = child;
807     }
808     queue[k] = key;
809     setIndex(key, k);
810     }
811    
812     /**
813     * Performs common bookkeeping for poll and take: Replaces
814     * first element with last; sifts it down, and signals any
815     * waiting consumers. Call only when holding lock.
816     * @param f the task to remove and return
817     */
818     private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
819     int s = --size;
820     RunnableScheduledFuture x = queue[s];
821     queue[s] = null;
822     if (s != 0) {
823     siftDown(0, x);
824     available.signalAll();
825     }
826     setIndex(f, -1);
827     return f;
828     }
829    
830     /**
831     * Resize the heap array. Call only when holding lock.
832     */
833     private void grow() {
834     int oldCapacity = queue.length;
835     int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
836     if (newCapacity < 0) // overflow
837     newCapacity = Integer.MAX_VALUE;
838     queue = Arrays.copyOf(queue, newCapacity);
839     }
840    
841     /**
842     * Find index of given object, or -1 if absent
843     */
844     private int indexOf(Object x) {
845     if (x != null) {
846 jsr166 1.45 if (x instanceof ScheduledFutureTask) {
847     int i = ((ScheduledFutureTask) x).heapIndex;
848     // Sanity check; x could conceivably be a
849     // ScheduledFutureTask from some other pool.
850     if (i >= 0 && i < size && queue[i] == x)
851     return i;
852     } else {
853     for (int i = 0; i < size; i++)
854     if (x.equals(queue[i]))
855     return i;
856     }
857 dl 1.40 }
858     return -1;
859     }
860    
861 jsr166 1.45 public boolean contains(Object x) {
862     final ReentrantLock lock = this.lock;
863     lock.lock();
864     try {
865     return indexOf(x) != -1;
866     } finally {
867     lock.unlock();
868     }
869     }
870    
871 dl 1.40 public boolean remove(Object x) {
872     final ReentrantLock lock = this.lock;
873     lock.lock();
874     try {
875 jsr166 1.45 int i = indexOf(x);
876     if (i < 0)
877     return false;
878    
879     setIndex(queue[i], -1);
880     int s = --size;
881     RunnableScheduledFuture replacement = queue[s];
882     queue[s] = null;
883     if (s != i) {
884     siftDown(i, replacement);
885     if (queue[i] == replacement)
886     siftUp(i, replacement);
887     }
888     return true;
889 dl 1.40 } finally {
890     lock.unlock();
891     }
892     }
893    
894     public int size() {
895     final ReentrantLock lock = this.lock;
896     lock.lock();
897     try {
898 jsr166 1.45 return size;
899 dl 1.40 } finally {
900     lock.unlock();
901     }
902     }
903    
904 jsr166 1.42 public boolean isEmpty() {
905     return size() == 0;
906 dl 1.40 }
907    
908     public int remainingCapacity() {
909     return Integer.MAX_VALUE;
910     }
911    
912     public RunnableScheduledFuture peek() {
913     final ReentrantLock lock = this.lock;
914     lock.lock();
915     try {
916     return queue[0];
917     } finally {
918     lock.unlock();
919     }
920 dl 1.13 }
921    
922 dl 1.40 public boolean offer(Runnable x) {
923     if (x == null)
924     throw new NullPointerException();
925     RunnableScheduledFuture e = (RunnableScheduledFuture)x;
926     final ReentrantLock lock = this.lock;
927     lock.lock();
928     try {
929     int i = size;
930     if (i >= queue.length)
931     grow();
932     size = i + 1;
933     if (i == 0) {
934     queue[0] = e;
935     setIndex(e, 0);
936 jsr166 1.45 } else {
937 dl 1.40 siftUp(i, e);
938     }
939 jsr166 1.45 if (queue[0] == e)
940 dl 1.40 available.signalAll();
941     } finally {
942     lock.unlock();
943     }
944     return true;
945 jsr166 1.27 }
946 dl 1.40
947     public void put(Runnable e) {
948     offer(e);
949     }
950    
951     public boolean add(Runnable e) {
952     return offer(e);
953 jsr166 1.27 }
954 dl 1.40
955     public boolean offer(Runnable e, long timeout, TimeUnit unit) {
956     return offer(e);
957     }
958 jsr166 1.42
959 dl 1.40 public RunnableScheduledFuture poll() {
960     final ReentrantLock lock = this.lock;
961     lock.lock();
962     try {
963     RunnableScheduledFuture first = queue[0];
964     if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
965     return null;
966 jsr166 1.42 else
967 dl 1.40 return finishPoll(first);
968     } finally {
969     lock.unlock();
970     }
971     }
972    
973     public RunnableScheduledFuture take() throws InterruptedException {
974     final ReentrantLock lock = this.lock;
975     lock.lockInterruptibly();
976     try {
977     for (;;) {
978     RunnableScheduledFuture first = queue[0];
979 jsr166 1.42 if (first == null)
980 dl 1.40 available.await();
981     else {
982 jsr166 1.45 long delay = first.getDelay(TimeUnit.NANOSECONDS);
983 dl 1.40 if (delay > 0)
984     available.awaitNanos(delay);
985 jsr166 1.42 else
986 dl 1.40 return finishPoll(first);
987     }
988     }
989     } finally {
990     lock.unlock();
991     }
992     }
993    
994 jsr166 1.42 public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
995 dl 1.40 throws InterruptedException {
996     long nanos = unit.toNanos(timeout);
997     final ReentrantLock lock = this.lock;
998     lock.lockInterruptibly();
999     try {
1000     for (;;) {
1001     RunnableScheduledFuture first = queue[0];
1002     if (first == null) {
1003     if (nanos <= 0)
1004     return null;
1005     else
1006     nanos = available.awaitNanos(nanos);
1007     } else {
1008     long delay = first.getDelay(TimeUnit.NANOSECONDS);
1009     if (delay > 0) {
1010     if (nanos <= 0)
1011     return null;
1012     if (delay > nanos)
1013     delay = nanos;
1014     long timeLeft = available.awaitNanos(delay);
1015     nanos -= delay - timeLeft;
1016 jsr166 1.42 } else
1017 dl 1.40 return finishPoll(first);
1018     }
1019     }
1020     } finally {
1021     lock.unlock();
1022     }
1023     }
1024    
1025     public void clear() {
1026     final ReentrantLock lock = this.lock;
1027     lock.lock();
1028     try {
1029     for (int i = 0; i < size; i++) {
1030     RunnableScheduledFuture t = queue[i];
1031     if (t != null) {
1032     queue[i] = null;
1033     setIndex(t, -1);
1034     }
1035     }
1036     size = 0;
1037     } finally {
1038     lock.unlock();
1039     }
1040 dl 1.13 }
1041 dl 1.40
1042     /**
1043     * Return and remove first element only if it is expired.
1044     * Used only by drainTo. Call only when holding lock.
1045     */
1046     private RunnableScheduledFuture pollExpired() {
1047     RunnableScheduledFuture first = queue[0];
1048 jsr166 1.42 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
1049 dl 1.40 return null;
1050     setIndex(first, -1);
1051     int s = --size;
1052     RunnableScheduledFuture x = queue[s];
1053     queue[s] = null;
1054     if (s != 0)
1055     siftDown(0, x);
1056     return first;
1057     }
1058    
1059     public int drainTo(Collection<? super Runnable> c) {
1060     if (c == null)
1061     throw new NullPointerException();
1062     if (c == this)
1063     throw new IllegalArgumentException();
1064     final ReentrantLock lock = this.lock;
1065     lock.lock();
1066     try {
1067 jsr166 1.45 RunnableScheduledFuture first;
1068 dl 1.40 int n = 0;
1069 jsr166 1.45 while ((first = pollExpired()) != null) {
1070     c.add(first);
1071     ++n;
1072     }
1073 dl 1.40 return n;
1074     } finally {
1075     lock.unlock();
1076     }
1077 dl 1.13 }
1078    
1079 jsr166 1.21 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1080 dl 1.40 if (c == null)
1081     throw new NullPointerException();
1082     if (c == this)
1083     throw new IllegalArgumentException();
1084     if (maxElements <= 0)
1085     return 0;
1086     final ReentrantLock lock = this.lock;
1087     lock.lock();
1088     try {
1089 jsr166 1.45 RunnableScheduledFuture first;
1090 dl 1.40 int n = 0;
1091 jsr166 1.45 while (n < maxElements && (first = pollExpired()) != null) {
1092     c.add(first);
1093     ++n;
1094     }
1095 dl 1.40 return n;
1096     } finally {
1097     lock.unlock();
1098     }
1099     }
1100    
1101     public Object[] toArray() {
1102     final ReentrantLock lock = this.lock;
1103     lock.lock();
1104     try {
1105 jsr166 1.45 return Arrays.copyOf(queue, size, Object[].class);
1106 dl 1.40 } finally {
1107     lock.unlock();
1108     }
1109     }
1110    
1111 jsr166 1.45 @SuppressWarnings("unchecked")
1112 dl 1.40 public <T> T[] toArray(T[] a) {
1113     final ReentrantLock lock = this.lock;
1114     lock.lock();
1115     try {
1116     if (a.length < size)
1117     return (T[]) Arrays.copyOf(queue, size, a.getClass());
1118     System.arraycopy(queue, 0, a, 0, size);
1119     if (a.length > size)
1120     a[size] = null;
1121     return a;
1122     } finally {
1123     lock.unlock();
1124     }
1125 dl 1.13 }
1126    
1127 jsr166 1.21 public Iterator<Runnable> iterator() {
1128 jsr166 1.45 return new Itr(Arrays.copyOf(queue, size));
1129 dl 1.40 }
1130 jsr166 1.42
1131 dl 1.40 /**
1132     * Snapshot iterator that works off copy of underlying q array.
1133     */
1134     private class Itr implements Iterator<Runnable> {
1135 jsr166 1.45 final RunnableScheduledFuture[] array;
1136     int cursor = 0; // index of next element to return
1137     int lastRet = -1; // index of last element, or -1 if no such
1138 jsr166 1.42
1139 jsr166 1.45 Itr(RunnableScheduledFuture[] array) {
1140 dl 1.40 this.array = array;
1141     }
1142 jsr166 1.42
1143 dl 1.40 public boolean hasNext() {
1144     return cursor < array.length;
1145     }
1146 jsr166 1.42
1147 dl 1.40 public Runnable next() {
1148     if (cursor >= array.length)
1149     throw new NoSuchElementException();
1150     lastRet = cursor;
1151 jsr166 1.45 return array[cursor++];
1152 dl 1.40 }
1153 jsr166 1.42
1154 dl 1.40 public void remove() {
1155     if (lastRet < 0)
1156     throw new IllegalStateException();
1157     DelayedWorkQueue.this.remove(array[lastRet]);
1158     lastRet = -1;
1159     }
1160 dl 1.13 }
1161     }
1162 dl 1.1 }