ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.40
Committed: Sun Feb 18 23:16:35 2007 UTC (17 years, 3 months ago) by dl
Branch: MAIN
Changes since 1.39: +439 -42 lines
Log Message:
Faster task cancellation and removal

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.11 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.1 */
6    
7     package java.util.concurrent;
8     import java.util.concurrent.atomic.*;
9 dl 1.40 import java.util.concurrent.locks.*;
10 dl 1.1 import java.util.*;
11    
12     /**
13 dl 1.7 * A {@link ThreadPoolExecutor} that can additionally schedule
14     * commands to run after a given delay, or to execute
15     * periodically. This class is preferable to {@link java.util.Timer}
16     * when multiple worker threads are needed, or when the additional
17     * flexibility or capabilities of {@link ThreadPoolExecutor} (which
18     * this class extends) are required.
19 dl 1.1 *
20     * <p> Delayed tasks execute no sooner than they are enabled, but
21 dl 1.18 * without any real-time guarantees about when, after they are
22     * enabled, they will commence. Tasks scheduled for exactly the same
23     * execution time are enabled in first-in-first-out (FIFO) order of
24 dl 1.40 * submission. Cancelled tasks are automatically removed from the
25     * work queue.
26 dl 1.1 *
27     * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
28 dl 1.8 * of the inherited tuning methods are not useful for it. In
29     * particular, because it acts as a fixed-sized pool using
30 jsr166 1.39 * {@code corePoolSize} threads and an unbounded queue, adjustments
31     * to {@code maximumPoolSize} have no useful effect. Additionally, it
32     * is almost never a good idea to set {@code corePoolSize} to zero or
33     * use {@code allowCoreThreadTimeOut} because this may leave the pool
34 dl 1.37 * without threads to handle tasks once they become eligible to run.
35 dl 1.1 *
36 jsr166 1.39 * <p><b>Extension notes:</b> This class overrides the
37     * {@link ThreadPoolExecutor#execute execute} and
38     * {@link AbstractExecutorService#submit(Runnable) submit}
39     * methods to generate internal {@link ScheduledFuture} objects to
40     * control per-task delays and scheduling. To preserve
41     * functionality, any further overrides of these methods in
42 dl 1.32 * subclasses must invoke superclass versions, which effectively
43 jsr166 1.39 * disables additional task customization. However, this class
44 dl 1.32 * provides alternative protected extension method
45 jsr166 1.39 * {@code decorateTask} (one version each for {@code Runnable} and
46     * {@code Callable}) that can be used to customize the concrete task
47     * types used to execute commands entered via {@code execute},
48     * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
49     * and {@code scheduleWithFixedDelay}. By default, a
50     * {@code ScheduledThreadPoolExecutor} uses a task type extending
51 dl 1.32 * {@link FutureTask}. However, this may be modified or replaced using
52     * subclasses of the form:
53     *
54 jsr166 1.39 * <pre> {@code
55 dl 1.23 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
56     *
57 jsr166 1.39 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
58 dl 1.23 *
59 jsr166 1.39 * protected <V> RunnableScheduledFuture<V> decorateTask(
60     * Runnable r, RunnableScheduledFuture<V> task) {
61     * return new CustomTask<V>(r, task);
62 jsr166 1.29 * }
63 dl 1.23 *
64 jsr166 1.39 * protected <V> RunnableScheduledFuture<V> decorateTask(
65     * Callable<V> c, RunnableScheduledFuture<V> task) {
66     * return new CustomTask<V>(c, task);
67 jsr166 1.29 * }
68     * // ... add constructors, etc.
69 jsr166 1.39 * }}</pre>
70     *
71 dl 1.1 * @since 1.5
72     * @author Doug Lea
73     */
74 jsr166 1.21 public class ScheduledThreadPoolExecutor
75     extends ThreadPoolExecutor
76 tim 1.3 implements ScheduledExecutorService {
77 dl 1.1
78 dl 1.37 /*
79     * This class specializes ThreadPoolExecutor implementation by
80     *
81     * 1. Using a custom task type, ScheduledFutureTask for
82     * tasks, even those that don't require scheduling (i.e.,
83     * those submitted using ExecutorService execute, not
84     * ScheduledExecutorService methods) which are treated as
85     * delayed tasks with a delay of zero.
86     *
87     * 2. Using a custom queue (DelayedWorkQueue) based on an
88     * unbounded DelayQueue. The lack of capacity constraint and
89     * the fact that corePoolSize and maximumPoolSize are
90     * effectively identical simplifies some execution mechanics
91     * (see delayedExecute) compared to ThreadPoolExecutor
92     * version.
93     *
94     * The DelayedWorkQueue class is defined below for the sake of
95     * ensuring that all elements are instances of
96     * RunnableScheduledFuture. Since DelayQueue otherwise
97     * requires type be Delayed, but not necessarily Runnable, and
98     * the workQueue requires the opposite, we need to explicitly
99     * define a class that requires both to ensure that users don't
100     * add objects that aren't RunnableScheduledFutures via
101     * getQueue().add() etc.
102     *
103     * 3. Supporting optional run-after-shutdown parameters, which
104     * leads to overrides of shutdown methods to remove and cancel
105     * tasks that should NOT be run after shutdown, as well as
106     * different recheck logic when task (re)submission overlaps
107     * with a shutdown.
108     *
109     * 4. Task decoration methods to allow interception and
110     * instrumentation, which are needed because subclasses cannot
111     * otherwise override submit methods to get this effect. These
112     * don't have any impact on pool control logic though.
113     */
114    
115 dl 1.1 /**
116     * False if should cancel/suppress periodic tasks on shutdown.
117     */
118     private volatile boolean continueExistingPeriodicTasksAfterShutdown;
119    
120     /**
121     * False if should cancel non-periodic tasks on shutdown.
122     */
123     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
124    
125     /**
126     * Sequence number to break scheduling ties, and in turn to
127     * guarantee FIFO order among tied entries.
128     */
129     private static final AtomicLong sequencer = new AtomicLong(0);
130 dl 1.14
131     /**
132 jsr166 1.39 * Returns current nanosecond time.
133 dl 1.14 */
134 dl 1.17 final long now() {
135 jsr166 1.39 return System.nanoTime();
136 dl 1.14 }
137    
138 jsr166 1.21 private class ScheduledFutureTask<V>
139 peierls 1.22 extends FutureTask<V> implements RunnableScheduledFuture<V> {
140 jsr166 1.21
141 dl 1.1 /** Sequence number to break ties FIFO */
142     private final long sequenceNumber;
143     /** The time the task is enabled to execute in nanoTime units */
144     private long time;
145 dl 1.16 /**
146     * Period in nanoseconds for repeating tasks. A positive
147     * value indicates fixed-rate execution. A negative value
148     * indicates fixed-delay execution. A value of 0 indicates a
149     * non-repeating task.
150     */
151 dl 1.1 private final long period;
152    
153     /**
154 dl 1.40 * Index into delay queue, to support faster cancellation.
155     */
156     int heapIndex;
157    
158     /**
159 jsr166 1.30 * Creates a one-shot action with given nanoTime-based trigger time.
160 dl 1.1 */
161     ScheduledFutureTask(Runnable r, V result, long ns) {
162     super(r, result);
163     this.time = ns;
164     this.period = 0;
165     this.sequenceNumber = sequencer.getAndIncrement();
166     }
167    
168     /**
169 jsr166 1.30 * Creates a periodic action with given nano time and period.
170 dl 1.1 */
171 jsr166 1.30 ScheduledFutureTask(Runnable r, V result, long ns, long period) {
172 dl 1.1 super(r, result);
173     this.time = ns;
174     this.period = period;
175     this.sequenceNumber = sequencer.getAndIncrement();
176     }
177    
178     /**
179 jsr166 1.30 * Creates a one-shot action with given nanoTime-based trigger.
180 dl 1.1 */
181     ScheduledFutureTask(Callable<V> callable, long ns) {
182     super(callable);
183     this.time = ns;
184     this.period = 0;
185     this.sequenceNumber = sequencer.getAndIncrement();
186     }
187    
188     public long getDelay(TimeUnit unit) {
189 dl 1.14 long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);
190 dl 1.1 return d;
191     }
192    
193 dl 1.20 public int compareTo(Delayed other) {
194 dl 1.1 if (other == this) // compare zero ONLY if same object
195     return 0;
196 dl 1.34 if (other instanceof ScheduledFutureTask) {
197     ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
198     long diff = time - x.time;
199     if (diff < 0)
200     return -1;
201     else if (diff > 0)
202     return 1;
203     else if (sequenceNumber < x.sequenceNumber)
204     return -1;
205     else
206     return 1;
207     }
208     long d = (getDelay(TimeUnit.NANOSECONDS) -
209     other.getDelay(TimeUnit.NANOSECONDS));
210 jsr166 1.38 return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
211 dl 1.1 }
212    
213     /**
214 dl 1.18 * Returns true if this is a periodic (not a one-shot) action.
215 jsr166 1.30 *
216 dl 1.1 * @return true if periodic
217     */
218 dl 1.23 public boolean isPeriodic() {
219 dl 1.16 return period != 0;
220 dl 1.1 }
221    
222     /**
223 jsr166 1.39 * Sets the next time to run for a periodic task.
224 dl 1.13 */
225 dl 1.37 private void setNextRunTime() {
226     long p = period;
227     if (p > 0)
228     time += p;
229     else
230     time = now() - p;
231 dl 1.13 }
232    
233 dl 1.40 public boolean cancel(boolean mayInterruptIfRunning) {
234     remove(this); // unconditionally remove
235     return super.cancel(mayInterruptIfRunning);
236     }
237    
238 dl 1.13 /**
239 dl 1.5 * Overrides FutureTask version so as to reset/requeue if periodic.
240 jsr166 1.21 */
241 dl 1.1 public void run() {
242 dl 1.37 boolean periodic = isPeriodic();
243     if (!canRunInCurrentRunState(periodic))
244     cancel(false);
245     else if (!periodic)
246 dl 1.5 ScheduledFutureTask.super.run();
247 dl 1.37 else if (ScheduledFutureTask.super.runAndReset()) {
248     setNextRunTime();
249     reExecutePeriodic(this);
250     }
251 dl 1.1 }
252     }
253    
254     /**
255 dl 1.37 * Returns true if can run a task given current run state
256 jsr166 1.39 * and run-after-shutdown parameters.
257     *
258 dl 1.37 * @param periodic true if this task periodic, false if delayed
259     */
260     boolean canRunInCurrentRunState(boolean periodic) {
261 jsr166 1.38 return isRunningOrShutdown(periodic ?
262 dl 1.37 continueExistingPeriodicTasksAfterShutdown :
263     executeExistingDelayedTasksAfterShutdown);
264     }
265    
266     /**
267     * Main execution method for delayed or periodic tasks. If pool
268     * is shut down, rejects the task. Otherwise adds task to queue
269     * and starts a thread, if necessary, to run it. (We cannot
270     * prestart the thread to run the task because the task (probably)
271     * shouldn't be run yet,) If the pool is shut down while the task
272     * is being added, cancel and remove it if required by state and
273 jsr166 1.39 * run-after-shutdown parameters.
274     *
275 dl 1.37 * @param task the task
276     */
277     private void delayedExecute(RunnableScheduledFuture<?> task) {
278     if (isShutdown())
279     reject(task);
280     else {
281     super.getQueue().add(task);
282     if (isShutdown() &&
283     !canRunInCurrentRunState(task.isPeriodic()) &&
284     remove(task))
285     task.cancel(false);
286 jsr166 1.39 else
287     prestartCoreThread();
288 dl 1.37 }
289     }
290 jsr166 1.21
291 dl 1.37 /**
292 jsr166 1.39 * Requeues a periodic task unless current run state precludes it.
293     * Same idea as delayedExecute except drops task rather than rejecting.
294     *
295 dl 1.37 * @param task the task
296     */
297     void reExecutePeriodic(RunnableScheduledFuture<?> task) {
298     if (canRunInCurrentRunState(true)) {
299     super.getQueue().add(task);
300     if (!canRunInCurrentRunState(true) && remove(task))
301     task.cancel(false);
302 jsr166 1.39 else
303     prestartCoreThread();
304 dl 1.37 }
305 dl 1.13 }
306 dl 1.1
307 dl 1.13 /**
308 jsr166 1.21 * Cancels and clears the queue of all tasks that should not be run
309 jsr166 1.39 * due to shutdown policy. Invoked within super.shutdown.
310 dl 1.13 */
311 dl 1.37 @Override void onShutdown() {
312     BlockingQueue<Runnable> q = super.getQueue();
313     boolean keepDelayed =
314     getExecuteExistingDelayedTasksAfterShutdownPolicy();
315     boolean keepPeriodic =
316     getContinueExistingPeriodicTasksAfterShutdownPolicy();
317 jsr166 1.21 if (!keepDelayed && !keepPeriodic)
318 dl 1.37 q.clear();
319     else {
320     // Traverse snapshot to avoid iterator exceptions
321 jsr166 1.39 for (Object e : q.toArray()) {
322 dl 1.23 if (e instanceof RunnableScheduledFuture) {
323 dl 1.37 RunnableScheduledFuture<?> t =
324     (RunnableScheduledFuture<?>)e;
325 dl 1.40 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed))
326     t.cancel(false);
327 dl 1.13 }
328     }
329 dl 1.1 }
330 jsr166 1.39 tryTerminate();
331 dl 1.1 }
332    
333 dl 1.23 /**
334 jsr166 1.30 * Modifies or replaces the task used to execute a runnable.
335 jsr166 1.28 * This method can be used to override the concrete
336 dl 1.23 * class used for managing internal tasks.
337 jsr166 1.30 * The default implementation simply returns the given task.
338 jsr166 1.28 *
339 dl 1.23 * @param runnable the submitted Runnable
340     * @param task the task created to execute the runnable
341     * @return a task that can execute the runnable
342     * @since 1.6
343     */
344 peierls 1.22 protected <V> RunnableScheduledFuture<V> decorateTask(
345 dl 1.23 Runnable runnable, RunnableScheduledFuture<V> task) {
346     return task;
347 peierls 1.22 }
348    
349 dl 1.23 /**
350 jsr166 1.30 * Modifies or replaces the task used to execute a callable.
351 jsr166 1.28 * This method can be used to override the concrete
352 dl 1.23 * class used for managing internal tasks.
353 jsr166 1.30 * The default implementation simply returns the given task.
354 jsr166 1.28 *
355 dl 1.23 * @param callable the submitted Callable
356     * @param task the task created to execute the callable
357     * @return a task that can execute the callable
358     * @since 1.6
359     */
360 peierls 1.22 protected <V> RunnableScheduledFuture<V> decorateTask(
361 dl 1.23 Callable<V> callable, RunnableScheduledFuture<V> task) {
362     return task;
363 dl 1.19 }
364    
365 dl 1.1 /**
366 jsr166 1.39 * Creates a new {@code ScheduledThreadPoolExecutor} with the
367     * given core pool size.
368 jsr166 1.21 *
369 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
370     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
371     * @throws IllegalArgumentException if {@code corePoolSize < 0}
372 dl 1.1 */
373     public ScheduledThreadPoolExecutor(int corePoolSize) {
374     super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
375     new DelayedWorkQueue());
376     }
377    
378     /**
379 jsr166 1.39 * Creates a new {@code ScheduledThreadPoolExecutor} with the
380     * given initial parameters.
381 jsr166 1.21 *
382 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
383     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
384 dl 1.1 * @param threadFactory the factory to use when the executor
385 jsr166 1.39 * creates a new thread
386     * @throws IllegalArgumentException if {@code corePoolSize < 0}
387     * @throws NullPointerException if {@code threadFactory} is null
388 dl 1.1 */
389     public ScheduledThreadPoolExecutor(int corePoolSize,
390     ThreadFactory threadFactory) {
391     super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
392     new DelayedWorkQueue(), threadFactory);
393     }
394    
395     /**
396 dl 1.13 * Creates a new ScheduledThreadPoolExecutor with the given
397     * initial parameters.
398 jsr166 1.21 *
399 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
400     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
401 dl 1.1 * @param handler the handler to use when execution is blocked
402 jsr166 1.39 * because the thread bounds and queue capacities are reached
403     * @throws IllegalArgumentException if {@code corePoolSize < 0}
404     * @throws NullPointerException if {@code handler} is null
405 dl 1.1 */
406     public ScheduledThreadPoolExecutor(int corePoolSize,
407     RejectedExecutionHandler handler) {
408     super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
409     new DelayedWorkQueue(), handler);
410     }
411    
412     /**
413 dl 1.13 * Creates a new ScheduledThreadPoolExecutor with the given
414     * initial parameters.
415 jsr166 1.21 *
416 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
417     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
418 dl 1.1 * @param threadFactory the factory to use when the executor
419 jsr166 1.39 * creates a new thread
420 dl 1.1 * @param handler the handler to use when execution is blocked
421 jsr166 1.39 * because the thread bounds and queue capacities are reached
422     * @throws IllegalArgumentException if {@code corePoolSize < 0}
423     * @throws NullPointerException if {@code threadFactory} or
424     * {@code handler} is null
425 dl 1.1 */
426     public ScheduledThreadPoolExecutor(int corePoolSize,
427     ThreadFactory threadFactory,
428     RejectedExecutionHandler handler) {
429     super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
430     new DelayedWorkQueue(), threadFactory, handler);
431     }
432    
433 dl 1.37 /**
434     * @throws RejectedExecutionException {@inheritDoc}
435     * @throws NullPointerException {@inheritDoc}
436     */
437 jsr166 1.21 public ScheduledFuture<?> schedule(Runnable command,
438     long delay,
439 dl 1.13 TimeUnit unit) {
440 dl 1.9 if (command == null || unit == null)
441 dl 1.1 throw new NullPointerException();
442 dl 1.37 if (delay < 0) delay = 0;
443 dl 1.14 long triggerTime = now() + unit.toNanos(delay);
444 peierls 1.22 RunnableScheduledFuture<?> t = decorateTask(command,
445     new ScheduledFutureTask<Boolean>(command, null, triggerTime));
446 dl 1.1 delayedExecute(t);
447     return t;
448     }
449    
450 dl 1.37 /**
451     * @throws RejectedExecutionException {@inheritDoc}
452     * @throws NullPointerException {@inheritDoc}
453     */
454 jsr166 1.21 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
455     long delay,
456 dl 1.13 TimeUnit unit) {
457 dl 1.9 if (callable == null || unit == null)
458 dl 1.1 throw new NullPointerException();
459 dl 1.16 if (delay < 0) delay = 0;
460 dl 1.14 long triggerTime = now() + unit.toNanos(delay);
461 peierls 1.22 RunnableScheduledFuture<V> t = decorateTask(callable,
462     new ScheduledFutureTask<V>(callable, triggerTime));
463 dl 1.1 delayedExecute(t);
464     return t;
465     }
466    
467 dl 1.37 /**
468     * @throws RejectedExecutionException {@inheritDoc}
469     * @throws NullPointerException {@inheritDoc}
470     * @throws IllegalArgumentException {@inheritDoc}
471     */
472 jsr166 1.21 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
473     long initialDelay,
474     long period,
475 dl 1.13 TimeUnit unit) {
476 dl 1.9 if (command == null || unit == null)
477 dl 1.1 throw new NullPointerException();
478     if (period <= 0)
479     throw new IllegalArgumentException();
480 dl 1.16 if (initialDelay < 0) initialDelay = 0;
481 dl 1.14 long triggerTime = now() + unit.toNanos(initialDelay);
482 peierls 1.22 RunnableScheduledFuture<?> t = decorateTask(command,
483 jsr166 1.21 new ScheduledFutureTask<Object>(command,
484 dl 1.13 null,
485     triggerTime,
486 peierls 1.22 unit.toNanos(period)));
487 dl 1.1 delayedExecute(t);
488     return t;
489     }
490 jsr166 1.21
491 dl 1.37 /**
492     * @throws RejectedExecutionException {@inheritDoc}
493     * @throws NullPointerException {@inheritDoc}
494     * @throws IllegalArgumentException {@inheritDoc}
495     */
496 jsr166 1.21 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
497     long initialDelay,
498     long delay,
499 dl 1.13 TimeUnit unit) {
500 dl 1.9 if (command == null || unit == null)
501 dl 1.1 throw new NullPointerException();
502     if (delay <= 0)
503     throw new IllegalArgumentException();
504 dl 1.16 if (initialDelay < 0) initialDelay = 0;
505 dl 1.14 long triggerTime = now() + unit.toNanos(initialDelay);
506 peierls 1.22 RunnableScheduledFuture<?> t = decorateTask(command,
507 jsr166 1.21 new ScheduledFutureTask<Boolean>(command,
508 dl 1.13 null,
509     triggerTime,
510 peierls 1.22 unit.toNanos(-delay)));
511 dl 1.1 delayedExecute(t);
512     return t;
513     }
514 jsr166 1.21
515 dl 1.1 /**
516 jsr166 1.39 * Executes {@code command} with zero required delay.
517     * This has effect equivalent to
518     * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
519     * Note that inspections of the queue and of the list returned by
520     * {@code shutdownNow} will access the zero-delayed
521     * {@link ScheduledFuture}, not the {@code command} itself.
522     *
523     * <p>A consequence of the use of {@code ScheduledFuture} objects is
524     * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
525     * called with a null second {@code Throwable} argument, even if the
526     * {@code command} terminated abruptly. Instead, the {@code Throwable}
527     * thrown by such a task can be obtained via {@link Future#get}.
528 dl 1.1 *
529     * @throws RejectedExecutionException at discretion of
530 jsr166 1.39 * {@code RejectedExecutionHandler}, if the task
531     * cannot be accepted for execution because the
532     * executor has been shut down
533     * @throws NullPointerException {@inheritDoc}
534 dl 1.1 */
535     public void execute(Runnable command) {
536     schedule(command, 0, TimeUnit.NANOSECONDS);
537     }
538    
539 dl 1.13 // Override AbstractExecutorService methods
540    
541 dl 1.37 /**
542     * @throws RejectedExecutionException {@inheritDoc}
543     * @throws NullPointerException {@inheritDoc}
544     */
545 dl 1.7 public Future<?> submit(Runnable task) {
546     return schedule(task, 0, TimeUnit.NANOSECONDS);
547     }
548    
549 dl 1.37 /**
550     * @throws RejectedExecutionException {@inheritDoc}
551     * @throws NullPointerException {@inheritDoc}
552     */
553 dl 1.7 public <T> Future<T> submit(Runnable task, T result) {
554 jsr166 1.21 return schedule(Executors.callable(task, result),
555 dl 1.13 0, TimeUnit.NANOSECONDS);
556 dl 1.7 }
557    
558 dl 1.37 /**
559     * @throws RejectedExecutionException {@inheritDoc}
560     * @throws NullPointerException {@inheritDoc}
561     */
562 dl 1.7 public <T> Future<T> submit(Callable<T> task) {
563     return schedule(task, 0, TimeUnit.NANOSECONDS);
564     }
565 dl 1.1
566     /**
567 dl 1.37 * Sets the policy on whether to continue executing existing
568 jsr166 1.39 * periodic tasks even when this executor has been {@code shutdown}.
569     * In this case, these tasks will only terminate upon
570     * {@code shutdownNow} or after setting the policy to
571     * {@code false} when already shutdown.
572     * This value is by default {@code false}.
573 jsr166 1.30 *
574 jsr166 1.39 * @param value if {@code true}, continue after shutdown, else don't.
575 jsr166 1.25 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
576 dl 1.1 */
577     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
578     continueExistingPeriodicTasksAfterShutdown = value;
579 jsr166 1.39 if (!value && isShutdown())
580 dl 1.37 onShutdown();
581 dl 1.1 }
582    
583     /**
584 jsr166 1.21 * Gets the policy on whether to continue executing existing
585 jsr166 1.39 * periodic tasks even when this executor has been {@code shutdown}.
586     * In this case, these tasks will only terminate upon
587     * {@code shutdownNow} or after setting the policy to
588     * {@code false} when already shutdown.
589     * This value is by default {@code false}.
590 jsr166 1.30 *
591 jsr166 1.39 * @return {@code true} if will continue after shutdown
592 dl 1.16 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
593 dl 1.1 */
594     public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
595     return continueExistingPeriodicTasksAfterShutdown;
596     }
597    
598     /**
599 jsr166 1.21 * Sets the policy on whether to execute existing delayed
600 jsr166 1.39 * tasks even when this executor has been {@code shutdown}.
601     * In this case, these tasks will only terminate upon
602     * {@code shutdownNow}, or after setting the policy to
603     * {@code false} when already shutdown.
604     * This value is by default {@code true}.
605 jsr166 1.30 *
606 jsr166 1.39 * @param value if {@code true}, execute after shutdown, else don't.
607 dl 1.16 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
608 dl 1.1 */
609     public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
610     executeExistingDelayedTasksAfterShutdown = value;
611 jsr166 1.39 if (!value && isShutdown())
612 dl 1.37 onShutdown();
613 dl 1.1 }
614    
615     /**
616 jsr166 1.21 * Gets the policy on whether to execute existing delayed
617 jsr166 1.39 * tasks even when this executor has been {@code shutdown}.
618     * In this case, these tasks will only terminate upon
619     * {@code shutdownNow}, or after setting the policy to
620     * {@code false} when already shutdown.
621     * This value is by default {@code true}.
622 jsr166 1.30 *
623 jsr166 1.39 * @return {@code true} if will execute after shutdown
624 dl 1.16 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
625 dl 1.1 */
626     public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
627     return executeExistingDelayedTasksAfterShutdown;
628     }
629    
630     /**
631     * Initiates an orderly shutdown in which previously submitted
632 jsr166 1.39 * tasks are executed, but no new tasks will be accepted. If the
633     * {@code ExecuteExistingDelayedTasksAfterShutdownPolicy} has
634     * been set {@code false}, existing delayed tasks whose delays
635     * have not yet elapsed are cancelled. And unless the
636     * {@code ContinueExistingPeriodicTasksAfterShutdownPolicy} has
637     * been set {@code true}, future executions of existing periodic
638 dl 1.1 * tasks will be cancelled.
639 jsr166 1.39 *
640     * @throws SecurityException {@inheritDoc}
641 dl 1.1 */
642     public void shutdown() {
643     super.shutdown();
644     }
645    
646     /**
647     * Attempts to stop all actively executing tasks, halts the
648 jsr166 1.30 * processing of waiting tasks, and returns a list of the tasks
649     * that were awaiting execution.
650 jsr166 1.21 *
651 dl 1.1 * <p>There are no guarantees beyond best-effort attempts to stop
652 dl 1.18 * processing actively executing tasks. This implementation
653 jsr166 1.31 * cancels tasks via {@link Thread#interrupt}, so any task that
654     * fails to respond to interrupts may never terminate.
655 dl 1.1 *
656 jsr166 1.39 * @return list of tasks that never commenced execution.
657     * Each element of this list is a {@link ScheduledFuture},
658     * including those tasks submitted using {@code execute},
659     * which are for scheduling purposes used as the basis of a
660     * zero-delay {@code ScheduledFuture}.
661 jsr166 1.31 * @throws SecurityException {@inheritDoc}
662 dl 1.1 */
663 tim 1.4 public List<Runnable> shutdownNow() {
664 dl 1.1 return super.shutdownNow();
665     }
666    
667     /**
668     * Returns the task queue used by this executor. Each element of
669     * this queue is a {@link ScheduledFuture}, including those
670 jsr166 1.39 * tasks submitted using {@code execute} which are for scheduling
671 dl 1.1 * purposes used as the basis of a zero-delay
672 jsr166 1.39 * {@code ScheduledFuture}. Iteration over this queue is
673 dl 1.15 * <em>not</em> guaranteed to traverse tasks in the order in
674 dl 1.1 * which they will execute.
675     *
676     * @return the task queue
677     */
678     public BlockingQueue<Runnable> getQueue() {
679     return super.getQueue();
680     }
681    
682 dl 1.13 /**
683 dl 1.40 * Specialized delay queue. To mesh with TPE declarations, this
684     * class must be declared as a BlockingQueue<Runnable> even though
685     * it can only hold RunnableScheduledFutures
686 jsr166 1.21 */
687 dl 1.40 static class DelayedWorkQueue extends AbstractQueue<Runnable>
688 dl 1.13 implements BlockingQueue<Runnable> {
689 jsr166 1.21
690 dl 1.40 /*
691     * A DelayedWorkQueue is based on a heap-based data structure
692     * like those in DelayQueue and PriorityQueue, except that
693     * every ScheduledFutureTask also records its index into the
694     * heap array. This eliminates the need to find a task upon
695     * cancellation, greatly speeding up removal (down from O(n)
696     * to O(log n)), and reducing garbage retention that would
697     * otherwise occur by waiting for the element to rise to top
698     * before clearing. But because the queue may also hold
699     * RunnableScheduledFutures that are not ScheduledFutureTasks,
700     * we are not guaranteed to have such indices available, in
701     * which case we fall back to linear search. (We expect that
702     * most tasks will not be decorated, and that the faster cases
703     * will be much more common.)
704     *
705     * All heap operations must record index changes -- mainly
706     * within siftUp and siftDown. Upon removal, a task's
707     * heapIndex is set to -1. Note that ScheduledFutureTasks can
708     * appear at most once in the queue (this need not be true for
709     * other kinds of tasks or work queues), so are uniquely
710     * identified by heapIndex.
711     */
712    
713     private static final int INITIAL_CAPACITY = 64;
714     private transient RunnableScheduledFuture[] queue =
715     new RunnableScheduledFuture[INITIAL_CAPACITY];
716     private transient final ReentrantLock lock = new ReentrantLock();
717     private transient final Condition available = lock.newCondition();
718     private int size = 0;
719    
720    
721     /**
722     * Set f's heapIndex if it is a ScheduledFutureTask
723     */
724     private void setIndex(Object f, int idx) {
725     if (f instanceof ScheduledFutureTask)
726     ((ScheduledFutureTask)f).heapIndex = idx;
727     }
728    
729     /**
730     * Sift element added at bottom up to its heap-ordered spot
731     * Call only when holding lock.
732     */
733     private void siftUp(int k, RunnableScheduledFuture key) {
734     while (k > 0) {
735     int parent = (k - 1) >>> 1;
736     RunnableScheduledFuture e = queue[parent];
737     if (key.compareTo(e) >= 0)
738     break;
739     queue[k] = e;
740     setIndex(e, k);
741     k = parent;
742     }
743     queue[k] = key;
744     setIndex(key, k);
745     }
746    
747     /**
748     * Sift element added at top down to its heap-ordered spot
749     * Call only when holding lock.
750     */
751     private void siftDown(int k, RunnableScheduledFuture key) {
752     int half = size >>> 1;
753     while (k < half) {
754     int child = (k << 1) + 1;
755     RunnableScheduledFuture c = queue[child];
756     int right = child + 1;
757     if (right < size && c.compareTo(queue[right]) > 0)
758     c = queue[child = right];
759     if (key.compareTo(c) <= 0)
760     break;
761     queue[k] = c;
762     setIndex(c, k);
763     k = child;
764     }
765     queue[k] = key;
766     setIndex(key, k);
767     }
768    
769     /**
770     * Performs common bookkeeping for poll and take: Replaces
771     * first element with last; sifts it down, and signals any
772     * waiting consumers. Call only when holding lock.
773     * @param f the task to remove and return
774     */
775     private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
776     int s = --size;
777     RunnableScheduledFuture x = queue[s];
778     queue[s] = null;
779     if (s != 0) {
780     siftDown(0, x);
781     available.signalAll();
782     }
783     setIndex(f, -1);
784     return f;
785     }
786    
787     /**
788     * Resize the heap array. Call only when holding lock.
789     */
790     private void grow() {
791     int oldCapacity = queue.length;
792     int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
793     if (newCapacity < 0) // overflow
794     newCapacity = Integer.MAX_VALUE;
795     queue = Arrays.copyOf(queue, newCapacity);
796     }
797    
798     /**
799     * Find index of given object, or -1 if absent
800     */
801     private int indexOf(Object x) {
802     if (x != null) {
803     for (int i = 0; i < size; i++)
804     if (x.equals(queue[i]))
805     return i;
806     }
807     return -1;
808     }
809    
810     public boolean remove(Object x) {
811     boolean removed;
812     final ReentrantLock lock = this.lock;
813     lock.lock();
814     try {
815     int i;
816     if (x instanceof ScheduledFutureTask)
817     i = ((ScheduledFutureTask)x).heapIndex;
818     else
819     i = indexOf(x);
820     if (removed = (i >= 0 && i < size && queue[i] == x)) {
821     setIndex(x, -1);
822     int s = --size;
823     RunnableScheduledFuture replacement = queue[s];
824     queue[s] = null;
825     if (s != i) {
826     siftDown(i, replacement);
827     if (queue[i] == replacement)
828     siftUp(i, replacement);
829     }
830     }
831     } finally {
832     lock.unlock();
833     }
834     return removed;
835     }
836    
837     public int size() {
838     int s;
839     final ReentrantLock lock = this.lock;
840     lock.lock();
841     try {
842     s = size;
843     } finally {
844     lock.unlock();
845     }
846     return s;
847     }
848    
849     public boolean isEmpty() {
850     return size() == 0;
851     }
852    
853     public int remainingCapacity() {
854     return Integer.MAX_VALUE;
855     }
856    
857     public RunnableScheduledFuture peek() {
858     final ReentrantLock lock = this.lock;
859     lock.lock();
860     try {
861     return queue[0];
862     } finally {
863     lock.unlock();
864     }
865 dl 1.13 }
866    
867 dl 1.40 public boolean offer(Runnable x) {
868     if (x == null)
869     throw new NullPointerException();
870     RunnableScheduledFuture e = (RunnableScheduledFuture)x;
871     final ReentrantLock lock = this.lock;
872     lock.lock();
873     try {
874     int i = size;
875     if (i >= queue.length)
876     grow();
877     size = i + 1;
878     boolean notify;
879     if (i == 0) {
880     notify = true;
881     queue[0] = e;
882     setIndex(e, 0);
883     }
884     else {
885     notify = e.compareTo(queue[0]) < 0;
886     siftUp(i, e);
887     }
888     if (notify)
889     available.signalAll();
890     } finally {
891     lock.unlock();
892     }
893     return true;
894 jsr166 1.27 }
895 dl 1.40
896     public void put(Runnable e) {
897     offer(e);
898     }
899    
900     public boolean add(Runnable e) {
901     return offer(e);
902 jsr166 1.27 }
903 dl 1.40
904     public boolean offer(Runnable e, long timeout, TimeUnit unit) {
905     return offer(e);
906     }
907    
908     public RunnableScheduledFuture poll() {
909     final ReentrantLock lock = this.lock;
910     lock.lock();
911     try {
912     RunnableScheduledFuture first = queue[0];
913     if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
914     return null;
915     else
916     return finishPoll(first);
917     } finally {
918     lock.unlock();
919     }
920     }
921    
922     public RunnableScheduledFuture take() throws InterruptedException {
923     final ReentrantLock lock = this.lock;
924     lock.lockInterruptibly();
925     try {
926     for (;;) {
927     RunnableScheduledFuture first = queue[0];
928     if (first == null)
929     available.await();
930     else {
931     long delay = first.getDelay(TimeUnit.NANOSECONDS);
932     if (delay > 0)
933     available.awaitNanos(delay);
934     else
935     return finishPoll(first);
936     }
937     }
938     } finally {
939     lock.unlock();
940     }
941     }
942    
943     public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
944     throws InterruptedException {
945     long nanos = unit.toNanos(timeout);
946     final ReentrantLock lock = this.lock;
947     lock.lockInterruptibly();
948     try {
949     for (;;) {
950     RunnableScheduledFuture first = queue[0];
951     if (first == null) {
952     if (nanos <= 0)
953     return null;
954     else
955     nanos = available.awaitNanos(nanos);
956     } else {
957     long delay = first.getDelay(TimeUnit.NANOSECONDS);
958     if (delay > 0) {
959     if (nanos <= 0)
960     return null;
961     if (delay > nanos)
962     delay = nanos;
963     long timeLeft = available.awaitNanos(delay);
964     nanos -= delay - timeLeft;
965     } else
966     return finishPoll(first);
967     }
968     }
969     } finally {
970     lock.unlock();
971     }
972     }
973    
974     public void clear() {
975     final ReentrantLock lock = this.lock;
976     lock.lock();
977     try {
978     for (int i = 0; i < size; i++) {
979     RunnableScheduledFuture t = queue[i];
980     if (t != null) {
981     queue[i] = null;
982     setIndex(t, -1);
983     }
984     }
985     size = 0;
986     } finally {
987     lock.unlock();
988     }
989 dl 1.13 }
990 dl 1.40
991     /**
992     * Return and remove first element only if it is expired.
993     * Used only by drainTo. Call only when holding lock.
994     */
995     private RunnableScheduledFuture pollExpired() {
996     RunnableScheduledFuture first = queue[0];
997     if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
998     return null;
999     setIndex(first, -1);
1000     int s = --size;
1001     RunnableScheduledFuture x = queue[s];
1002     queue[s] = null;
1003     if (s != 0)
1004     siftDown(0, x);
1005     return first;
1006     }
1007    
1008     public int drainTo(Collection<? super Runnable> c) {
1009     if (c == null)
1010     throw new NullPointerException();
1011     if (c == this)
1012     throw new IllegalArgumentException();
1013     final ReentrantLock lock = this.lock;
1014     lock.lock();
1015     try {
1016     int n = 0;
1017     for (;;) {
1018     RunnableScheduledFuture first = pollExpired();
1019     if (first != null) {
1020     c.add(first);
1021     ++n;
1022     }
1023     else
1024     break;
1025     }
1026     if (n > 0)
1027     available.signalAll();
1028     return n;
1029     } finally {
1030     lock.unlock();
1031     }
1032 dl 1.13 }
1033    
1034 jsr166 1.21 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1035 dl 1.40 if (c == null)
1036     throw new NullPointerException();
1037     if (c == this)
1038     throw new IllegalArgumentException();
1039     if (maxElements <= 0)
1040     return 0;
1041     final ReentrantLock lock = this.lock;
1042     lock.lock();
1043     try {
1044     int n = 0;
1045     while (n < maxElements) {
1046     RunnableScheduledFuture first = pollExpired();
1047     if (first != null) {
1048     c.add(first);
1049     ++n;
1050     }
1051     else
1052     break;
1053     }
1054     if (n > 0)
1055     available.signalAll();
1056     return n;
1057     } finally {
1058     lock.unlock();
1059     }
1060     }
1061    
1062     public Object[] toArray() {
1063     final ReentrantLock lock = this.lock;
1064     lock.lock();
1065     try {
1066     return Arrays.copyOf(queue, size);
1067     } finally {
1068     lock.unlock();
1069     }
1070     }
1071    
1072     public <T> T[] toArray(T[] a) {
1073     final ReentrantLock lock = this.lock;
1074     lock.lock();
1075     try {
1076     if (a.length < size)
1077     return (T[]) Arrays.copyOf(queue, size, a.getClass());
1078     System.arraycopy(queue, 0, a, 0, size);
1079     if (a.length > size)
1080     a[size] = null;
1081     return a;
1082     } finally {
1083     lock.unlock();
1084     }
1085 dl 1.13 }
1086    
1087 jsr166 1.21 public Iterator<Runnable> iterator() {
1088 dl 1.40 return new Itr(toArray());
1089     }
1090    
1091     /**
1092     * Snapshot iterator that works off copy of underlying q array.
1093     */
1094     private class Itr implements Iterator<Runnable> {
1095     final Object[] array; // Array of all elements
1096     int cursor; // index of next element to return;
1097     int lastRet; // index of last element, or -1 if no such
1098    
1099     Itr(Object[] array) {
1100     lastRet = -1;
1101     this.array = array;
1102     }
1103    
1104     public boolean hasNext() {
1105     return cursor < array.length;
1106     }
1107    
1108     public Runnable next() {
1109     if (cursor >= array.length)
1110     throw new NoSuchElementException();
1111     lastRet = cursor;
1112     return (Runnable)array[cursor++];
1113     }
1114    
1115     public void remove() {
1116     if (lastRet < 0)
1117     throw new IllegalStateException();
1118     DelayedWorkQueue.this.remove(array[lastRet]);
1119     lastRet = -1;
1120     }
1121 dl 1.13 }
1122     }
1123 dl 1.1 }