ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.54
Committed: Wed Jul 22 01:08:19 2009 UTC (14 years, 10 months ago) by jsr166
Branch: MAIN
Changes since 1.53: +38 -32 lines
Log Message:
6725789: ScheduledExecutorService does not work as expected in jdk7/6/5

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.11 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.1 */
6    
7     package java.util.concurrent;
8     import java.util.concurrent.atomic.*;
9 dl 1.40 import java.util.concurrent.locks.*;
10 dl 1.1 import java.util.*;
11    
12     /**
13 dl 1.7 * A {@link ThreadPoolExecutor} that can additionally schedule
14     * commands to run after a given delay, or to execute
15     * periodically. This class is preferable to {@link java.util.Timer}
16     * when multiple worker threads are needed, or when the additional
17     * flexibility or capabilities of {@link ThreadPoolExecutor} (which
18     * this class extends) are required.
19 dl 1.1 *
20 jsr166 1.46 * <p>Delayed tasks execute no sooner than they are enabled, but
21 dl 1.18 * without any real-time guarantees about when, after they are
22     * enabled, they will commence. Tasks scheduled for exactly the same
23     * execution time are enabled in first-in-first-out (FIFO) order of
24 jsr166 1.46 * submission.
25     *
26     * <p>When a submitted task is cancelled before it is run, execution
27     * is suppressed. By default, such a cancelled task is not
28     * automatically removed from the work queue until its delay
29     * elapses. While this enables further inspection and monitoring, it
30     * may also cause unbounded retention of cancelled tasks. To avoid
31     * this, set {@link #setRemoveOnCancelPolicy} to {@code true}, which
32     * causes tasks to be immediately removed from the work queue at
33     * time of cancellation.
34 dl 1.1 *
35 dl 1.51 * <p>Successive executions of a task scheduled via
36     * <code>scheduleAtFixedRate</code> or
37     * <code>scheduleWithFixedDelay</code> do not overlap. While different
38     * executions may be performed by different threads, the effects of
39     * prior executions <a
40     * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
41     * those of subsequent ones.
42     *
43 dl 1.1 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
44 dl 1.8 * of the inherited tuning methods are not useful for it. In
45 jsr166 1.53 * particular, because it acts as a fixed-sized pool using
46     * {@code corePoolSize} threads and an unbounded queue, adjustments
47     * to {@code maximumPoolSize} have no useful effect. Additionally, it
48     * is almost never a good idea to set {@code corePoolSize} to zero or
49     * use {@code allowCoreThreadTimeOut} because this may leave the pool
50     * without threads to handle tasks once they become eligible to run.
51 dl 1.1 *
52 jsr166 1.39 * <p><b>Extension notes:</b> This class overrides the
53     * {@link ThreadPoolExecutor#execute execute} and
54     * {@link AbstractExecutorService#submit(Runnable) submit}
55     * methods to generate internal {@link ScheduledFuture} objects to
56     * control per-task delays and scheduling. To preserve
57     * functionality, any further overrides of these methods in
58 dl 1.32 * subclasses must invoke superclass versions, which effectively
59 jsr166 1.39 * disables additional task customization. However, this class
60 dl 1.32 * provides alternative protected extension method
61 jsr166 1.39 * {@code decorateTask} (one version each for {@code Runnable} and
62     * {@code Callable}) that can be used to customize the concrete task
63     * types used to execute commands entered via {@code execute},
64     * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
65     * and {@code scheduleWithFixedDelay}. By default, a
66     * {@code ScheduledThreadPoolExecutor} uses a task type extending
67 dl 1.32 * {@link FutureTask}. However, this may be modified or replaced using
68     * subclasses of the form:
69     *
70 jsr166 1.39 * <pre> {@code
71 dl 1.23 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
72     *
73 jsr166 1.39 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
74 dl 1.23 *
75 jsr166 1.39 * protected <V> RunnableScheduledFuture<V> decorateTask(
76     * Runnable r, RunnableScheduledFuture<V> task) {
77     * return new CustomTask<V>(r, task);
78 jsr166 1.29 * }
79 dl 1.23 *
80 jsr166 1.39 * protected <V> RunnableScheduledFuture<V> decorateTask(
81     * Callable<V> c, RunnableScheduledFuture<V> task) {
82     * return new CustomTask<V>(c, task);
83 jsr166 1.29 * }
84     * // ... add constructors, etc.
85 jsr166 1.39 * }}</pre>
86     *
87 dl 1.1 * @since 1.5
88     * @author Doug Lea
89     */
90 jsr166 1.21 public class ScheduledThreadPoolExecutor
91     extends ThreadPoolExecutor
92 tim 1.3 implements ScheduledExecutorService {
93 dl 1.1
94 dl 1.37 /*
95     * This class specializes ThreadPoolExecutor implementation by
96     *
97     * 1. Using a custom task type, ScheduledFutureTask for
98     * tasks, even those that don't require scheduling (i.e.,
99     * those submitted using ExecutorService execute, not
100     * ScheduledExecutorService methods) which are treated as
101     * delayed tasks with a delay of zero.
102     *
103 jsr166 1.46 * 2. Using a custom queue (DelayedWorkQueue), a variant of
104 dl 1.37 * unbounded DelayQueue. The lack of capacity constraint and
105     * the fact that corePoolSize and maximumPoolSize are
106     * effectively identical simplifies some execution mechanics
107 jsr166 1.46 * (see delayedExecute) compared to ThreadPoolExecutor.
108 dl 1.37 *
109     * 3. Supporting optional run-after-shutdown parameters, which
110     * leads to overrides of shutdown methods to remove and cancel
111     * tasks that should NOT be run after shutdown, as well as
112     * different recheck logic when task (re)submission overlaps
113     * with a shutdown.
114     *
115     * 4. Task decoration methods to allow interception and
116     * instrumentation, which are needed because subclasses cannot
117     * otherwise override submit methods to get this effect. These
118     * don't have any impact on pool control logic though.
119     */
120    
121 dl 1.1 /**
122     * False if should cancel/suppress periodic tasks on shutdown.
123     */
124     private volatile boolean continueExistingPeriodicTasksAfterShutdown;
125    
126     /**
127     * False if should cancel non-periodic tasks on shutdown.
128     */
129     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
130    
131     /**
132 dl 1.41 * True if ScheduledFutureTask.cancel should remove from queue
133     */
134     private volatile boolean removeOnCancel = false;
135    
136     /**
137 dl 1.1 * Sequence number to break scheduling ties, and in turn to
138     * guarantee FIFO order among tied entries.
139     */
140     private static final AtomicLong sequencer = new AtomicLong(0);
141 dl 1.14
142     /**
143 jsr166 1.39 * Returns current nanosecond time.
144 dl 1.14 */
145 jsr166 1.54 final long now() {
146     return System.nanoTime();
147 dl 1.14 }
148    
149 jsr166 1.21 private class ScheduledFutureTask<V>
150 peierls 1.22 extends FutureTask<V> implements RunnableScheduledFuture<V> {
151 jsr166 1.21
152 dl 1.1 /** Sequence number to break ties FIFO */
153     private final long sequenceNumber;
154 jsr166 1.44
155 dl 1.1 /** The time the task is enabled to execute in nanoTime units */
156     private long time;
157 jsr166 1.44
158 dl 1.16 /**
159     * Period in nanoseconds for repeating tasks. A positive
160     * value indicates fixed-rate execution. A negative value
161     * indicates fixed-delay execution. A value of 0 indicates a
162     * non-repeating task.
163     */
164 dl 1.1 private final long period;
165    
166 jsr166 1.48 /** The actual task to be re-enqueued by reExecutePeriodic */
167     RunnableScheduledFuture<V> outerTask = this;
168 jsr166 1.44
169 dl 1.1 /**
170 dl 1.40 * Index into delay queue, to support faster cancellation.
171     */
172     int heapIndex;
173    
174     /**
175 jsr166 1.30 * Creates a one-shot action with given nanoTime-based trigger time.
176 dl 1.1 */
177     ScheduledFutureTask(Runnable r, V result, long ns) {
178     super(r, result);
179     this.time = ns;
180     this.period = 0;
181     this.sequenceNumber = sequencer.getAndIncrement();
182     }
183    
184     /**
185 jsr166 1.30 * Creates a periodic action with given nano time and period.
186 dl 1.1 */
187 jsr166 1.30 ScheduledFutureTask(Runnable r, V result, long ns, long period) {
188 dl 1.1 super(r, result);
189     this.time = ns;
190     this.period = period;
191     this.sequenceNumber = sequencer.getAndIncrement();
192     }
193    
194     /**
195 jsr166 1.30 * Creates a one-shot action with given nanoTime-based trigger.
196 dl 1.1 */
197     ScheduledFutureTask(Callable<V> callable, long ns) {
198     super(callable);
199     this.time = ns;
200     this.period = 0;
201     this.sequenceNumber = sequencer.getAndIncrement();
202     }
203    
204     public long getDelay(TimeUnit unit) {
205 jsr166 1.54 return unit.convert(time - now(), TimeUnit.NANOSECONDS);
206 dl 1.1 }
207    
208 dl 1.20 public int compareTo(Delayed other) {
209 dl 1.1 if (other == this) // compare zero ONLY if same object
210     return 0;
211 dl 1.34 if (other instanceof ScheduledFutureTask) {
212     ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
213     long diff = time - x.time;
214     if (diff < 0)
215     return -1;
216     else if (diff > 0)
217     return 1;
218     else if (sequenceNumber < x.sequenceNumber)
219     return -1;
220     else
221     return 1;
222     }
223     long d = (getDelay(TimeUnit.NANOSECONDS) -
224     other.getDelay(TimeUnit.NANOSECONDS));
225 jsr166 1.38 return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
226 dl 1.1 }
227    
228     /**
229 dl 1.18 * Returns true if this is a periodic (not a one-shot) action.
230 jsr166 1.30 *
231 dl 1.1 * @return true if periodic
232     */
233 dl 1.23 public boolean isPeriodic() {
234 dl 1.16 return period != 0;
235 dl 1.1 }
236    
237     /**
238 jsr166 1.39 * Sets the next time to run for a periodic task.
239 dl 1.13 */
240 dl 1.37 private void setNextRunTime() {
241     long p = period;
242     if (p > 0)
243     time += p;
244     else
245 jsr166 1.54 time = triggerTime(-p);
246 dl 1.13 }
247    
248 dl 1.40 public boolean cancel(boolean mayInterruptIfRunning) {
249 dl 1.41 boolean cancelled = super.cancel(mayInterruptIfRunning);
250     if (cancelled && removeOnCancel && heapIndex >= 0)
251 jsr166 1.42 remove(this);
252 dl 1.41 return cancelled;
253 dl 1.40 }
254    
255 dl 1.13 /**
256 dl 1.5 * Overrides FutureTask version so as to reset/requeue if periodic.
257 jsr166 1.21 */
258 dl 1.1 public void run() {
259 dl 1.37 boolean periodic = isPeriodic();
260     if (!canRunInCurrentRunState(periodic))
261     cancel(false);
262     else if (!periodic)
263 dl 1.5 ScheduledFutureTask.super.run();
264 dl 1.37 else if (ScheduledFutureTask.super.runAndReset()) {
265     setNextRunTime();
266 jsr166 1.44 reExecutePeriodic(outerTask);
267 dl 1.37 }
268 dl 1.1 }
269     }
270    
271     /**
272 dl 1.37 * Returns true if can run a task given current run state
273 jsr166 1.39 * and run-after-shutdown parameters.
274     *
275 dl 1.37 * @param periodic true if this task periodic, false if delayed
276     */
277     boolean canRunInCurrentRunState(boolean periodic) {
278 jsr166 1.38 return isRunningOrShutdown(periodic ?
279 dl 1.37 continueExistingPeriodicTasksAfterShutdown :
280     executeExistingDelayedTasksAfterShutdown);
281     }
282    
283     /**
284     * Main execution method for delayed or periodic tasks. If pool
285     * is shut down, rejects the task. Otherwise adds task to queue
286     * and starts a thread, if necessary, to run it. (We cannot
287     * prestart the thread to run the task because the task (probably)
288     * shouldn't be run yet,) If the pool is shut down while the task
289     * is being added, cancel and remove it if required by state and
290 jsr166 1.39 * run-after-shutdown parameters.
291     *
292 dl 1.37 * @param task the task
293     */
294     private void delayedExecute(RunnableScheduledFuture<?> task) {
295     if (isShutdown())
296     reject(task);
297     else {
298     super.getQueue().add(task);
299     if (isShutdown() &&
300     !canRunInCurrentRunState(task.isPeriodic()) &&
301     remove(task))
302     task.cancel(false);
303 jsr166 1.48 else
304     prestartCoreThread();
305 dl 1.37 }
306     }
307 jsr166 1.21
308 dl 1.37 /**
309 jsr166 1.39 * Requeues a periodic task unless current run state precludes it.
310     * Same idea as delayedExecute except drops task rather than rejecting.
311     *
312 dl 1.37 * @param task the task
313     */
314     void reExecutePeriodic(RunnableScheduledFuture<?> task) {
315     if (canRunInCurrentRunState(true)) {
316     super.getQueue().add(task);
317     if (!canRunInCurrentRunState(true) && remove(task))
318     task.cancel(false);
319 jsr166 1.48 else
320     prestartCoreThread();
321 dl 1.37 }
322 dl 1.13 }
323 dl 1.1
324 dl 1.13 /**
325 jsr166 1.21 * Cancels and clears the queue of all tasks that should not be run
326 jsr166 1.39 * due to shutdown policy. Invoked within super.shutdown.
327 dl 1.13 */
328 dl 1.37 @Override void onShutdown() {
329     BlockingQueue<Runnable> q = super.getQueue();
330     boolean keepDelayed =
331     getExecuteExistingDelayedTasksAfterShutdownPolicy();
332     boolean keepPeriodic =
333     getContinueExistingPeriodicTasksAfterShutdownPolicy();
334 jsr166 1.21 if (!keepDelayed && !keepPeriodic)
335 dl 1.37 q.clear();
336     else {
337     // Traverse snapshot to avoid iterator exceptions
338 jsr166 1.39 for (Object e : q.toArray()) {
339 dl 1.23 if (e instanceof RunnableScheduledFuture) {
340 dl 1.37 RunnableScheduledFuture<?> t =
341     (RunnableScheduledFuture<?>)e;
342 dl 1.41 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
343     t.isCancelled()) { // also remove if already cancelled
344     if (q.remove(t))
345     t.cancel(false);
346     }
347 dl 1.13 }
348     }
349 dl 1.1 }
350 jsr166 1.48 tryTerminate();
351 dl 1.1 }
352    
353 dl 1.23 /**
354 jsr166 1.30 * Modifies or replaces the task used to execute a runnable.
355 jsr166 1.28 * This method can be used to override the concrete
356 dl 1.23 * class used for managing internal tasks.
357 jsr166 1.30 * The default implementation simply returns the given task.
358 jsr166 1.28 *
359 dl 1.23 * @param runnable the submitted Runnable
360     * @param task the task created to execute the runnable
361     * @return a task that can execute the runnable
362     * @since 1.6
363     */
364 peierls 1.22 protected <V> RunnableScheduledFuture<V> decorateTask(
365 dl 1.23 Runnable runnable, RunnableScheduledFuture<V> task) {
366     return task;
367 peierls 1.22 }
368    
369 dl 1.23 /**
370 jsr166 1.30 * Modifies or replaces the task used to execute a callable.
371 jsr166 1.28 * This method can be used to override the concrete
372 dl 1.23 * class used for managing internal tasks.
373 jsr166 1.30 * The default implementation simply returns the given task.
374 jsr166 1.28 *
375 dl 1.23 * @param callable the submitted Callable
376     * @param task the task created to execute the callable
377     * @return a task that can execute the callable
378     * @since 1.6
379     */
380 peierls 1.22 protected <V> RunnableScheduledFuture<V> decorateTask(
381 dl 1.23 Callable<V> callable, RunnableScheduledFuture<V> task) {
382     return task;
383 dl 1.19 }
384    
/**
 * Constructs a {@code ScheduledThreadPoolExecutor} with the given
 * core pool size, backed by a new {@code DelayedWorkQueue}.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue());
}
397    
/**
 * Constructs a {@code ScheduledThreadPoolExecutor} with the given
 * core pool size and thread factory, backed by a new
 * {@code DelayedWorkQueue}.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code threadFactory} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          threadFactory);
}
414    
/**
 * Constructs a {@code ScheduledThreadPoolExecutor} with the given
 * core pool size and rejection handler, backed by a new
 * {@code DelayedWorkQueue}.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code handler} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          handler);
}
431    
/**
 * Constructs a {@code ScheduledThreadPoolExecutor} with the given
 * core pool size, thread factory and rejection handler, backed by
 * a new {@code DelayedWorkQueue}.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code threadFactory} or
 *         {@code handler} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          threadFactory,
          handler);
}
452    
453 dl 1.37 /**
454 jsr166 1.54 * Returns the trigger time of a delayed action.
455     */
456     private long triggerTime(long delay, TimeUnit unit) {
457     return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
458     }
459    
460     /**
461     * Returns the trigger time of a delayed action.
462 dl 1.50 */
463 jsr166 1.54 long triggerTime(long delay) {
464     return now() +
465     ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
466     }
467    
468     /**
469     * Constrains the values of all delays in the queue to be within
470     * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
471     * This may occur if a task is eligible to be dequeued, but has
472     * not yet been, while some other task is added with a delay of
473     * Long.MAX_VALUE.
474     */
475     private long overflowFree(long delay) {
476     Delayed head = (Delayed) super.getQueue().peek();
477     if (head != null) {
478     long headDelay = head.getDelay(TimeUnit.NANOSECONDS);
479     if (headDelay < 0 && (delay - headDelay < 0))
480     delay = Long.MAX_VALUE + headDelay;
481     }
482     return delay;
483 dl 1.50 }
484    
485     /**
486 dl 1.37 * @throws RejectedExecutionException {@inheritDoc}
487     * @throws NullPointerException {@inheritDoc}
488     */
489 jsr166 1.21 public ScheduledFuture<?> schedule(Runnable command,
490     long delay,
491 dl 1.13 TimeUnit unit) {
492 dl 1.9 if (command == null || unit == null)
493 dl 1.1 throw new NullPointerException();
494 peierls 1.22 RunnableScheduledFuture<?> t = decorateTask(command,
495 jsr166 1.54 new ScheduledFutureTask<Void>(command, null,
496     triggerTime(delay, unit)));
497 dl 1.1 delayedExecute(t);
498     return t;
499     }
500 jsr166 1.52
501 dl 1.37 /**
502     * @throws RejectedExecutionException {@inheritDoc}
503     * @throws NullPointerException {@inheritDoc}
504     */
505 jsr166 1.21 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
506     long delay,
507 dl 1.13 TimeUnit unit) {
508 dl 1.9 if (callable == null || unit == null)
509 dl 1.1 throw new NullPointerException();
510 peierls 1.22 RunnableScheduledFuture<V> t = decorateTask(callable,
511 jsr166 1.54 new ScheduledFutureTask<V>(callable,
512     triggerTime(delay, unit)));
513 dl 1.1 delayedExecute(t);
514     return t;
515     }
516    
517 dl 1.37 /**
518     * @throws RejectedExecutionException {@inheritDoc}
519     * @throws NullPointerException {@inheritDoc}
520     * @throws IllegalArgumentException {@inheritDoc}
521     */
522 jsr166 1.21 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
523     long initialDelay,
524     long period,
525 dl 1.13 TimeUnit unit) {
526 dl 1.9 if (command == null || unit == null)
527 dl 1.1 throw new NullPointerException();
528     if (period <= 0)
529     throw new IllegalArgumentException();
530 jsr166 1.48 ScheduledFutureTask<Void> sft =
531     new ScheduledFutureTask<Void>(command,
532     null,
533 jsr166 1.54 triggerTime(initialDelay, unit),
534 jsr166 1.48 unit.toNanos(period));
535 jsr166 1.44 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
536 jsr166 1.48 sft.outerTask = t;
537 dl 1.1 delayedExecute(t);
538     return t;
539     }
540 jsr166 1.21
541 dl 1.37 /**
542     * @throws RejectedExecutionException {@inheritDoc}
543     * @throws NullPointerException {@inheritDoc}
544     * @throws IllegalArgumentException {@inheritDoc}
545     */
546 jsr166 1.21 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
547     long initialDelay,
548     long delay,
549 dl 1.13 TimeUnit unit) {
550 dl 1.9 if (command == null || unit == null)
551 dl 1.1 throw new NullPointerException();
552     if (delay <= 0)
553     throw new IllegalArgumentException();
554 jsr166 1.48 ScheduledFutureTask<Void> sft =
555     new ScheduledFutureTask<Void>(command,
556     null,
557 jsr166 1.54 triggerTime(initialDelay, unit),
558 jsr166 1.48 unit.toNanos(-delay));
559 jsr166 1.44 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
560 jsr166 1.48 sft.outerTask = t;
561 dl 1.1 delayedExecute(t);
562     return t;
563     }
564 jsr166 1.21
565 dl 1.1 /**
566 jsr166 1.39 * Executes {@code command} with zero required delay.
567     * This has effect equivalent to
568     * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
569     * Note that inspections of the queue and of the list returned by
570     * {@code shutdownNow} will access the zero-delayed
571     * {@link ScheduledFuture}, not the {@code command} itself.
572     *
573     * <p>A consequence of the use of {@code ScheduledFuture} objects is
574     * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
575     * called with a null second {@code Throwable} argument, even if the
576     * {@code command} terminated abruptly. Instead, the {@code Throwable}
577     * thrown by such a task can be obtained via {@link Future#get}.
578 dl 1.1 *
579     * @throws RejectedExecutionException at discretion of
580 jsr166 1.39 * {@code RejectedExecutionHandler}, if the task
581     * cannot be accepted for execution because the
582     * executor has been shut down
583     * @throws NullPointerException {@inheritDoc}
584 dl 1.1 */
585     public void execute(Runnable command) {
586     schedule(command, 0, TimeUnit.NANOSECONDS);
587     }
588    
589 dl 1.13 // Override AbstractExecutorService methods
590    
591 dl 1.37 /**
592     * @throws RejectedExecutionException {@inheritDoc}
593     * @throws NullPointerException {@inheritDoc}
594     */
595 dl 1.7 public Future<?> submit(Runnable task) {
596     return schedule(task, 0, TimeUnit.NANOSECONDS);
597     }
598    
599 dl 1.37 /**
600     * @throws RejectedExecutionException {@inheritDoc}
601     * @throws NullPointerException {@inheritDoc}
602     */
603 dl 1.7 public <T> Future<T> submit(Runnable task, T result) {
604 jsr166 1.21 return schedule(Executors.callable(task, result),
605 dl 1.13 0, TimeUnit.NANOSECONDS);
606 dl 1.7 }
607    
608 dl 1.37 /**
609     * @throws RejectedExecutionException {@inheritDoc}
610     * @throws NullPointerException {@inheritDoc}
611     */
612 dl 1.7 public <T> Future<T> submit(Callable<T> task) {
613     return schedule(task, 0, TimeUnit.NANOSECONDS);
614     }
615 dl 1.1
616     /**
617 dl 1.37 * Sets the policy on whether to continue executing existing
618 jsr166 1.39 * periodic tasks even when this executor has been {@code shutdown}.
619     * In this case, these tasks will only terminate upon
620     * {@code shutdownNow} or after setting the policy to
621     * {@code false} when already shutdown.
622     * This value is by default {@code false}.
623 jsr166 1.30 *
624 jsr166 1.39 * @param value if {@code true}, continue after shutdown, else don't.
625 jsr166 1.25 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
626 dl 1.1 */
627     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
628     continueExistingPeriodicTasksAfterShutdown = value;
629 jsr166 1.39 if (!value && isShutdown())
630 dl 1.37 onShutdown();
631 dl 1.1 }
632    
633     /**
634 jsr166 1.21 * Gets the policy on whether to continue executing existing
635 jsr166 1.39 * periodic tasks even when this executor has been {@code shutdown}.
636     * In this case, these tasks will only terminate upon
637     * {@code shutdownNow} or after setting the policy to
638     * {@code false} when already shutdown.
639     * This value is by default {@code false}.
640 jsr166 1.30 *
641 jsr166 1.39 * @return {@code true} if will continue after shutdown
642 dl 1.16 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
643 dl 1.1 */
644     public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
645     return continueExistingPeriodicTasksAfterShutdown;
646     }
647    
648     /**
649 jsr166 1.21 * Sets the policy on whether to execute existing delayed
650 jsr166 1.39 * tasks even when this executor has been {@code shutdown}.
651     * In this case, these tasks will only terminate upon
652     * {@code shutdownNow}, or after setting the policy to
653     * {@code false} when already shutdown.
654     * This value is by default {@code true}.
655 jsr166 1.30 *
656 jsr166 1.39 * @param value if {@code true}, execute after shutdown, else don't.
657 dl 1.16 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
658 dl 1.1 */
659     public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
660     executeExistingDelayedTasksAfterShutdown = value;
661 jsr166 1.39 if (!value && isShutdown())
662 dl 1.37 onShutdown();
663 dl 1.1 }
664    
665     /**
666 jsr166 1.21 * Gets the policy on whether to execute existing delayed
667 jsr166 1.39 * tasks even when this executor has been {@code shutdown}.
668     * In this case, these tasks will only terminate upon
669     * {@code shutdownNow}, or after setting the policy to
670     * {@code false} when already shutdown.
671     * This value is by default {@code true}.
672 jsr166 1.30 *
673 jsr166 1.39 * @return {@code true} if will execute after shutdown
674 dl 1.16 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
675 dl 1.1 */
676     public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
677     return executeExistingDelayedTasksAfterShutdown;
678     }
679    
680     /**
681 jsr166 1.46 * Sets the policy on whether cancelled tasks should be immediately
682     * removed from the work queue at time of cancellation. This value is
683     * by default {@code false}.
684 dl 1.41 *
685 jsr166 1.42 * @param value if {@code true}, remove on cancellation, else don't
686 dl 1.41 * @see #getRemoveOnCancelPolicy
687 jsr166 1.43 * @since 1.7
688 dl 1.41 */
689     public void setRemoveOnCancelPolicy(boolean value) {
690     removeOnCancel = value;
691     }
692    
693     /**
694 jsr166 1.46 * Gets the policy on whether cancelled tasks should be immediately
695     * removed from the work queue at time of cancellation. This value is
696     * by default {@code false}.
697 dl 1.41 *
698 jsr166 1.46 * @return {@code true} if cancelled tasks are immediately removed
699     * from the queue
700 dl 1.41 * @see #setRemoveOnCancelPolicy
701 jsr166 1.43 * @since 1.7
702 dl 1.41 */
703     public boolean getRemoveOnCancelPolicy() {
704     return removeOnCancel;
705     }
706    
707     /**
708 dl 1.1 * Initiates an orderly shutdown in which previously submitted
709 jsr166 1.49 * tasks are executed, but no new tasks will be accepted.
710     * Invocation has no additional effect if already shut down.
711     *
712     * <p>This method does not wait for previously submitted tasks to
713     * complete execution. Use {@link #awaitTermination awaitTermination}
714     * to do that.
715     *
716     * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
717     * has been set {@code false}, existing delayed tasks whose delays
718     * have not yet elapsed are cancelled. And unless the {@code
719     * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
720     * {@code true}, future executions of existing periodic tasks will
721     * be cancelled.
722 jsr166 1.39 *
723     * @throws SecurityException {@inheritDoc}
724 dl 1.1 */
725     public void shutdown() {
726     super.shutdown();
727     }
728    
729     /**
730     * Attempts to stop all actively executing tasks, halts the
731 jsr166 1.30 * processing of waiting tasks, and returns a list of the tasks
732     * that were awaiting execution.
733 jsr166 1.21 *
734 jsr166 1.49 * <p>This method does not wait for actively executing tasks to
735     * terminate. Use {@link #awaitTermination awaitTermination} to
736     * do that.
737     *
738 dl 1.1 * <p>There are no guarantees beyond best-effort attempts to stop
739 dl 1.18 * processing actively executing tasks. This implementation
740 jsr166 1.31 * cancels tasks via {@link Thread#interrupt}, so any task that
741     * fails to respond to interrupts may never terminate.
742 dl 1.1 *
743 jsr166 1.39 * @return list of tasks that never commenced execution.
744     * Each element of this list is a {@link ScheduledFuture},
745     * including those tasks submitted using {@code execute},
746     * which are for scheduling purposes used as the basis of a
747     * zero-delay {@code ScheduledFuture}.
748 jsr166 1.31 * @throws SecurityException {@inheritDoc}
749 dl 1.1 */
750 tim 1.4 public List<Runnable> shutdownNow() {
751 dl 1.1 return super.shutdownNow();
752     }
753    
754     /**
755     * Returns the task queue used by this executor. Each element of
756     * this queue is a {@link ScheduledFuture}, including those
757 jsr166 1.39 * tasks submitted using {@code execute} which are for scheduling
758 dl 1.1 * purposes used as the basis of a zero-delay
759 jsr166 1.39 * {@code ScheduledFuture}. Iteration over this queue is
760 dl 1.15 * <em>not</em> guaranteed to traverse tasks in the order in
761 dl 1.1 * which they will execute.
762     *
763     * @return the task queue
764     */
765     public BlockingQueue<Runnable> getQueue() {
766     return super.getQueue();
767     }
768    
769 dl 1.13 /**
770 dl 1.40 * Specialized delay queue. To mesh with TPE declarations, this
771     * class must be declared as a BlockingQueue<Runnable> even though
772 jsr166 1.42 * it can only hold RunnableScheduledFutures.
773 jsr166 1.21 */
774 dl 1.40 static class DelayedWorkQueue extends AbstractQueue<Runnable>
775 dl 1.13 implements BlockingQueue<Runnable> {
776 jsr166 1.21
777 dl 1.40 /*
778     * A DelayedWorkQueue is based on a heap-based data structure
779     * like those in DelayQueue and PriorityQueue, except that
780     * every ScheduledFutureTask also records its index into the
781     * heap array. This eliminates the need to find a task upon
782     * cancellation, greatly speeding up removal (down from O(n)
783     * to O(log n)), and reducing garbage retention that would
784     * otherwise occur by waiting for the element to rise to top
785     * before clearing. But because the queue may also hold
786     * RunnableScheduledFutures that are not ScheduledFutureTasks,
787     * we are not guaranteed to have such indices available, in
788     * which case we fall back to linear search. (We expect that
789     * most tasks will not be decorated, and that the faster cases
790     * will be much more common.)
791     *
792     * All heap operations must record index changes -- mainly
793     * within siftUp and siftDown. Upon removal, a task's
794     * heapIndex is set to -1. Note that ScheduledFutureTasks can
795     * appear at most once in the queue (this need not be true for
796     * other kinds of tasks or work queues), so are uniquely
797     * identified by heapIndex.
798     */
799    
800 jsr166 1.46 private static final int INITIAL_CAPACITY = 16;
801     private RunnableScheduledFuture[] queue =
802 dl 1.40 new RunnableScheduledFuture[INITIAL_CAPACITY];
803 jsr166 1.46 private final ReentrantLock lock = new ReentrantLock();
804 dl 1.40 private int size = 0;
805    
806 jsr166 1.48 /**
807     * Thread designated to wait for the task at the head of the
808     * queue. This variant of the Leader-Follower pattern
809     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
810     * minimize unnecessary timed waiting. When a thread becomes
811     * the leader, it waits only for the next delay to elapse, but
812     * other threads await indefinitely. The leader thread must
813     * signal some other thread before returning from take() or
814     * poll(...), unless some other thread becomes leader in the
815     * interim. Whenever the head of the queue is replaced with a
816     * task with an earlier expiration time, the leader field is
817     * invalidated by being reset to null, and some waiting
818     * thread, but not necessarily the current leader, is
819     * signalled. So waiting threads must be prepared to acquire
820     * and lose leadership while waiting.
821     */
822     private Thread leader = null;
823    
824     /**
825     * Condition signalled when a newer task becomes available at the
826     * head of the queue or a new thread may need to become leader.
827     */
828     private final Condition available = lock.newCondition();
829 dl 1.40
830     /**
831 jsr166 1.42 * Set f's heapIndex if it is a ScheduledFutureTask.
832 dl 1.40 */
833 jsr166 1.45 private void setIndex(RunnableScheduledFuture f, int idx) {
834 dl 1.40 if (f instanceof ScheduledFutureTask)
835     ((ScheduledFutureTask)f).heapIndex = idx;
836     }
837    
838     /**
839 jsr166 1.42 * Sift element added at bottom up to its heap-ordered spot.
840 dl 1.40 * Call only when holding lock.
841     */
842     private void siftUp(int k, RunnableScheduledFuture key) {
843     while (k > 0) {
844     int parent = (k - 1) >>> 1;
845     RunnableScheduledFuture e = queue[parent];
846     if (key.compareTo(e) >= 0)
847     break;
848     queue[k] = e;
849     setIndex(e, k);
850     k = parent;
851     }
852     queue[k] = key;
853     setIndex(key, k);
854     }
855    
856     /**
857 jsr166 1.42 * Sift element added at top down to its heap-ordered spot.
858 dl 1.40 * Call only when holding lock.
859     */
860     private void siftDown(int k, RunnableScheduledFuture key) {
861 jsr166 1.42 int half = size >>> 1;
862 dl 1.40 while (k < half) {
863 jsr166 1.42 int child = (k << 1) + 1;
864 dl 1.40 RunnableScheduledFuture c = queue[child];
865     int right = child + 1;
866     if (right < size && c.compareTo(queue[right]) > 0)
867     c = queue[child = right];
868     if (key.compareTo(c) <= 0)
869     break;
870     queue[k] = c;
871     setIndex(c, k);
872     k = child;
873     }
874     queue[k] = key;
875     setIndex(key, k);
876     }
877    
878     /**
879     * Resize the heap array. Call only when holding lock.
880     */
881     private void grow() {
882     int oldCapacity = queue.length;
883     int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
884     if (newCapacity < 0) // overflow
885     newCapacity = Integer.MAX_VALUE;
886     queue = Arrays.copyOf(queue, newCapacity);
887     }
888    
889     /**
890     * Find index of given object, or -1 if absent
891     */
892     private int indexOf(Object x) {
893     if (x != null) {
894 jsr166 1.48 if (x instanceof ScheduledFutureTask) {
895     int i = ((ScheduledFutureTask) x).heapIndex;
896     // Sanity check; x could conceivably be a
897     // ScheduledFutureTask from some other pool.
898     if (i >= 0 && i < size && queue[i] == x)
899     return i;
900     } else {
901     for (int i = 0; i < size; i++)
902     if (x.equals(queue[i]))
903     return i;
904     }
905 dl 1.40 }
906     return -1;
907     }
908    
909 jsr166 1.48 public boolean contains(Object x) {
910     final ReentrantLock lock = this.lock;
911 jsr166 1.45 lock.lock();
912     try {
913 jsr166 1.48 return indexOf(x) != -1;
914 jsr166 1.45 } finally {
915     lock.unlock();
916     }
917 jsr166 1.48 }
918 jsr166 1.45
919 dl 1.40 public boolean remove(Object x) {
920     final ReentrantLock lock = this.lock;
921     lock.lock();
922     try {
923 jsr166 1.45 int i = indexOf(x);
924 jsr166 1.48 if (i < 0)
925     return false;
926 jsr166 1.45
927 jsr166 1.48 setIndex(queue[i], -1);
928     int s = --size;
929     RunnableScheduledFuture replacement = queue[s];
930     queue[s] = null;
931     if (s != i) {
932     siftDown(i, replacement);
933     if (queue[i] == replacement)
934     siftUp(i, replacement);
935     }
936     return true;
937 dl 1.40 } finally {
938     lock.unlock();
939     }
940     }
941    
942     public int size() {
943     final ReentrantLock lock = this.lock;
944     lock.lock();
945     try {
946 jsr166 1.45 return size;
947 dl 1.40 } finally {
948     lock.unlock();
949     }
950     }
951    
952 jsr166 1.42 public boolean isEmpty() {
953     return size() == 0;
954 dl 1.40 }
955    
956     public int remainingCapacity() {
957     return Integer.MAX_VALUE;
958     }
959    
960     public RunnableScheduledFuture peek() {
961     final ReentrantLock lock = this.lock;
962     lock.lock();
963     try {
964     return queue[0];
965     } finally {
966     lock.unlock();
967     }
968 dl 1.13 }
969    
970 dl 1.40 public boolean offer(Runnable x) {
971     if (x == null)
972     throw new NullPointerException();
973     RunnableScheduledFuture e = (RunnableScheduledFuture)x;
974     final ReentrantLock lock = this.lock;
975     lock.lock();
976     try {
977     int i = size;
978     if (i >= queue.length)
979     grow();
980     size = i + 1;
981     if (i == 0) {
982     queue[0] = e;
983     setIndex(e, 0);
984 jsr166 1.45 } else {
985 dl 1.40 siftUp(i, e);
986     }
987 jsr166 1.46 if (queue[0] == e) {
988 jsr166 1.48 leader = null;
989 jsr166 1.46 available.signal();
990 jsr166 1.48 }
991 dl 1.40 } finally {
992     lock.unlock();
993     }
994     return true;
995 jsr166 1.48 }
996 dl 1.40
997     public void put(Runnable e) {
998     offer(e);
999     }
1000    
1001     public boolean add(Runnable e) {
1002 jsr166 1.48 return offer(e);
1003     }
1004 dl 1.40
1005     public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1006     return offer(e);
1007     }
1008 jsr166 1.42
1009 jsr166 1.46 /**
1010     * Performs common bookkeeping for poll and take: Replaces
1011 jsr166 1.47 * first element with last and sifts it down. Call only when
1012     * holding lock.
1013 jsr166 1.46 * @param f the task to remove and return
1014     */
1015     private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
1016     int s = --size;
1017     RunnableScheduledFuture x = queue[s];
1018     queue[s] = null;
1019     if (s != 0)
1020     siftDown(0, x);
1021     setIndex(f, -1);
1022     return f;
1023     }
1024    
1025 dl 1.40 public RunnableScheduledFuture poll() {
1026     final ReentrantLock lock = this.lock;
1027     lock.lock();
1028     try {
1029     RunnableScheduledFuture first = queue[0];
1030     if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
1031     return null;
1032 jsr166 1.42 else
1033 dl 1.40 return finishPoll(first);
1034     } finally {
1035     lock.unlock();
1036     }
1037     }
1038    
1039     public RunnableScheduledFuture take() throws InterruptedException {
1040     final ReentrantLock lock = this.lock;
1041     lock.lockInterruptibly();
1042     try {
1043     for (;;) {
1044     RunnableScheduledFuture first = queue[0];
1045 jsr166 1.42 if (first == null)
1046 dl 1.40 available.await();
1047     else {
1048 jsr166 1.48 long delay = first.getDelay(TimeUnit.NANOSECONDS);
1049     if (delay <= 0)
1050     return finishPoll(first);
1051     else if (leader != null)
1052     available.await();
1053     else {
1054     Thread thisThread = Thread.currentThread();
1055     leader = thisThread;
1056     try {
1057     available.awaitNanos(delay);
1058     } finally {
1059     if (leader == thisThread)
1060     leader = null;
1061     }
1062     }
1063 dl 1.40 }
1064     }
1065     } finally {
1066 jsr166 1.48 if (leader == null && queue[0] != null)
1067     available.signal();
1068 dl 1.40 lock.unlock();
1069     }
1070     }
1071    
1072 jsr166 1.42 public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
1073 dl 1.40 throws InterruptedException {
1074     long nanos = unit.toNanos(timeout);
1075     final ReentrantLock lock = this.lock;
1076     lock.lockInterruptibly();
1077     try {
1078     for (;;) {
1079     RunnableScheduledFuture first = queue[0];
1080     if (first == null) {
1081     if (nanos <= 0)
1082     return null;
1083     else
1084     nanos = available.awaitNanos(nanos);
1085     } else {
1086     long delay = first.getDelay(TimeUnit.NANOSECONDS);
1087 jsr166 1.48 if (delay <= 0)
1088 dl 1.40 return finishPoll(first);
1089 jsr166 1.48 if (nanos <= 0)
1090     return null;
1091     if (nanos < delay || leader != null)
1092     nanos = available.awaitNanos(nanos);
1093     else {
1094     Thread thisThread = Thread.currentThread();
1095     leader = thisThread;
1096     try {
1097     long timeLeft = available.awaitNanos(delay);
1098     nanos -= delay - timeLeft;
1099     } finally {
1100     if (leader == thisThread)
1101     leader = null;
1102     }
1103     }
1104     }
1105     }
1106 dl 1.40 } finally {
1107 jsr166 1.48 if (leader == null && queue[0] != null)
1108     available.signal();
1109 dl 1.40 lock.unlock();
1110     }
1111     }
1112    
1113     public void clear() {
1114     final ReentrantLock lock = this.lock;
1115     lock.lock();
1116     try {
1117     for (int i = 0; i < size; i++) {
1118     RunnableScheduledFuture t = queue[i];
1119     if (t != null) {
1120     queue[i] = null;
1121     setIndex(t, -1);
1122     }
1123     }
1124     size = 0;
1125     } finally {
1126     lock.unlock();
1127     }
1128 dl 1.13 }
1129 dl 1.40
1130     /**
1131     * Return and remove first element only if it is expired.
1132     * Used only by drainTo. Call only when holding lock.
1133     */
1134     private RunnableScheduledFuture pollExpired() {
1135     RunnableScheduledFuture first = queue[0];
1136 jsr166 1.42 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
1137 dl 1.40 return null;
1138 jsr166 1.48 return finishPoll(first);
1139 dl 1.40 }
1140    
1141     public int drainTo(Collection<? super Runnable> c) {
1142     if (c == null)
1143     throw new NullPointerException();
1144     if (c == this)
1145     throw new IllegalArgumentException();
1146     final ReentrantLock lock = this.lock;
1147     lock.lock();
1148     try {
1149 jsr166 1.48 RunnableScheduledFuture first;
1150 dl 1.40 int n = 0;
1151 jsr166 1.45 while ((first = pollExpired()) != null) {
1152 jsr166 1.48 c.add(first);
1153     ++n;
1154     }
1155 dl 1.40 return n;
1156     } finally {
1157     lock.unlock();
1158     }
1159 dl 1.13 }
1160    
1161 jsr166 1.21 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1162 dl 1.40 if (c == null)
1163     throw new NullPointerException();
1164     if (c == this)
1165     throw new IllegalArgumentException();
1166     if (maxElements <= 0)
1167     return 0;
1168     final ReentrantLock lock = this.lock;
1169     lock.lock();
1170     try {
1171 jsr166 1.48 RunnableScheduledFuture first;
1172 dl 1.40 int n = 0;
1173 jsr166 1.45 while (n < maxElements && (first = pollExpired()) != null) {
1174 jsr166 1.48 c.add(first);
1175     ++n;
1176     }
1177 dl 1.40 return n;
1178     } finally {
1179     lock.unlock();
1180     }
1181     }
1182    
1183     public Object[] toArray() {
1184     final ReentrantLock lock = this.lock;
1185     lock.lock();
1186     try {
1187 jsr166 1.45 return Arrays.copyOf(queue, size, Object[].class);
1188 dl 1.40 } finally {
1189     lock.unlock();
1190     }
1191     }
1192    
1193 jsr166 1.48 @SuppressWarnings("unchecked")
1194 dl 1.40 public <T> T[] toArray(T[] a) {
1195     final ReentrantLock lock = this.lock;
1196     lock.lock();
1197     try {
1198     if (a.length < size)
1199     return (T[]) Arrays.copyOf(queue, size, a.getClass());
1200     System.arraycopy(queue, 0, a, 0, size);
1201     if (a.length > size)
1202     a[size] = null;
1203     return a;
1204     } finally {
1205     lock.unlock();
1206     }
1207 dl 1.13 }
1208    
1209 jsr166 1.21 public Iterator<Runnable> iterator() {
1210 jsr166 1.45 return new Itr(Arrays.copyOf(queue, size));
1211 dl 1.40 }
1212 jsr166 1.42
1213 dl 1.40 /**
1214     * Snapshot iterator that works off copy of underlying q array.
1215     */
1216     private class Itr implements Iterator<Runnable> {
1217 jsr166 1.45 final RunnableScheduledFuture[] array;
1218 jsr166 1.48 int cursor = 0; // index of next element to return
1219     int lastRet = -1; // index of last element, or -1 if no such
1220 jsr166 1.42
1221 jsr166 1.45 Itr(RunnableScheduledFuture[] array) {
1222 dl 1.40 this.array = array;
1223     }
1224 jsr166 1.42
1225 dl 1.40 public boolean hasNext() {
1226     return cursor < array.length;
1227     }
1228 jsr166 1.42
1229 dl 1.40 public Runnable next() {
1230     if (cursor >= array.length)
1231     throw new NoSuchElementException();
1232     lastRet = cursor;
1233 jsr166 1.45 return array[cursor++];
1234 dl 1.40 }
1235 jsr166 1.42
1236 dl 1.40 public void remove() {
1237     if (lastRet < 0)
1238     throw new IllegalStateException();
1239     DelayedWorkQueue.this.remove(array[lastRet]);
1240     lastRet = -1;
1241     }
1242 dl 1.13 }
1243     }
1244 dl 1.1 }