ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.42
Committed: Mon Sep 10 21:16:06 2007 UTC (16 years, 9 months ago) by jsr166
Branch: MAIN
Changes since 1.41: +29 -31 lines
Log Message:
tidying

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.11 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.1 */
6    
7     package java.util.concurrent;
8     import java.util.concurrent.atomic.*;
9 dl 1.40 import java.util.concurrent.locks.*;
10 dl 1.1 import java.util.*;
11    
12     /**
13 dl 1.7 * A {@link ThreadPoolExecutor} that can additionally schedule
14     * commands to run after a given delay, or to execute
15     * periodically. This class is preferable to {@link java.util.Timer}
16     * when multiple worker threads are needed, or when the additional
17     * flexibility or capabilities of {@link ThreadPoolExecutor} (which
18     * this class extends) are required.
19 dl 1.1 *
20     * <p> Delayed tasks execute no sooner than they are enabled, but
21 dl 1.18 * without any real-time guarantees about when, after they are
22     * enabled, they will commence. Tasks scheduled for exactly the same
23     * execution time are enabled in first-in-first-out (FIFO) order of
24 dl 1.41 * submission. If {@link #setRemoveOnCancelPolicy} is set to {@code true},
25     * cancelled tasks are automatically removed from the work queue.
26 dl 1.1 *
27     * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
28 dl 1.8 * of the inherited tuning methods are not useful for it. In
29     * particular, because it acts as a fixed-sized pool using
30 jsr166 1.39 * {@code corePoolSize} threads and an unbounded queue, adjustments
31     * to {@code maximumPoolSize} have no useful effect. Additionally, it
32     * is almost never a good idea to set {@code corePoolSize} to zero or
33     * use {@code allowCoreThreadTimeOut} because this may leave the pool
34 dl 1.37 * without threads to handle tasks once they become eligible to run.
35 dl 1.1 *
36 jsr166 1.39 * <p><b>Extension notes:</b> This class overrides the
37     * {@link ThreadPoolExecutor#execute execute} and
38     * {@link AbstractExecutorService#submit(Runnable) submit}
39     * methods to generate internal {@link ScheduledFuture} objects to
40     * control per-task delays and scheduling. To preserve
41     * functionality, any further overrides of these methods in
42 dl 1.32 * subclasses must invoke superclass versions, which effectively
43 jsr166 1.39 * disables additional task customization. However, this class
44 dl 1.32 * provides alternative protected extension method
45 jsr166 1.39 * {@code decorateTask} (one version each for {@code Runnable} and
46     * {@code Callable}) that can be used to customize the concrete task
47     * types used to execute commands entered via {@code execute},
48     * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
49     * and {@code scheduleWithFixedDelay}. By default, a
50     * {@code ScheduledThreadPoolExecutor} uses a task type extending
51 dl 1.32 * {@link FutureTask}. However, this may be modified or replaced using
52     * subclasses of the form:
53     *
54 jsr166 1.39 * <pre> {@code
55 dl 1.23 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
56     *
57 jsr166 1.39 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
58 dl 1.23 *
59 jsr166 1.39 * protected <V> RunnableScheduledFuture<V> decorateTask(
60     * Runnable r, RunnableScheduledFuture<V> task) {
61     * return new CustomTask<V>(r, task);
62 jsr166 1.29 * }
63 dl 1.23 *
64 jsr166 1.39 * protected <V> RunnableScheduledFuture<V> decorateTask(
65     * Callable<V> c, RunnableScheduledFuture<V> task) {
66     * return new CustomTask<V>(c, task);
67 jsr166 1.29 * }
68     * // ... add constructors, etc.
69 jsr166 1.39 * }}</pre>
70     *
71 dl 1.1 * @since 1.5
72     * @author Doug Lea
73     */
74 jsr166 1.21 public class ScheduledThreadPoolExecutor
75     extends ThreadPoolExecutor
76 tim 1.3 implements ScheduledExecutorService {
77 dl 1.1
78 dl 1.37 /*
79     * This class specializes ThreadPoolExecutor implementation by
80     *
81     * 1. Using a custom task type, ScheduledFutureTask for
82     * tasks, even those that don't require scheduling (i.e.,
83     * those submitted using ExecutorService execute, not
84     * ScheduledExecutorService methods) which are treated as
85     * delayed tasks with a delay of zero.
86     *
87     * 2. Using a custom queue (DelayedWorkQueue) based on an
88     * unbounded DelayQueue. The lack of capacity constraint and
89     * the fact that corePoolSize and maximumPoolSize are
90     * effectively identical simplifies some execution mechanics
91     * (see delayedExecute) compared to ThreadPoolExecutor
92     * version.
93     *
94     * The DelayedWorkQueue class is defined below for the sake of
95     * ensuring that all elements are instances of
96     * RunnableScheduledFuture. Since DelayQueue otherwise
97     * requires type be Delayed, but not necessarily Runnable, and
98     * the workQueue requires the opposite, we need to explicitly
99     * define a class that requires both to ensure that users don't
100     * add objects that aren't RunnableScheduledFutures via
101     * getQueue().add() etc.
102     *
103     * 3. Supporting optional run-after-shutdown parameters, which
104     * leads to overrides of shutdown methods to remove and cancel
105     * tasks that should NOT be run after shutdown, as well as
106     * different recheck logic when task (re)submission overlaps
107     * with a shutdown.
108     *
109     * 4. Task decoration methods to allow interception and
110     * instrumentation, which are needed because subclasses cannot
111     * otherwise override submit methods to get this effect. These
112     * don't have any impact on pool control logic though.
113     */
114    
115 dl 1.1 /**
116     * False if should cancel/suppress periodic tasks on shutdown.
117     */
118     private volatile boolean continueExistingPeriodicTasksAfterShutdown;
119    
120     /**
121     * False if should cancel non-periodic tasks on shutdown.
122     */
123     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
124    
125     /**
126 dl 1.41 * True if ScheduledFutureTask.cancel should remove from queue
127     */
128     private volatile boolean removeOnCancel = false;
129    
130     /**
131 dl 1.1 * Sequence number to break scheduling ties, and in turn to
132     * guarantee FIFO order among tied entries.
133     */
134     private static final AtomicLong sequencer = new AtomicLong(0);
135 dl 1.14
136     /**
137 jsr166 1.39 * Returns current nanosecond time.
138 dl 1.14 */
139 dl 1.17 final long now() {
140 jsr166 1.39 return System.nanoTime();
141 dl 1.14 }
142    
143 jsr166 1.21 private class ScheduledFutureTask<V>
144 peierls 1.22 extends FutureTask<V> implements RunnableScheduledFuture<V> {
145 jsr166 1.21
146 dl 1.1 /** Sequence number to break ties FIFO */
147     private final long sequenceNumber;
148     /** The time the task is enabled to execute in nanoTime units */
149     private long time;
150 dl 1.16 /**
151     * Period in nanoseconds for repeating tasks. A positive
152     * value indicates fixed-rate execution. A negative value
153     * indicates fixed-delay execution. A value of 0 indicates a
154     * non-repeating task.
155     */
156 dl 1.1 private final long period;
157    
158     /**
159 dl 1.40 * Index into delay queue, to support faster cancellation.
160     */
161     int heapIndex;
162    
163     /**
164 jsr166 1.30 * Creates a one-shot action with given nanoTime-based trigger time.
165 dl 1.1 */
166     ScheduledFutureTask(Runnable r, V result, long ns) {
167     super(r, result);
168     this.time = ns;
169     this.period = 0;
170     this.sequenceNumber = sequencer.getAndIncrement();
171     }
172    
173     /**
174 jsr166 1.30 * Creates a periodic action with given nano time and period.
175 dl 1.1 */
176 jsr166 1.30 ScheduledFutureTask(Runnable r, V result, long ns, long period) {
177 dl 1.1 super(r, result);
178     this.time = ns;
179     this.period = period;
180     this.sequenceNumber = sequencer.getAndIncrement();
181     }
182    
183     /**
184 jsr166 1.30 * Creates a one-shot action with given nanoTime-based trigger.
185 dl 1.1 */
186     ScheduledFutureTask(Callable<V> callable, long ns) {
187     super(callable);
188     this.time = ns;
189     this.period = 0;
190     this.sequenceNumber = sequencer.getAndIncrement();
191     }
192    
193     public long getDelay(TimeUnit unit) {
194 dl 1.14 long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);
195 dl 1.1 return d;
196     }
197    
198 dl 1.20 public int compareTo(Delayed other) {
199 dl 1.1 if (other == this) // compare zero ONLY if same object
200     return 0;
201 dl 1.34 if (other instanceof ScheduledFutureTask) {
202     ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
203     long diff = time - x.time;
204     if (diff < 0)
205     return -1;
206     else if (diff > 0)
207     return 1;
208     else if (sequenceNumber < x.sequenceNumber)
209     return -1;
210     else
211     return 1;
212     }
213     long d = (getDelay(TimeUnit.NANOSECONDS) -
214     other.getDelay(TimeUnit.NANOSECONDS));
215 jsr166 1.38 return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
216 dl 1.1 }
217    
218     /**
219 dl 1.18 * Returns true if this is a periodic (not a one-shot) action.
220 jsr166 1.30 *
221 dl 1.1 * @return true if periodic
222     */
223 dl 1.23 public boolean isPeriodic() {
224 dl 1.16 return period != 0;
225 dl 1.1 }
226    
227     /**
228 jsr166 1.39 * Sets the next time to run for a periodic task.
229 dl 1.13 */
230 dl 1.37 private void setNextRunTime() {
231     long p = period;
232     if (p > 0)
233     time += p;
234     else
235     time = now() - p;
236 dl 1.13 }
237    
238 dl 1.40 public boolean cancel(boolean mayInterruptIfRunning) {
239 dl 1.41 boolean cancelled = super.cancel(mayInterruptIfRunning);
240     if (cancelled && removeOnCancel && heapIndex >= 0)
241 jsr166 1.42 remove(this);
242 dl 1.41 return cancelled;
243 dl 1.40 }
244    
245 dl 1.13 /**
246 dl 1.5 * Overrides FutureTask version so as to reset/requeue if periodic.
247 jsr166 1.21 */
248 dl 1.1 public void run() {
249 dl 1.37 boolean periodic = isPeriodic();
250     if (!canRunInCurrentRunState(periodic))
251     cancel(false);
252     else if (!periodic)
253 dl 1.5 ScheduledFutureTask.super.run();
254 dl 1.37 else if (ScheduledFutureTask.super.runAndReset()) {
255     setNextRunTime();
256     reExecutePeriodic(this);
257     }
258 dl 1.1 }
259     }
260    
261     /**
262 dl 1.37 * Returns true if can run a task given current run state
263 jsr166 1.39 * and run-after-shutdown parameters.
264     *
265 dl 1.37 * @param periodic true if this task periodic, false if delayed
266     */
267     boolean canRunInCurrentRunState(boolean periodic) {
268 jsr166 1.38 return isRunningOrShutdown(periodic ?
269 dl 1.37 continueExistingPeriodicTasksAfterShutdown :
270     executeExistingDelayedTasksAfterShutdown);
271     }
272    
273     /**
274     * Main execution method for delayed or periodic tasks. If pool
275     * is shut down, rejects the task. Otherwise adds task to queue
276     * and starts a thread, if necessary, to run it. (We cannot
277     * prestart the thread to run the task because the task (probably)
278     * shouldn't be run yet,) If the pool is shut down while the task
279     * is being added, cancel and remove it if required by state and
280 jsr166 1.39 * run-after-shutdown parameters.
281     *
282 dl 1.37 * @param task the task
283     */
284     private void delayedExecute(RunnableScheduledFuture<?> task) {
285     if (isShutdown())
286     reject(task);
287     else {
288     super.getQueue().add(task);
289     if (isShutdown() &&
290     !canRunInCurrentRunState(task.isPeriodic()) &&
291     remove(task))
292     task.cancel(false);
293 jsr166 1.39 else
294     prestartCoreThread();
295 dl 1.37 }
296     }
297 jsr166 1.21
298 dl 1.37 /**
299 jsr166 1.39 * Requeues a periodic task unless current run state precludes it.
300     * Same idea as delayedExecute except drops task rather than rejecting.
301     *
302 dl 1.37 * @param task the task
303     */
304     void reExecutePeriodic(RunnableScheduledFuture<?> task) {
305     if (canRunInCurrentRunState(true)) {
306     super.getQueue().add(task);
307     if (!canRunInCurrentRunState(true) && remove(task))
308     task.cancel(false);
309 jsr166 1.39 else
310     prestartCoreThread();
311 dl 1.37 }
312 dl 1.13 }
313 dl 1.1
314 dl 1.13 /**
315 jsr166 1.21 * Cancels and clears the queue of all tasks that should not be run
316 jsr166 1.39 * due to shutdown policy. Invoked within super.shutdown.
317 dl 1.13 */
318 dl 1.37 @Override void onShutdown() {
319     BlockingQueue<Runnable> q = super.getQueue();
320     boolean keepDelayed =
321     getExecuteExistingDelayedTasksAfterShutdownPolicy();
322     boolean keepPeriodic =
323     getContinueExistingPeriodicTasksAfterShutdownPolicy();
324 jsr166 1.21 if (!keepDelayed && !keepPeriodic)
325 dl 1.37 q.clear();
326     else {
327     // Traverse snapshot to avoid iterator exceptions
328 jsr166 1.39 for (Object e : q.toArray()) {
329 dl 1.23 if (e instanceof RunnableScheduledFuture) {
330 dl 1.37 RunnableScheduledFuture<?> t =
331     (RunnableScheduledFuture<?>)e;
332 dl 1.41 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
333     t.isCancelled()) { // also remove if already cancelled
334     if (q.remove(t))
335     t.cancel(false);
336     }
337 dl 1.13 }
338     }
339 dl 1.1 }
340 jsr166 1.39 tryTerminate();
341 dl 1.1 }
342    
343 dl 1.23 /**
344 jsr166 1.30 * Modifies or replaces the task used to execute a runnable.
345 jsr166 1.28 * This method can be used to override the concrete
346 dl 1.23 * class used for managing internal tasks.
347 jsr166 1.30 * The default implementation simply returns the given task.
348 jsr166 1.28 *
349 dl 1.23 * @param runnable the submitted Runnable
350     * @param task the task created to execute the runnable
351     * @return a task that can execute the runnable
352     * @since 1.6
353     */
354 peierls 1.22 protected <V> RunnableScheduledFuture<V> decorateTask(
355 dl 1.23 Runnable runnable, RunnableScheduledFuture<V> task) {
356     return task;
357 peierls 1.22 }
358    
359 dl 1.23 /**
360 jsr166 1.30 * Modifies or replaces the task used to execute a callable.
361 jsr166 1.28 * This method can be used to override the concrete
362 dl 1.23 * class used for managing internal tasks.
363 jsr166 1.30 * The default implementation simply returns the given task.
364 jsr166 1.28 *
365 dl 1.23 * @param callable the submitted Callable
366     * @param task the task created to execute the callable
367     * @return a task that can execute the callable
368     * @since 1.6
369     */
370 peierls 1.22 protected <V> RunnableScheduledFuture<V> decorateTask(
371 dl 1.23 Callable<V> callable, RunnableScheduledFuture<V> task) {
372     return task;
373 dl 1.19 }
374    
/**
 * Constructs a {@code ScheduledThreadPoolExecutor} with the given
 * core pool size. The superclass is configured with a maximum pool
 * size of {@code Integer.MAX_VALUE}, a zero keep-alive time and a
 * fresh {@code DelayedWorkQueue}.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
          new DelayedWorkQueue());
}
387    
/**
 * Constructs a {@code ScheduledThreadPoolExecutor} with the given
 * core pool size and thread factory. The superclass is configured
 * with a maximum pool size of {@code Integer.MAX_VALUE}, a zero
 * keep-alive time and a fresh {@code DelayedWorkQueue}.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code threadFactory} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
}
404    
/**
 * Constructs a {@code ScheduledThreadPoolExecutor} with the given
 * core pool size and rejection handler. The superclass is configured
 * with a maximum pool size of {@code Integer.MAX_VALUE}, a zero
 * keep-alive time and a fresh {@code DelayedWorkQueue}.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code handler} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(), handler);
}
421    
/**
 * Constructs a {@code ScheduledThreadPoolExecutor} with the given
 * core pool size, thread factory and rejection handler. The
 * superclass is configured with a maximum pool size of
 * {@code Integer.MAX_VALUE}, a zero keep-alive time and a fresh
 * {@code DelayedWorkQueue}.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code threadFactory} or
 *         {@code handler} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}
442    
443 dl 1.37 /**
444     * @throws RejectedExecutionException {@inheritDoc}
445     * @throws NullPointerException {@inheritDoc}
446     */
447 jsr166 1.21 public ScheduledFuture<?> schedule(Runnable command,
448     long delay,
449 dl 1.13 TimeUnit unit) {
450 dl 1.9 if (command == null || unit == null)
451 dl 1.1 throw new NullPointerException();
452 dl 1.37 if (delay < 0) delay = 0;
453 dl 1.14 long triggerTime = now() + unit.toNanos(delay);
454 peierls 1.22 RunnableScheduledFuture<?> t = decorateTask(command,
455 jsr166 1.42 new ScheduledFutureTask<Void>(command, null, triggerTime));
456 dl 1.1 delayedExecute(t);
457     return t;
458     }
459    
460 dl 1.37 /**
461     * @throws RejectedExecutionException {@inheritDoc}
462     * @throws NullPointerException {@inheritDoc}
463     */
464 jsr166 1.21 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
465     long delay,
466 dl 1.13 TimeUnit unit) {
467 dl 1.9 if (callable == null || unit == null)
468 dl 1.1 throw new NullPointerException();
469 dl 1.16 if (delay < 0) delay = 0;
470 dl 1.14 long triggerTime = now() + unit.toNanos(delay);
471 peierls 1.22 RunnableScheduledFuture<V> t = decorateTask(callable,
472     new ScheduledFutureTask<V>(callable, triggerTime));
473 dl 1.1 delayedExecute(t);
474     return t;
475     }
476    
477 dl 1.37 /**
478     * @throws RejectedExecutionException {@inheritDoc}
479     * @throws NullPointerException {@inheritDoc}
480     * @throws IllegalArgumentException {@inheritDoc}
481     */
482 jsr166 1.21 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
483     long initialDelay,
484     long period,
485 dl 1.13 TimeUnit unit) {
486 dl 1.9 if (command == null || unit == null)
487 dl 1.1 throw new NullPointerException();
488     if (period <= 0)
489     throw new IllegalArgumentException();
490 dl 1.16 if (initialDelay < 0) initialDelay = 0;
491 dl 1.14 long triggerTime = now() + unit.toNanos(initialDelay);
492 peierls 1.22 RunnableScheduledFuture<?> t = decorateTask(command,
493 jsr166 1.21 new ScheduledFutureTask<Object>(command,
494 dl 1.13 null,
495     triggerTime,
496 peierls 1.22 unit.toNanos(period)));
497 dl 1.1 delayedExecute(t);
498     return t;
499     }
500 jsr166 1.21
501 dl 1.37 /**
502     * @throws RejectedExecutionException {@inheritDoc}
503     * @throws NullPointerException {@inheritDoc}
504     * @throws IllegalArgumentException {@inheritDoc}
505     */
506 jsr166 1.21 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
507     long initialDelay,
508     long delay,
509 dl 1.13 TimeUnit unit) {
510 dl 1.9 if (command == null || unit == null)
511 dl 1.1 throw new NullPointerException();
512     if (delay <= 0)
513     throw new IllegalArgumentException();
514 dl 1.16 if (initialDelay < 0) initialDelay = 0;
515 dl 1.14 long triggerTime = now() + unit.toNanos(initialDelay);
516 peierls 1.22 RunnableScheduledFuture<?> t = decorateTask(command,
517 jsr166 1.42 new ScheduledFutureTask<Void>(command,
518 dl 1.13 null,
519     triggerTime,
520 peierls 1.22 unit.toNanos(-delay)));
521 dl 1.1 delayedExecute(t);
522     return t;
523     }
524 jsr166 1.21
525 dl 1.1 /**
526 jsr166 1.39 * Executes {@code command} with zero required delay.
527     * This has effect equivalent to
528     * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
529     * Note that inspections of the queue and of the list returned by
530     * {@code shutdownNow} will access the zero-delayed
531     * {@link ScheduledFuture}, not the {@code command} itself.
532     *
533     * <p>A consequence of the use of {@code ScheduledFuture} objects is
534     * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
535     * called with a null second {@code Throwable} argument, even if the
536     * {@code command} terminated abruptly. Instead, the {@code Throwable}
537     * thrown by such a task can be obtained via {@link Future#get}.
538 dl 1.1 *
539     * @throws RejectedExecutionException at discretion of
540 jsr166 1.39 * {@code RejectedExecutionHandler}, if the task
541     * cannot be accepted for execution because the
542     * executor has been shut down
543     * @throws NullPointerException {@inheritDoc}
544 dl 1.1 */
545     public void execute(Runnable command) {
546     schedule(command, 0, TimeUnit.NANOSECONDS);
547     }
548    
549 dl 1.13 // Override AbstractExecutorService methods
550    
551 dl 1.37 /**
552     * @throws RejectedExecutionException {@inheritDoc}
553     * @throws NullPointerException {@inheritDoc}
554     */
555 dl 1.7 public Future<?> submit(Runnable task) {
556     return schedule(task, 0, TimeUnit.NANOSECONDS);
557     }
558    
559 dl 1.37 /**
560     * @throws RejectedExecutionException {@inheritDoc}
561     * @throws NullPointerException {@inheritDoc}
562     */
563 dl 1.7 public <T> Future<T> submit(Runnable task, T result) {
564 jsr166 1.21 return schedule(Executors.callable(task, result),
565 dl 1.13 0, TimeUnit.NANOSECONDS);
566 dl 1.7 }
567    
568 dl 1.37 /**
569     * @throws RejectedExecutionException {@inheritDoc}
570     * @throws NullPointerException {@inheritDoc}
571     */
572 dl 1.7 public <T> Future<T> submit(Callable<T> task) {
573     return schedule(task, 0, TimeUnit.NANOSECONDS);
574     }
575 dl 1.1
576     /**
577 dl 1.37 * Sets the policy on whether to continue executing existing
578 jsr166 1.39 * periodic tasks even when this executor has been {@code shutdown}.
579     * In this case, these tasks will only terminate upon
580     * {@code shutdownNow} or after setting the policy to
581     * {@code false} when already shutdown.
582     * This value is by default {@code false}.
583 jsr166 1.30 *
584 jsr166 1.39 * @param value if {@code true}, continue after shutdown, else don't.
585 jsr166 1.25 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
586 dl 1.1 */
587     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
588     continueExistingPeriodicTasksAfterShutdown = value;
589 jsr166 1.39 if (!value && isShutdown())
590 dl 1.37 onShutdown();
591 dl 1.1 }
592    
593     /**
594 jsr166 1.21 * Gets the policy on whether to continue executing existing
595 jsr166 1.39 * periodic tasks even when this executor has been {@code shutdown}.
596     * In this case, these tasks will only terminate upon
597     * {@code shutdownNow} or after setting the policy to
598     * {@code false} when already shutdown.
599     * This value is by default {@code false}.
600 jsr166 1.30 *
601 jsr166 1.39 * @return {@code true} if will continue after shutdown
602 dl 1.16 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
603 dl 1.1 */
604     public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
605     return continueExistingPeriodicTasksAfterShutdown;
606     }
607    
608     /**
609 jsr166 1.21 * Sets the policy on whether to execute existing delayed
610 jsr166 1.39 * tasks even when this executor has been {@code shutdown}.
611     * In this case, these tasks will only terminate upon
612     * {@code shutdownNow}, or after setting the policy to
613     * {@code false} when already shutdown.
614     * This value is by default {@code true}.
615 jsr166 1.30 *
616 jsr166 1.39 * @param value if {@code true}, execute after shutdown, else don't.
617 dl 1.16 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
618 dl 1.1 */
619     public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
620     executeExistingDelayedTasksAfterShutdown = value;
621 jsr166 1.39 if (!value && isShutdown())
622 dl 1.37 onShutdown();
623 dl 1.1 }
624    
625     /**
626 jsr166 1.21 * Gets the policy on whether to execute existing delayed
627 jsr166 1.39 * tasks even when this executor has been {@code shutdown}.
628     * In this case, these tasks will only terminate upon
629     * {@code shutdownNow}, or after setting the policy to
630     * {@code false} when already shutdown.
631     * This value is by default {@code true}.
632 jsr166 1.30 *
633 jsr166 1.39 * @return {@code true} if will execute after shutdown
634 dl 1.16 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
635 dl 1.1 */
636     public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
637     return executeExistingDelayedTasksAfterShutdown;
638     }
639    
640     /**
641 jsr166 1.42 * Sets the policy on whether cancellation of a task should remove
642     * it from the work queue. This value is by default {@code false}.
643 dl 1.41 *
644 jsr166 1.42 * @param value if {@code true}, remove on cancellation, else don't
645 dl 1.41 * @see #getRemoveOnCancelPolicy
646     */
647     public void setRemoveOnCancelPolicy(boolean value) {
648     removeOnCancel = value;
649     }
650    
651     /**
652 jsr166 1.42 * Gets the policy on whether cancellation of a task should remove
653     * it from the work queue. This value is by default {@code false}.
654 dl 1.41 *
655 jsr166 1.42 * @return {@code true} if cancelled tasks are removed from the queue
656 dl 1.41 * @see #setRemoveOnCancelPolicy
657     */
658     public boolean getRemoveOnCancelPolicy() {
659     return removeOnCancel;
660     }
661    
662     /**
663 dl 1.1 * Initiates an orderly shutdown in which previously submitted
664 jsr166 1.39 * tasks are executed, but no new tasks will be accepted. If the
665     * {@code ExecuteExistingDelayedTasksAfterShutdownPolicy} has
666     * been set {@code false}, existing delayed tasks whose delays
667     * have not yet elapsed are cancelled. And unless the
668     * {@code ContinueExistingPeriodicTasksAfterShutdownPolicy} has
669     * been set {@code true}, future executions of existing periodic
670 dl 1.1 * tasks will be cancelled.
671 jsr166 1.39 *
672     * @throws SecurityException {@inheritDoc}
673 dl 1.1 */
674     public void shutdown() {
675     super.shutdown();
676     }
677    
678     /**
679     * Attempts to stop all actively executing tasks, halts the
680 jsr166 1.30 * processing of waiting tasks, and returns a list of the tasks
681     * that were awaiting execution.
682 jsr166 1.21 *
683 dl 1.1 * <p>There are no guarantees beyond best-effort attempts to stop
684 dl 1.18 * processing actively executing tasks. This implementation
685 jsr166 1.31 * cancels tasks via {@link Thread#interrupt}, so any task that
686     * fails to respond to interrupts may never terminate.
687 dl 1.1 *
688 jsr166 1.39 * @return list of tasks that never commenced execution.
689     * Each element of this list is a {@link ScheduledFuture},
690     * including those tasks submitted using {@code execute},
691     * which are for scheduling purposes used as the basis of a
692     * zero-delay {@code ScheduledFuture}.
693 jsr166 1.31 * @throws SecurityException {@inheritDoc}
694 dl 1.1 */
695 tim 1.4 public List<Runnable> shutdownNow() {
696 dl 1.1 return super.shutdownNow();
697     }
698    
699     /**
700     * Returns the task queue used by this executor. Each element of
701     * this queue is a {@link ScheduledFuture}, including those
702 jsr166 1.39 * tasks submitted using {@code execute} which are for scheduling
703 dl 1.1 * purposes used as the basis of a zero-delay
704 jsr166 1.39 * {@code ScheduledFuture}. Iteration over this queue is
705 dl 1.15 * <em>not</em> guaranteed to traverse tasks in the order in
706 dl 1.1 * which they will execute.
707     *
708     * @return the task queue
709     */
710     public BlockingQueue<Runnable> getQueue() {
711     return super.getQueue();
712     }
713    
714 dl 1.13 /**
715 dl 1.40 * Specialized delay queue. To mesh with TPE declarations, this
716     * class must be declared as a BlockingQueue<Runnable> even though
717 jsr166 1.42 * it can only hold RunnableScheduledFutures.
718 jsr166 1.21 */
719 dl 1.40 static class DelayedWorkQueue extends AbstractQueue<Runnable>
720 dl 1.13 implements BlockingQueue<Runnable> {
721 jsr166 1.21
722 dl 1.40 /*
723     * A DelayedWorkQueue is based on a heap-based data structure
724     * like those in DelayQueue and PriorityQueue, except that
725     * every ScheduledFutureTask also records its index into the
726     * heap array. This eliminates the need to find a task upon
727     * cancellation, greatly speeding up removal (down from O(n)
728     * to O(log n)), and reducing garbage retention that would
729     * otherwise occur by waiting for the element to rise to top
730     * before clearing. But because the queue may also hold
731     * RunnableScheduledFutures that are not ScheduledFutureTasks,
732     * we are not guaranteed to have such indices available, in
733     * which case we fall back to linear search. (We expect that
734     * most tasks will not be decorated, and that the faster cases
735     * will be much more common.)
736     *
737     * All heap operations must record index changes -- mainly
738     * within siftUp and siftDown. Upon removal, a task's
739     * heapIndex is set to -1. Note that ScheduledFutureTasks can
740     * appear at most once in the queue (this need not be true for
741     * other kinds of tasks or work queues), so are uniquely
742     * identified by heapIndex.
743     */
744    
745     private static final int INITIAL_CAPACITY = 64;
746     private transient RunnableScheduledFuture[] queue =
747     new RunnableScheduledFuture[INITIAL_CAPACITY];
748     private transient final ReentrantLock lock = new ReentrantLock();
749     private transient final Condition available = lock.newCondition();
750     private int size = 0;
751    
752    
753     /**
754 jsr166 1.42 * Set f's heapIndex if it is a ScheduledFutureTask.
755 dl 1.40 */
756     private void setIndex(Object f, int idx) {
757     if (f instanceof ScheduledFutureTask)
758     ((ScheduledFutureTask)f).heapIndex = idx;
759     }
760    
761     /**
762 jsr166 1.42 * Sift element added at bottom up to its heap-ordered spot.
763 dl 1.40 * Call only when holding lock.
764     */
765     private void siftUp(int k, RunnableScheduledFuture key) {
766     while (k > 0) {
767     int parent = (k - 1) >>> 1;
768     RunnableScheduledFuture e = queue[parent];
769     if (key.compareTo(e) >= 0)
770     break;
771     queue[k] = e;
772     setIndex(e, k);
773     k = parent;
774     }
775     queue[k] = key;
776     setIndex(key, k);
777     }
778    
779     /**
780 jsr166 1.42 * Sift element added at top down to its heap-ordered spot.
781 dl 1.40 * Call only when holding lock.
782     */
783     private void siftDown(int k, RunnableScheduledFuture key) {
784 jsr166 1.42 int half = size >>> 1;
785 dl 1.40 while (k < half) {
786 jsr166 1.42 int child = (k << 1) + 1;
787 dl 1.40 RunnableScheduledFuture c = queue[child];
788     int right = child + 1;
789     if (right < size && c.compareTo(queue[right]) > 0)
790     c = queue[child = right];
791     if (key.compareTo(c) <= 0)
792     break;
793     queue[k] = c;
794     setIndex(c, k);
795     k = child;
796     }
797     queue[k] = key;
798     setIndex(key, k);
799     }
800    
801     /**
802     * Performs common bookkeeping for poll and take: Replaces
803     * first element with last; sifts it down, and signals any
804     * waiting consumers. Call only when holding lock.
805     * @param f the task to remove and return
806     */
807     private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
808     int s = --size;
809     RunnableScheduledFuture x = queue[s];
810     queue[s] = null;
811     if (s != 0) {
812     siftDown(0, x);
813     available.signalAll();
814     }
815     setIndex(f, -1);
816     return f;
817     }
818    
819     /**
820     * Resize the heap array. Call only when holding lock.
821     */
822     private void grow() {
823     int oldCapacity = queue.length;
824     int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
825     if (newCapacity < 0) // overflow
826     newCapacity = Integer.MAX_VALUE;
827     queue = Arrays.copyOf(queue, newCapacity);
828     }
829    
830     /**
831     * Find index of given object, or -1 if absent
832     */
833     private int indexOf(Object x) {
834     if (x != null) {
835     for (int i = 0; i < size; i++)
836     if (x.equals(queue[i]))
837     return i;
838     }
839     return -1;
840     }
841    
842     public boolean remove(Object x) {
843     boolean removed;
844     final ReentrantLock lock = this.lock;
845     lock.lock();
846     try {
847     int i;
848     if (x instanceof ScheduledFutureTask)
849     i = ((ScheduledFutureTask)x).heapIndex;
850     else
851     i = indexOf(x);
852     if (removed = (i >= 0 && i < size && queue[i] == x)) {
853     setIndex(x, -1);
854     int s = --size;
855     RunnableScheduledFuture replacement = queue[s];
856     queue[s] = null;
857     if (s != i) {
858     siftDown(i, replacement);
859     if (queue[i] == replacement)
860     siftUp(i, replacement);
861     }
862     }
863     } finally {
864     lock.unlock();
865     }
866     return removed;
867     }
868    
869     public int size() {
870     int s;
871     final ReentrantLock lock = this.lock;
872     lock.lock();
873     try {
874     s = size;
875     } finally {
876     lock.unlock();
877     }
878     return s;
879     }
880    
881 jsr166 1.42 public boolean isEmpty() {
882     return size() == 0;
883 dl 1.40 }
884    
885     public int remainingCapacity() {
886     return Integer.MAX_VALUE;
887     }
888    
889     public RunnableScheduledFuture peek() {
890     final ReentrantLock lock = this.lock;
891     lock.lock();
892     try {
893     return queue[0];
894     } finally {
895     lock.unlock();
896     }
897 dl 1.13 }
898    
899 dl 1.40 public boolean offer(Runnable x) {
900     if (x == null)
901     throw new NullPointerException();
902     RunnableScheduledFuture e = (RunnableScheduledFuture)x;
903     final ReentrantLock lock = this.lock;
904     lock.lock();
905     try {
906     int i = size;
907     if (i >= queue.length)
908     grow();
909     size = i + 1;
910     boolean notify;
911     if (i == 0) {
912     notify = true;
913     queue[0] = e;
914     setIndex(e, 0);
915     }
916     else {
917     notify = e.compareTo(queue[0]) < 0;
918     siftUp(i, e);
919     }
920     if (notify)
921     available.signalAll();
922     } finally {
923     lock.unlock();
924     }
925     return true;
926 jsr166 1.27 }
927 dl 1.40
928     public void put(Runnable e) {
929     offer(e);
930     }
931    
932     public boolean add(Runnable e) {
933     return offer(e);
934 jsr166 1.27 }
935 dl 1.40
936     public boolean offer(Runnable e, long timeout, TimeUnit unit) {
937     return offer(e);
938     }
939 jsr166 1.42
940 dl 1.40 public RunnableScheduledFuture poll() {
941     final ReentrantLock lock = this.lock;
942     lock.lock();
943     try {
944     RunnableScheduledFuture first = queue[0];
945     if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
946     return null;
947 jsr166 1.42 else
948 dl 1.40 return finishPoll(first);
949     } finally {
950     lock.unlock();
951     }
952     }
953    
954     public RunnableScheduledFuture take() throws InterruptedException {
955     final ReentrantLock lock = this.lock;
956     lock.lockInterruptibly();
957     try {
958     for (;;) {
959     RunnableScheduledFuture first = queue[0];
960 jsr166 1.42 if (first == null)
961 dl 1.40 available.await();
962     else {
963     long delay = first.getDelay(TimeUnit.NANOSECONDS);
964     if (delay > 0)
965     available.awaitNanos(delay);
966 jsr166 1.42 else
967 dl 1.40 return finishPoll(first);
968     }
969     }
970     } finally {
971     lock.unlock();
972     }
973     }
974    
975 jsr166 1.42 public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
976 dl 1.40 throws InterruptedException {
977     long nanos = unit.toNanos(timeout);
978     final ReentrantLock lock = this.lock;
979     lock.lockInterruptibly();
980     try {
981     for (;;) {
982     RunnableScheduledFuture first = queue[0];
983     if (first == null) {
984     if (nanos <= 0)
985     return null;
986     else
987     nanos = available.awaitNanos(nanos);
988     } else {
989     long delay = first.getDelay(TimeUnit.NANOSECONDS);
990     if (delay > 0) {
991     if (nanos <= 0)
992     return null;
993     if (delay > nanos)
994     delay = nanos;
995     long timeLeft = available.awaitNanos(delay);
996     nanos -= delay - timeLeft;
997 jsr166 1.42 } else
998 dl 1.40 return finishPoll(first);
999     }
1000     }
1001     } finally {
1002     lock.unlock();
1003     }
1004     }
1005    
1006     public void clear() {
1007     final ReentrantLock lock = this.lock;
1008     lock.lock();
1009     try {
1010     for (int i = 0; i < size; i++) {
1011     RunnableScheduledFuture t = queue[i];
1012     if (t != null) {
1013     queue[i] = null;
1014     setIndex(t, -1);
1015     }
1016     }
1017     size = 0;
1018     } finally {
1019     lock.unlock();
1020     }
1021 dl 1.13 }
1022 dl 1.40
1023     /**
1024     * Return and remove first element only if it is expired.
1025     * Used only by drainTo. Call only when holding lock.
1026     */
1027     private RunnableScheduledFuture pollExpired() {
1028     RunnableScheduledFuture first = queue[0];
1029 jsr166 1.42 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
1030 dl 1.40 return null;
1031     setIndex(first, -1);
1032     int s = --size;
1033     RunnableScheduledFuture x = queue[s];
1034     queue[s] = null;
1035     if (s != 0)
1036     siftDown(0, x);
1037     return first;
1038     }
1039    
1040     public int drainTo(Collection<? super Runnable> c) {
1041     if (c == null)
1042     throw new NullPointerException();
1043     if (c == this)
1044     throw new IllegalArgumentException();
1045     final ReentrantLock lock = this.lock;
1046     lock.lock();
1047     try {
1048     int n = 0;
1049     for (;;) {
1050     RunnableScheduledFuture first = pollExpired();
1051     if (first != null) {
1052     c.add(first);
1053     ++n;
1054     }
1055     else
1056     break;
1057     }
1058     if (n > 0)
1059     available.signalAll();
1060     return n;
1061     } finally {
1062     lock.unlock();
1063     }
1064 dl 1.13 }
1065    
1066 jsr166 1.21 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1067 dl 1.40 if (c == null)
1068     throw new NullPointerException();
1069     if (c == this)
1070     throw new IllegalArgumentException();
1071     if (maxElements <= 0)
1072     return 0;
1073     final ReentrantLock lock = this.lock;
1074     lock.lock();
1075     try {
1076     int n = 0;
1077     while (n < maxElements) {
1078     RunnableScheduledFuture first = pollExpired();
1079     if (first != null) {
1080     c.add(first);
1081     ++n;
1082     }
1083     else
1084     break;
1085     }
1086     if (n > 0)
1087     available.signalAll();
1088     return n;
1089     } finally {
1090     lock.unlock();
1091     }
1092     }
1093    
1094     public Object[] toArray() {
1095     final ReentrantLock lock = this.lock;
1096     lock.lock();
1097     try {
1098     return Arrays.copyOf(queue, size);
1099     } finally {
1100     lock.unlock();
1101     }
1102     }
1103    
1104     public <T> T[] toArray(T[] a) {
1105     final ReentrantLock lock = this.lock;
1106     lock.lock();
1107     try {
1108     if (a.length < size)
1109     return (T[]) Arrays.copyOf(queue, size, a.getClass());
1110     System.arraycopy(queue, 0, a, 0, size);
1111     if (a.length > size)
1112     a[size] = null;
1113     return a;
1114     } finally {
1115     lock.unlock();
1116     }
1117 dl 1.13 }
1118    
1119 jsr166 1.21 public Iterator<Runnable> iterator() {
1120 dl 1.40 return new Itr(toArray());
1121     }
1122 jsr166 1.42
1123 dl 1.40 /**
1124     * Snapshot iterator that works off copy of underlying q array.
1125     */
1126     private class Itr implements Iterator<Runnable> {
1127     final Object[] array; // Array of all elements
1128     int cursor; // index of next element to return;
1129     int lastRet; // index of last element, or -1 if no such
1130 jsr166 1.42
1131 dl 1.40 Itr(Object[] array) {
1132     lastRet = -1;
1133     this.array = array;
1134     }
1135 jsr166 1.42
1136 dl 1.40 public boolean hasNext() {
1137     return cursor < array.length;
1138     }
1139 jsr166 1.42
1140 dl 1.40 public Runnable next() {
1141     if (cursor >= array.length)
1142     throw new NoSuchElementException();
1143     lastRet = cursor;
1144     return (Runnable)array[cursor++];
1145     }
1146 jsr166 1.42
1147 dl 1.40 public void remove() {
1148     if (lastRet < 0)
1149     throw new IllegalStateException();
1150     DelayedWorkQueue.this.remove(array[lastRet]);
1151     lastRet = -1;
1152     }
1153 dl 1.13 }
1154     }
1155 dl 1.1 }