ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.51
Committed: Tue Sep 16 10:45:05 2008 UTC (15 years, 8 months ago) by dl
Branch: MAIN
Changes since 1.50: +14 -6 lines
Log Message:
Clarify happens-before

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.11 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.1 */
6    
7     package java.util.concurrent;
8     import java.util.concurrent.atomic.*;
9 dl 1.40 import java.util.concurrent.locks.*;
10 dl 1.1 import java.util.*;
11    
12     /**
13 dl 1.7 * A {@link ThreadPoolExecutor} that can additionally schedule
14     * commands to run after a given delay, or to execute
15     * periodically. This class is preferable to {@link java.util.Timer}
16     * when multiple worker threads are needed, or when the additional
17     * flexibility or capabilities of {@link ThreadPoolExecutor} (which
18     * this class extends) are required.
19 dl 1.1 *
20 jsr166 1.46 * <p>Delayed tasks execute no sooner than they are enabled, but
21 dl 1.18 * without any real-time guarantees about when, after they are
22     * enabled, they will commence. Tasks scheduled for exactly the same
23     * execution time are enabled in first-in-first-out (FIFO) order of
24 jsr166 1.46 * submission.
25     *
26     * <p>When a submitted task is cancelled before it is run, execution
27     * is suppressed. By default, such a cancelled task is not
28     * automatically removed from the work queue until its delay
29     * elapses. While this enables further inspection and monitoring, it
30     * may also cause unbounded retention of cancelled tasks. To avoid
31     * this, set {@link #setRemoveOnCancelPolicy} to {@code true}, which
32     * causes tasks to be immediately removed from the work queue at
33     * time of cancellation.
34 dl 1.1 *
35 dl 1.51 * <p>Successive executions of a task scheduled via
36     * <code>scheduleAtFixedRate</code> or
37     * <code>scheduleWithFixedDelay</code> do not overlap. While different
38     * executions may be performed by different threads, the effects of
39     * prior executions <a
40     * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
41     * those of subsequent ones.
42     *
43 dl 1.1 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
44 dl 1.8 * of the inherited tuning methods are not useful for it. In
45 dl 1.51 * particular, because it acts as a fixed-sized pool using {@code
46     * corePoolSize} threads and an unbounded queue, adjustments to {@code
47     * maximumPoolSize} have no useful effect. Additionally, it is almost
48     * never a good idea to set {@code corePoolSize} to zero or use {@code
49     * allowCoreThreadTimeOut} because this may leave the pool without
50     * threads to handle tasks once they become eligible to run.
51 dl 1.1 *
52 jsr166 1.39 * <p><b>Extension notes:</b> This class overrides the
53     * {@link ThreadPoolExecutor#execute execute} and
54     * {@link AbstractExecutorService#submit(Runnable) submit}
55     * methods to generate internal {@link ScheduledFuture} objects to
56     * control per-task delays and scheduling. To preserve
57     * functionality, any further overrides of these methods in
58 dl 1.32 * subclasses must invoke superclass versions, which effectively
59 jsr166 1.39 * disables additional task customization. However, this class
60 dl 1.32 * provides alternative protected extension method
61 jsr166 1.39 * {@code decorateTask} (one version each for {@code Runnable} and
62     * {@code Callable}) that can be used to customize the concrete task
63     * types used to execute commands entered via {@code execute},
64     * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
65     * and {@code scheduleWithFixedDelay}. By default, a
66     * {@code ScheduledThreadPoolExecutor} uses a task type extending
67 dl 1.32 * {@link FutureTask}. However, this may be modified or replaced using
68     * subclasses of the form:
69     *
70 jsr166 1.39 * <pre> {@code
71 dl 1.23 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
72     *
73 jsr166 1.39 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
74 dl 1.23 *
75 jsr166 1.39 * protected <V> RunnableScheduledFuture<V> decorateTask(
76     * Runnable r, RunnableScheduledFuture<V> task) {
77     * return new CustomTask<V>(r, task);
78 jsr166 1.29 * }
79 dl 1.23 *
80 jsr166 1.39 * protected <V> RunnableScheduledFuture<V> decorateTask(
81     * Callable<V> c, RunnableScheduledFuture<V> task) {
82     * return new CustomTask<V>(c, task);
83 jsr166 1.29 * }
84     * // ... add constructors, etc.
85 jsr166 1.39 * }}</pre>
86     *
87 dl 1.1 * @since 1.5
88     * @author Doug Lea
89     */
90 jsr166 1.21 public class ScheduledThreadPoolExecutor
91     extends ThreadPoolExecutor
92 tim 1.3 implements ScheduledExecutorService {
93 dl 1.1
94 dl 1.37 /*
95     * This class specializes ThreadPoolExecutor implementation by
96     *
97     * 1. Using a custom task type, ScheduledFutureTask for
98     * tasks, even those that don't require scheduling (i.e.,
99     * those submitted using ExecutorService execute, not
100     * ScheduledExecutorService methods) which are treated as
101     * delayed tasks with a delay of zero.
102     *
103 jsr166 1.46 * 2. Using a custom queue (DelayedWorkQueue), a variant of
104 dl 1.37 * unbounded DelayQueue. The lack of capacity constraint and
105     * the fact that corePoolSize and maximumPoolSize are
106     * effectively identical simplifies some execution mechanics
107 jsr166 1.46 * (see delayedExecute) compared to ThreadPoolExecutor.
108 dl 1.37 *
109     * 3. Supporting optional run-after-shutdown parameters, which
110     * leads to overrides of shutdown methods to remove and cancel
111     * tasks that should NOT be run after shutdown, as well as
112     * different recheck logic when task (re)submission overlaps
113     * with a shutdown.
114     *
115     * 4. Task decoration methods to allow interception and
116     * instrumentation, which are needed because subclasses cannot
117     * otherwise override submit methods to get this effect. These
118     * don't have any impact on pool control logic though.
119     */
120    
121 dl 1.1 /**
122     * False if should cancel/suppress periodic tasks on shutdown.
123     */
124     private volatile boolean continueExistingPeriodicTasksAfterShutdown;
125    
126     /**
127     * False if should cancel non-periodic tasks on shutdown.
128     */
129     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
130    
131     /**
132 dl 1.41 * True if ScheduledFutureTask.cancel should remove from queue
133     */
134     private volatile boolean removeOnCancel = false;
135    
136     /**
137 dl 1.1 * Sequence number to break scheduling ties, and in turn to
138     * guarantee FIFO order among tied entries.
139     */
140     private static final AtomicLong sequencer = new AtomicLong(0);
141 dl 1.14
142 dl 1.50
143     /**
144     * Value of System.nanoTime upon static initialization. This is
145     * used as an offset by now() to avoid wraparound of time values
146     * that would make them appear negative.
147     */
148     static final long initialNanoTime = System.nanoTime();
149    
150 dl 1.14 /**
151 jsr166 1.39 * Returns current nanosecond time.
152 dl 1.14 */
153 dl 1.50 static long now() {
154     return System.nanoTime() - initialNanoTime;
155 dl 1.14 }
156    
157 jsr166 1.21 private class ScheduledFutureTask<V>
158 peierls 1.22 extends FutureTask<V> implements RunnableScheduledFuture<V> {
159 jsr166 1.21
160 dl 1.1 /** Sequence number to break ties FIFO */
161     private final long sequenceNumber;
162 jsr166 1.44
163 dl 1.1 /** The time the task is enabled to execute in nanoTime units */
164     private long time;
165 jsr166 1.44
166 dl 1.16 /**
167     * Period in nanoseconds for repeating tasks. A positive
168     * value indicates fixed-rate execution. A negative value
169     * indicates fixed-delay execution. A value of 0 indicates a
170     * non-repeating task.
171     */
172 dl 1.1 private final long period;
173    
174 jsr166 1.48 /** The actual task to be re-enqueued by reExecutePeriodic */
175     RunnableScheduledFuture<V> outerTask = this;
176 jsr166 1.44
177 dl 1.1 /**
178 dl 1.40 * Index into delay queue, to support faster cancellation.
179     */
180     int heapIndex;
181    
182     /**
183 jsr166 1.30 * Creates a one-shot action with given nanoTime-based trigger time.
184 dl 1.1 */
185     ScheduledFutureTask(Runnable r, V result, long ns) {
186     super(r, result);
187     this.time = ns;
188     this.period = 0;
189     this.sequenceNumber = sequencer.getAndIncrement();
190     }
191    
192     /**
193 jsr166 1.30 * Creates a periodic action with given nano time and period.
194 dl 1.1 */
195 jsr166 1.30 ScheduledFutureTask(Runnable r, V result, long ns, long period) {
196 dl 1.1 super(r, result);
197     this.time = ns;
198     this.period = period;
199     this.sequenceNumber = sequencer.getAndIncrement();
200     }
201    
202     /**
203 jsr166 1.30 * Creates a one-shot action with given nanoTime-based trigger.
204 dl 1.1 */
205     ScheduledFutureTask(Callable<V> callable, long ns) {
206     super(callable);
207     this.time = ns;
208     this.period = 0;
209     this.sequenceNumber = sequencer.getAndIncrement();
210     }
211    
212     public long getDelay(TimeUnit unit) {
213 dl 1.50 long d = time - now();
214     return d<=0? 0 : unit.convert(d, TimeUnit.NANOSECONDS);
215 dl 1.1 }
216    
217 dl 1.20 public int compareTo(Delayed other) {
218 dl 1.1 if (other == this) // compare zero ONLY if same object
219     return 0;
220 dl 1.34 if (other instanceof ScheduledFutureTask) {
221     ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
222     long diff = time - x.time;
223     if (diff < 0)
224     return -1;
225     else if (diff > 0)
226     return 1;
227     else if (sequenceNumber < x.sequenceNumber)
228     return -1;
229     else
230     return 1;
231     }
232     long d = (getDelay(TimeUnit.NANOSECONDS) -
233     other.getDelay(TimeUnit.NANOSECONDS));
234 jsr166 1.38 return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
235 dl 1.1 }
236    
237     /**
238 dl 1.18 * Returns true if this is a periodic (not a one-shot) action.
239 jsr166 1.30 *
240 dl 1.1 * @return true if periodic
241     */
242 dl 1.23 public boolean isPeriodic() {
243 dl 1.16 return period != 0;
244 dl 1.1 }
245    
246     /**
247 jsr166 1.39 * Sets the next time to run for a periodic task.
248 dl 1.13 */
249 dl 1.37 private void setNextRunTime() {
250     long p = period;
251     if (p > 0)
252     time += p;
253     else
254     time = now() - p;
255 dl 1.13 }
256    
257 dl 1.40 public boolean cancel(boolean mayInterruptIfRunning) {
258 dl 1.41 boolean cancelled = super.cancel(mayInterruptIfRunning);
259     if (cancelled && removeOnCancel && heapIndex >= 0)
260 jsr166 1.42 remove(this);
261 dl 1.41 return cancelled;
262 dl 1.40 }
263    
264 dl 1.13 /**
265 dl 1.5 * Overrides FutureTask version so as to reset/requeue if periodic.
266 jsr166 1.21 */
267 dl 1.1 public void run() {
268 dl 1.37 boolean periodic = isPeriodic();
269     if (!canRunInCurrentRunState(periodic))
270     cancel(false);
271     else if (!periodic)
272 dl 1.5 ScheduledFutureTask.super.run();
273 dl 1.37 else if (ScheduledFutureTask.super.runAndReset()) {
274     setNextRunTime();
275 jsr166 1.44 reExecutePeriodic(outerTask);
276 dl 1.37 }
277 dl 1.1 }
278     }
279    
280     /**
281 dl 1.37 * Returns true if can run a task given current run state
282 jsr166 1.39 * and run-after-shutdown parameters.
283     *
284 dl 1.37 * @param periodic true if this task periodic, false if delayed
285     */
286     boolean canRunInCurrentRunState(boolean periodic) {
287 jsr166 1.38 return isRunningOrShutdown(periodic ?
288 dl 1.37 continueExistingPeriodicTasksAfterShutdown :
289     executeExistingDelayedTasksAfterShutdown);
290     }
291    
292     /**
293     * Main execution method for delayed or periodic tasks. If pool
294     * is shut down, rejects the task. Otherwise adds task to queue
295     * and starts a thread, if necessary, to run it. (We cannot
296     * prestart the thread to run the task because the task (probably)
297     * shouldn't be run yet,) If the pool is shut down while the task
298     * is being added, cancel and remove it if required by state and
299 jsr166 1.39 * run-after-shutdown parameters.
300     *
301 dl 1.37 * @param task the task
302     */
303     private void delayedExecute(RunnableScheduledFuture<?> task) {
304     if (isShutdown())
305     reject(task);
306     else {
307     super.getQueue().add(task);
308     if (isShutdown() &&
309     !canRunInCurrentRunState(task.isPeriodic()) &&
310     remove(task))
311     task.cancel(false);
312 jsr166 1.48 else
313     prestartCoreThread();
314 dl 1.37 }
315     }
316 jsr166 1.21
317 dl 1.37 /**
318 jsr166 1.39 * Requeues a periodic task unless current run state precludes it.
319     * Same idea as delayedExecute except drops task rather than rejecting.
320     *
321 dl 1.37 * @param task the task
322     */
323     void reExecutePeriodic(RunnableScheduledFuture<?> task) {
324     if (canRunInCurrentRunState(true)) {
325     super.getQueue().add(task);
326     if (!canRunInCurrentRunState(true) && remove(task))
327     task.cancel(false);
328 jsr166 1.48 else
329     prestartCoreThread();
330 dl 1.37 }
331 dl 1.13 }
332 dl 1.1
333 dl 1.13 /**
334 jsr166 1.21 * Cancels and clears the queue of all tasks that should not be run
335 jsr166 1.39 * due to shutdown policy. Invoked within super.shutdown.
336 dl 1.13 */
337 dl 1.37 @Override void onShutdown() {
338     BlockingQueue<Runnable> q = super.getQueue();
339     boolean keepDelayed =
340     getExecuteExistingDelayedTasksAfterShutdownPolicy();
341     boolean keepPeriodic =
342     getContinueExistingPeriodicTasksAfterShutdownPolicy();
343 jsr166 1.21 if (!keepDelayed && !keepPeriodic)
344 dl 1.37 q.clear();
345     else {
346     // Traverse snapshot to avoid iterator exceptions
347 jsr166 1.39 for (Object e : q.toArray()) {
348 dl 1.23 if (e instanceof RunnableScheduledFuture) {
349 dl 1.37 RunnableScheduledFuture<?> t =
350     (RunnableScheduledFuture<?>)e;
351 dl 1.41 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
352     t.isCancelled()) { // also remove if already cancelled
353     if (q.remove(t))
354     t.cancel(false);
355     }
356 dl 1.13 }
357     }
358 dl 1.1 }
359 jsr166 1.48 tryTerminate();
360 dl 1.1 }
361    
362 dl 1.23 /**
363 jsr166 1.30 * Modifies or replaces the task used to execute a runnable.
364 jsr166 1.28 * This method can be used to override the concrete
365 dl 1.23 * class used for managing internal tasks.
366 jsr166 1.30 * The default implementation simply returns the given task.
367 jsr166 1.28 *
368 dl 1.23 * @param runnable the submitted Runnable
369     * @param task the task created to execute the runnable
370     * @return a task that can execute the runnable
371     * @since 1.6
372     */
373 peierls 1.22 protected <V> RunnableScheduledFuture<V> decorateTask(
374 dl 1.23 Runnable runnable, RunnableScheduledFuture<V> task) {
375     return task;
376 peierls 1.22 }
377    
378 dl 1.23 /**
379 jsr166 1.30 * Modifies or replaces the task used to execute a callable.
380 jsr166 1.28 * This method can be used to override the concrete
381 dl 1.23 * class used for managing internal tasks.
382 jsr166 1.30 * The default implementation simply returns the given task.
383 jsr166 1.28 *
384 dl 1.23 * @param callable the submitted Callable
385     * @param task the task created to execute the callable
386     * @return a task that can execute the callable
387     * @since 1.6
388     */
389 peierls 1.22 protected <V> RunnableScheduledFuture<V> decorateTask(
390 dl 1.23 Callable<V> callable, RunnableScheduledFuture<V> task) {
391     return task;
392 dl 1.19 }
393    
394 dl 1.1 /**
395 jsr166 1.39 * Creates a new {@code ScheduledThreadPoolExecutor} with the
396     * given core pool size.
397 jsr166 1.21 *
398 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
399     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
400     * @throws IllegalArgumentException if {@code corePoolSize < 0}
401 dl 1.1 */
402     public ScheduledThreadPoolExecutor(int corePoolSize) {
403     super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
404     new DelayedWorkQueue());
405     }
406    
407     /**
408 jsr166 1.39 * Creates a new {@code ScheduledThreadPoolExecutor} with the
409     * given initial parameters.
410 jsr166 1.21 *
411 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
412     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
413 dl 1.1 * @param threadFactory the factory to use when the executor
414 jsr166 1.39 * creates a new thread
415     * @throws IllegalArgumentException if {@code corePoolSize < 0}
416     * @throws NullPointerException if {@code threadFactory} is null
417 dl 1.1 */
418     public ScheduledThreadPoolExecutor(int corePoolSize,
419     ThreadFactory threadFactory) {
420     super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
421     new DelayedWorkQueue(), threadFactory);
422     }
423    
424     /**
425 dl 1.13 * Creates a new ScheduledThreadPoolExecutor with the given
426     * initial parameters.
427 jsr166 1.21 *
428 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
429     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
430 dl 1.1 * @param handler the handler to use when execution is blocked
431 jsr166 1.39 * because the thread bounds and queue capacities are reached
432     * @throws IllegalArgumentException if {@code corePoolSize < 0}
433     * @throws NullPointerException if {@code handler} is null
434 dl 1.1 */
435     public ScheduledThreadPoolExecutor(int corePoolSize,
436     RejectedExecutionHandler handler) {
437     super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
438     new DelayedWorkQueue(), handler);
439     }
440    
441     /**
442 dl 1.13 * Creates a new ScheduledThreadPoolExecutor with the given
443     * initial parameters.
444 jsr166 1.21 *
445 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
446     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
447 dl 1.1 * @param threadFactory the factory to use when the executor
448 jsr166 1.39 * creates a new thread
449 dl 1.1 * @param handler the handler to use when execution is blocked
450 jsr166 1.39 * because the thread bounds and queue capacities are reached
451     * @throws IllegalArgumentException if {@code corePoolSize < 0}
452     * @throws NullPointerException if {@code threadFactory} or
453     * {@code handler} is null
454 dl 1.1 */
455     public ScheduledThreadPoolExecutor(int corePoolSize,
456     ThreadFactory threadFactory,
457     RejectedExecutionHandler handler) {
458     super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
459     new DelayedWorkQueue(), threadFactory, handler);
460     }
461    
462 dl 1.37 /**
463 dl 1.50 * Returns the trigger time of a delayed action
464     */
465     private static long nextTriggerTime(long delay, TimeUnit unit) {
466     long triggerTime;
467     long now = now();
468     if (delay <= 0)
469     return now; // avoid negative trigger times
470     else if ((triggerTime = now + unit.toNanos(delay)) < 0)
471     return Long.MAX_VALUE; // avoid numerical overflow
472     else
473     return triggerTime;
474     }
475    
476     /**
477 dl 1.37 * @throws RejectedExecutionException {@inheritDoc}
478     * @throws NullPointerException {@inheritDoc}
479     */
480 jsr166 1.21 public ScheduledFuture<?> schedule(Runnable command,
481     long delay,
482 dl 1.13 TimeUnit unit) {
483 dl 1.9 if (command == null || unit == null)
484 dl 1.1 throw new NullPointerException();
485 dl 1.50 long triggerTime = nextTriggerTime(delay, unit);
486 peierls 1.22 RunnableScheduledFuture<?> t = decorateTask(command,
487 jsr166 1.42 new ScheduledFutureTask<Void>(command, null, triggerTime));
488 dl 1.1 delayedExecute(t);
489     return t;
490     }
491 dl 1.50
492 dl 1.37 /**
493     * @throws RejectedExecutionException {@inheritDoc}
494     * @throws NullPointerException {@inheritDoc}
495     */
496 jsr166 1.21 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
497     long delay,
498 dl 1.13 TimeUnit unit) {
499 dl 1.9 if (callable == null || unit == null)
500 dl 1.1 throw new NullPointerException();
501 dl 1.50 long triggerTime = nextTriggerTime(delay, unit);
502 peierls 1.22 RunnableScheduledFuture<V> t = decorateTask(callable,
503     new ScheduledFutureTask<V>(callable, triggerTime));
504 dl 1.1 delayedExecute(t);
505     return t;
506     }
507    
508 dl 1.37 /**
509     * @throws RejectedExecutionException {@inheritDoc}
510     * @throws NullPointerException {@inheritDoc}
511     * @throws IllegalArgumentException {@inheritDoc}
512     */
513 jsr166 1.21 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
514     long initialDelay,
515     long period,
516 dl 1.13 TimeUnit unit) {
517 dl 1.9 if (command == null || unit == null)
518 dl 1.1 throw new NullPointerException();
519     if (period <= 0)
520     throw new IllegalArgumentException();
521 dl 1.16 if (initialDelay < 0) initialDelay = 0;
522 dl 1.50 long triggerTime = nextTriggerTime(initialDelay, unit);
523 jsr166 1.48 ScheduledFutureTask<Void> sft =
524     new ScheduledFutureTask<Void>(command,
525     null,
526     triggerTime,
527     unit.toNanos(period));
528 jsr166 1.44 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
529 jsr166 1.48 sft.outerTask = t;
530 dl 1.1 delayedExecute(t);
531     return t;
532     }
533 jsr166 1.21
534 dl 1.37 /**
535     * @throws RejectedExecutionException {@inheritDoc}
536     * @throws NullPointerException {@inheritDoc}
537     * @throws IllegalArgumentException {@inheritDoc}
538     */
539 jsr166 1.21 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
540     long initialDelay,
541     long delay,
542 dl 1.13 TimeUnit unit) {
543 dl 1.9 if (command == null || unit == null)
544 dl 1.1 throw new NullPointerException();
545     if (delay <= 0)
546     throw new IllegalArgumentException();
547 dl 1.50 long triggerTime = nextTriggerTime(initialDelay, unit);
548 jsr166 1.48 ScheduledFutureTask<Void> sft =
549     new ScheduledFutureTask<Void>(command,
550     null,
551     triggerTime,
552     unit.toNanos(-delay));
553 jsr166 1.44 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
554 jsr166 1.48 sft.outerTask = t;
555 dl 1.1 delayedExecute(t);
556     return t;
557     }
558 jsr166 1.21
559 dl 1.1 /**
560 jsr166 1.39 * Executes {@code command} with zero required delay.
561     * This has effect equivalent to
562     * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
563     * Note that inspections of the queue and of the list returned by
564     * {@code shutdownNow} will access the zero-delayed
565     * {@link ScheduledFuture}, not the {@code command} itself.
566     *
567     * <p>A consequence of the use of {@code ScheduledFuture} objects is
568     * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
569     * called with a null second {@code Throwable} argument, even if the
570     * {@code command} terminated abruptly. Instead, the {@code Throwable}
571     * thrown by such a task can be obtained via {@link Future#get}.
572 dl 1.1 *
573     * @throws RejectedExecutionException at discretion of
574 jsr166 1.39 * {@code RejectedExecutionHandler}, if the task
575     * cannot be accepted for execution because the
576     * executor has been shut down
577     * @throws NullPointerException {@inheritDoc}
578 dl 1.1 */
579     public void execute(Runnable command) {
580     schedule(command, 0, TimeUnit.NANOSECONDS);
581     }
582    
583 dl 1.13 // Override AbstractExecutorService methods
584    
585 dl 1.37 /**
586     * @throws RejectedExecutionException {@inheritDoc}
587     * @throws NullPointerException {@inheritDoc}
588     */
589 dl 1.7 public Future<?> submit(Runnable task) {
590     return schedule(task, 0, TimeUnit.NANOSECONDS);
591     }
592    
593 dl 1.37 /**
594     * @throws RejectedExecutionException {@inheritDoc}
595     * @throws NullPointerException {@inheritDoc}
596     */
597 dl 1.7 public <T> Future<T> submit(Runnable task, T result) {
598 jsr166 1.21 return schedule(Executors.callable(task, result),
599 dl 1.13 0, TimeUnit.NANOSECONDS);
600 dl 1.7 }
601    
602 dl 1.37 /**
603     * @throws RejectedExecutionException {@inheritDoc}
604     * @throws NullPointerException {@inheritDoc}
605     */
606 dl 1.7 public <T> Future<T> submit(Callable<T> task) {
607     return schedule(task, 0, TimeUnit.NANOSECONDS);
608     }
609 dl 1.1
610     /**
611 dl 1.37 * Sets the policy on whether to continue executing existing
612 jsr166 1.39 * periodic tasks even when this executor has been {@code shutdown}.
613     * In this case, these tasks will only terminate upon
614     * {@code shutdownNow} or after setting the policy to
615     * {@code false} when already shutdown.
616     * This value is by default {@code false}.
617 jsr166 1.30 *
618 jsr166 1.39 * @param value if {@code true}, continue after shutdown, else don't.
619 jsr166 1.25 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
620 dl 1.1 */
621     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
622     continueExistingPeriodicTasksAfterShutdown = value;
623 jsr166 1.39 if (!value && isShutdown())
624 dl 1.37 onShutdown();
625 dl 1.1 }
626    
627     /**
628 jsr166 1.21 * Gets the policy on whether to continue executing existing
629 jsr166 1.39 * periodic tasks even when this executor has been {@code shutdown}.
630     * In this case, these tasks will only terminate upon
631     * {@code shutdownNow} or after setting the policy to
632     * {@code false} when already shutdown.
633     * This value is by default {@code false}.
634 jsr166 1.30 *
635 jsr166 1.39 * @return {@code true} if will continue after shutdown
636 dl 1.16 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
637 dl 1.1 */
638     public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
639     return continueExistingPeriodicTasksAfterShutdown;
640     }
641    
642     /**
643 jsr166 1.21 * Sets the policy on whether to execute existing delayed
644 jsr166 1.39 * tasks even when this executor has been {@code shutdown}.
645     * In this case, these tasks will only terminate upon
646     * {@code shutdownNow}, or after setting the policy to
647     * {@code false} when already shutdown.
648     * This value is by default {@code true}.
649 jsr166 1.30 *
650 jsr166 1.39 * @param value if {@code true}, execute after shutdown, else don't.
651 dl 1.16 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
652 dl 1.1 */
653     public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
654     executeExistingDelayedTasksAfterShutdown = value;
655 jsr166 1.39 if (!value && isShutdown())
656 dl 1.37 onShutdown();
657 dl 1.1 }
658    
659     /**
660 jsr166 1.21 * Gets the policy on whether to execute existing delayed
661 jsr166 1.39 * tasks even when this executor has been {@code shutdown}.
662     * In this case, these tasks will only terminate upon
663     * {@code shutdownNow}, or after setting the policy to
664     * {@code false} when already shutdown.
665     * This value is by default {@code true}.
666 jsr166 1.30 *
667 jsr166 1.39 * @return {@code true} if will execute after shutdown
668 dl 1.16 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
669 dl 1.1 */
670     public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
671     return executeExistingDelayedTasksAfterShutdown;
672     }
673    
674     /**
675 jsr166 1.46 * Sets the policy on whether cancelled tasks should be immediately
676     * removed from the work queue at time of cancellation. This value is
677     * by default {@code false}.
678 dl 1.41 *
679 jsr166 1.42 * @param value if {@code true}, remove on cancellation, else don't
680 dl 1.41 * @see #getRemoveOnCancelPolicy
681 jsr166 1.43 * @since 1.7
682 dl 1.41 */
683     public void setRemoveOnCancelPolicy(boolean value) {
684     removeOnCancel = value;
685     }
686    
687     /**
688 jsr166 1.46 * Gets the policy on whether cancelled tasks should be immediately
689     * removed from the work queue at time of cancellation. This value is
690     * by default {@code false}.
691 dl 1.41 *
692 jsr166 1.46 * @return {@code true} if cancelled tasks are immediately removed
693     * from the queue
694 dl 1.41 * @see #setRemoveOnCancelPolicy
695 jsr166 1.43 * @since 1.7
696 dl 1.41 */
697     public boolean getRemoveOnCancelPolicy() {
698     return removeOnCancel;
699     }
700    
701     /**
702 dl 1.1 * Initiates an orderly shutdown in which previously submitted
703 jsr166 1.49 * tasks are executed, but no new tasks will be accepted.
704     * Invocation has no additional effect if already shut down.
705     *
706     * <p>This method does not wait for previously submitted tasks to
707     * complete execution. Use {@link #awaitTermination awaitTermination}
708     * to do that.
709     *
710     * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
711     * has been set {@code false}, existing delayed tasks whose delays
712     * have not yet elapsed are cancelled. And unless the {@code
713     * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
714     * {@code true}, future executions of existing periodic tasks will
715     * be cancelled.
716 jsr166 1.39 *
717     * @throws SecurityException {@inheritDoc}
718 dl 1.1 */
719     public void shutdown() {
720     super.shutdown();
721     }
722    
723     /**
724     * Attempts to stop all actively executing tasks, halts the
725 jsr166 1.30 * processing of waiting tasks, and returns a list of the tasks
726     * that were awaiting execution.
727 jsr166 1.21 *
728 jsr166 1.49 * <p>This method does not wait for actively executing tasks to
729     * terminate. Use {@link #awaitTermination awaitTermination} to
730     * do that.
731     *
732 dl 1.1 * <p>There are no guarantees beyond best-effort attempts to stop
733 dl 1.18 * processing actively executing tasks. This implementation
734 jsr166 1.31 * cancels tasks via {@link Thread#interrupt}, so any task that
735     * fails to respond to interrupts may never terminate.
736 dl 1.1 *
737 jsr166 1.39 * @return list of tasks that never commenced execution.
738     * Each element of this list is a {@link ScheduledFuture},
739     * including those tasks submitted using {@code execute},
740     * which are for scheduling purposes used as the basis of a
741     * zero-delay {@code ScheduledFuture}.
742 jsr166 1.31 * @throws SecurityException {@inheritDoc}
743 dl 1.1 */
744 tim 1.4 public List<Runnable> shutdownNow() {
745 dl 1.1 return super.shutdownNow();
746     }
747    
748     /**
749     * Returns the task queue used by this executor. Each element of
750     * this queue is a {@link ScheduledFuture}, including those
751 jsr166 1.39 * tasks submitted using {@code execute} which are for scheduling
752 dl 1.1 * purposes used as the basis of a zero-delay
753 jsr166 1.39 * {@code ScheduledFuture}. Iteration over this queue is
754 dl 1.15 * <em>not</em> guaranteed to traverse tasks in the order in
755 dl 1.1 * which they will execute.
756     *
757     * @return the task queue
758     */
759     public BlockingQueue<Runnable> getQueue() {
760     return super.getQueue();
761     }
762    
763 dl 1.13 /**
764 dl 1.40 * Specialized delay queue. To mesh with TPE declarations, this
765     * class must be declared as a BlockingQueue<Runnable> even though
766 jsr166 1.42 * it can only hold RunnableScheduledFutures.
767 jsr166 1.21 */
768 dl 1.40 static class DelayedWorkQueue extends AbstractQueue<Runnable>
769 dl 1.13 implements BlockingQueue<Runnable> {
770 jsr166 1.21
771 dl 1.40 /*
772     * A DelayedWorkQueue is based on a heap-based data structure
773     * like those in DelayQueue and PriorityQueue, except that
774     * every ScheduledFutureTask also records its index into the
775     * heap array. This eliminates the need to find a task upon
776     * cancellation, greatly speeding up removal (down from O(n)
777     * to O(log n)), and reducing garbage retention that would
778     * otherwise occur by waiting for the element to rise to top
779     * before clearing. But because the queue may also hold
780     * RunnableScheduledFutures that are not ScheduledFutureTasks,
781     * we are not guaranteed to have such indices available, in
782     * which case we fall back to linear search. (We expect that
783     * most tasks will not be decorated, and that the faster cases
784     * will be much more common.)
785     *
786     * All heap operations must record index changes -- mainly
787     * within siftUp and siftDown. Upon removal, a task's
788     * heapIndex is set to -1. Note that ScheduledFutureTasks can
789     * appear at most once in the queue (this need not be true for
790     * other kinds of tasks or work queues), so are uniquely
791     * identified by heapIndex.
792     */
793    
794 jsr166 1.46 private static final int INITIAL_CAPACITY = 16;
795     private RunnableScheduledFuture[] queue =
796 dl 1.40 new RunnableScheduledFuture[INITIAL_CAPACITY];
797 jsr166 1.46 private final ReentrantLock lock = new ReentrantLock();
798 dl 1.40 private int size = 0;
799    
800 jsr166 1.48 /**
801     * Thread designated to wait for the task at the head of the
802     * queue. This variant of the Leader-Follower pattern
803     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
804     * minimize unnecessary timed waiting. When a thread becomes
805     * the leader, it waits only for the next delay to elapse, but
806     * other threads await indefinitely. The leader thread must
807     * signal some other thread before returning from take() or
808     * poll(...), unless some other thread becomes leader in the
809     * interim. Whenever the head of the queue is replaced with a
810     * task with an earlier expiration time, the leader field is
811     * invalidated by being reset to null, and some waiting
812     * thread, but not necessarily the current leader, is
813     * signalled. So waiting threads must be prepared to acquire
814     * and lose leadership while waiting.
815     */
816     private Thread leader = null;
817    
818     /**
819     * Condition signalled when a newer task becomes available at the
820     * head of the queue or a new thread may need to become leader.
821     */
822     private final Condition available = lock.newCondition();
823 dl 1.40
824     /**
825 jsr166 1.42 * Set f's heapIndex if it is a ScheduledFutureTask.
826 dl 1.40 */
827 jsr166 1.45 private void setIndex(RunnableScheduledFuture f, int idx) {
828 dl 1.40 if (f instanceof ScheduledFutureTask)
829     ((ScheduledFutureTask)f).heapIndex = idx;
830     }
831    
832     /**
833 jsr166 1.42 * Sift element added at bottom up to its heap-ordered spot.
834 dl 1.40 * Call only when holding lock.
835     */
836     private void siftUp(int k, RunnableScheduledFuture key) {
837     while (k > 0) {
838     int parent = (k - 1) >>> 1;
839     RunnableScheduledFuture e = queue[parent];
840     if (key.compareTo(e) >= 0)
841     break;
842     queue[k] = e;
843     setIndex(e, k);
844     k = parent;
845     }
846     queue[k] = key;
847     setIndex(key, k);
848     }
849    
850     /**
851 jsr166 1.42 * Sift element added at top down to its heap-ordered spot.
852 dl 1.40 * Call only when holding lock.
853     */
854     private void siftDown(int k, RunnableScheduledFuture key) {
855 jsr166 1.42 int half = size >>> 1;
856 dl 1.40 while (k < half) {
857 jsr166 1.42 int child = (k << 1) + 1;
858 dl 1.40 RunnableScheduledFuture c = queue[child];
859     int right = child + 1;
860     if (right < size && c.compareTo(queue[right]) > 0)
861     c = queue[child = right];
862     if (key.compareTo(c) <= 0)
863     break;
864     queue[k] = c;
865     setIndex(c, k);
866     k = child;
867     }
868     queue[k] = key;
869     setIndex(key, k);
870     }
871    
872     /**
873     * Resize the heap array. Call only when holding lock.
874     */
875     private void grow() {
876     int oldCapacity = queue.length;
877     int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
878     if (newCapacity < 0) // overflow
879     newCapacity = Integer.MAX_VALUE;
880     queue = Arrays.copyOf(queue, newCapacity);
881     }
882    
883     /**
884     * Find index of given object, or -1 if absent
885     */
886     private int indexOf(Object x) {
887     if (x != null) {
888 jsr166 1.48 if (x instanceof ScheduledFutureTask) {
889     int i = ((ScheduledFutureTask) x).heapIndex;
890     // Sanity check; x could conceivably be a
891     // ScheduledFutureTask from some other pool.
892     if (i >= 0 && i < size && queue[i] == x)
893     return i;
894     } else {
895     for (int i = 0; i < size; i++)
896     if (x.equals(queue[i]))
897     return i;
898     }
899 dl 1.40 }
900     return -1;
901     }
902    
903 jsr166 1.48 public boolean contains(Object x) {
904     final ReentrantLock lock = this.lock;
905 jsr166 1.45 lock.lock();
906     try {
907 jsr166 1.48 return indexOf(x) != -1;
908 jsr166 1.45 } finally {
909     lock.unlock();
910     }
911 jsr166 1.48 }
912 jsr166 1.45
913 dl 1.40 public boolean remove(Object x) {
914     final ReentrantLock lock = this.lock;
915     lock.lock();
916     try {
917 jsr166 1.45 int i = indexOf(x);
918 jsr166 1.48 if (i < 0)
919     return false;
920 jsr166 1.45
921 jsr166 1.48 setIndex(queue[i], -1);
922     int s = --size;
923     RunnableScheduledFuture replacement = queue[s];
924     queue[s] = null;
925     if (s != i) {
926     siftDown(i, replacement);
927     if (queue[i] == replacement)
928     siftUp(i, replacement);
929     }
930     return true;
931 dl 1.40 } finally {
932     lock.unlock();
933     }
934     }
935    
936     public int size() {
937     final ReentrantLock lock = this.lock;
938     lock.lock();
939     try {
940 jsr166 1.45 return size;
941 dl 1.40 } finally {
942     lock.unlock();
943     }
944     }
945    
946 jsr166 1.42 public boolean isEmpty() {
947     return size() == 0;
948 dl 1.40 }
949    
950     public int remainingCapacity() {
951     return Integer.MAX_VALUE;
952     }
953    
954     public RunnableScheduledFuture peek() {
955     final ReentrantLock lock = this.lock;
956     lock.lock();
957     try {
958     return queue[0];
959     } finally {
960     lock.unlock();
961     }
962 dl 1.13 }
963    
964 dl 1.40 public boolean offer(Runnable x) {
965     if (x == null)
966     throw new NullPointerException();
967     RunnableScheduledFuture e = (RunnableScheduledFuture)x;
968     final ReentrantLock lock = this.lock;
969     lock.lock();
970     try {
971     int i = size;
972     if (i >= queue.length)
973     grow();
974     size = i + 1;
975     if (i == 0) {
976     queue[0] = e;
977     setIndex(e, 0);
978 jsr166 1.45 } else {
979 dl 1.40 siftUp(i, e);
980     }
981 jsr166 1.46 if (queue[0] == e) {
982 jsr166 1.48 leader = null;
983 jsr166 1.46 available.signal();
984 jsr166 1.48 }
985 dl 1.40 } finally {
986     lock.unlock();
987     }
988     return true;
989 jsr166 1.48 }
990 dl 1.40
991     public void put(Runnable e) {
992     offer(e);
993     }
994    
995     public boolean add(Runnable e) {
996 jsr166 1.48 return offer(e);
997     }
998 dl 1.40
999     public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1000     return offer(e);
1001     }
1002 jsr166 1.42
1003 jsr166 1.46 /**
1004     * Performs common bookkeeping for poll and take: Replaces
1005 jsr166 1.47 * first element with last and sifts it down. Call only when
1006     * holding lock.
1007 jsr166 1.46 * @param f the task to remove and return
1008     */
1009     private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
1010     int s = --size;
1011     RunnableScheduledFuture x = queue[s];
1012     queue[s] = null;
1013     if (s != 0)
1014     siftDown(0, x);
1015     setIndex(f, -1);
1016     return f;
1017     }
1018    
1019 dl 1.40 public RunnableScheduledFuture poll() {
1020     final ReentrantLock lock = this.lock;
1021     lock.lock();
1022     try {
1023     RunnableScheduledFuture first = queue[0];
1024     if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
1025     return null;
1026 jsr166 1.42 else
1027 dl 1.40 return finishPoll(first);
1028     } finally {
1029     lock.unlock();
1030     }
1031     }
1032    
1033     public RunnableScheduledFuture take() throws InterruptedException {
1034     final ReentrantLock lock = this.lock;
1035     lock.lockInterruptibly();
1036     try {
1037     for (;;) {
1038     RunnableScheduledFuture first = queue[0];
1039 jsr166 1.42 if (first == null)
1040 dl 1.40 available.await();
1041     else {
1042 jsr166 1.48 long delay = first.getDelay(TimeUnit.NANOSECONDS);
1043     if (delay <= 0)
1044     return finishPoll(first);
1045     else if (leader != null)
1046     available.await();
1047     else {
1048     Thread thisThread = Thread.currentThread();
1049     leader = thisThread;
1050     try {
1051     available.awaitNanos(delay);
1052     } finally {
1053     if (leader == thisThread)
1054     leader = null;
1055     }
1056     }
1057 dl 1.40 }
1058     }
1059     } finally {
1060 jsr166 1.48 if (leader == null && queue[0] != null)
1061     available.signal();
1062 dl 1.40 lock.unlock();
1063     }
1064     }
1065    
1066 jsr166 1.42 public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
1067 dl 1.40 throws InterruptedException {
1068     long nanos = unit.toNanos(timeout);
1069     final ReentrantLock lock = this.lock;
1070     lock.lockInterruptibly();
1071     try {
1072     for (;;) {
1073     RunnableScheduledFuture first = queue[0];
1074     if (first == null) {
1075     if (nanos <= 0)
1076     return null;
1077     else
1078     nanos = available.awaitNanos(nanos);
1079     } else {
1080     long delay = first.getDelay(TimeUnit.NANOSECONDS);
1081 jsr166 1.48 if (delay <= 0)
1082 dl 1.40 return finishPoll(first);
1083 jsr166 1.48 if (nanos <= 0)
1084     return null;
1085     if (nanos < delay || leader != null)
1086     nanos = available.awaitNanos(nanos);
1087     else {
1088     Thread thisThread = Thread.currentThread();
1089     leader = thisThread;
1090     try {
1091     long timeLeft = available.awaitNanos(delay);
1092     nanos -= delay - timeLeft;
1093     } finally {
1094     if (leader == thisThread)
1095     leader = null;
1096     }
1097     }
1098     }
1099     }
1100 dl 1.40 } finally {
1101 jsr166 1.48 if (leader == null && queue[0] != null)
1102     available.signal();
1103 dl 1.40 lock.unlock();
1104     }
1105     }
1106    
1107     public void clear() {
1108     final ReentrantLock lock = this.lock;
1109     lock.lock();
1110     try {
1111     for (int i = 0; i < size; i++) {
1112     RunnableScheduledFuture t = queue[i];
1113     if (t != null) {
1114     queue[i] = null;
1115     setIndex(t, -1);
1116     }
1117     }
1118     size = 0;
1119     } finally {
1120     lock.unlock();
1121     }
1122 dl 1.13 }
1123 dl 1.40
1124     /**
1125     * Return and remove first element only if it is expired.
1126     * Used only by drainTo. Call only when holding lock.
1127     */
1128     private RunnableScheduledFuture pollExpired() {
1129     RunnableScheduledFuture first = queue[0];
1130 jsr166 1.42 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
1131 dl 1.40 return null;
1132 jsr166 1.48 return finishPoll(first);
1133 dl 1.40 }
1134    
1135     public int drainTo(Collection<? super Runnable> c) {
1136     if (c == null)
1137     throw new NullPointerException();
1138     if (c == this)
1139     throw new IllegalArgumentException();
1140     final ReentrantLock lock = this.lock;
1141     lock.lock();
1142     try {
1143 jsr166 1.48 RunnableScheduledFuture first;
1144 dl 1.40 int n = 0;
1145 jsr166 1.45 while ((first = pollExpired()) != null) {
1146 jsr166 1.48 c.add(first);
1147     ++n;
1148     }
1149 dl 1.40 return n;
1150     } finally {
1151     lock.unlock();
1152     }
1153 dl 1.13 }
1154    
1155 jsr166 1.21 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1156 dl 1.40 if (c == null)
1157     throw new NullPointerException();
1158     if (c == this)
1159     throw new IllegalArgumentException();
1160     if (maxElements <= 0)
1161     return 0;
1162     final ReentrantLock lock = this.lock;
1163     lock.lock();
1164     try {
1165 jsr166 1.48 RunnableScheduledFuture first;
1166 dl 1.40 int n = 0;
1167 jsr166 1.45 while (n < maxElements && (first = pollExpired()) != null) {
1168 jsr166 1.48 c.add(first);
1169     ++n;
1170     }
1171 dl 1.40 return n;
1172     } finally {
1173     lock.unlock();
1174     }
1175     }
1176    
1177     public Object[] toArray() {
1178     final ReentrantLock lock = this.lock;
1179     lock.lock();
1180     try {
1181 jsr166 1.45 return Arrays.copyOf(queue, size, Object[].class);
1182 dl 1.40 } finally {
1183     lock.unlock();
1184     }
1185     }
1186    
1187 jsr166 1.48 @SuppressWarnings("unchecked")
1188 dl 1.40 public <T> T[] toArray(T[] a) {
1189     final ReentrantLock lock = this.lock;
1190     lock.lock();
1191     try {
1192     if (a.length < size)
1193     return (T[]) Arrays.copyOf(queue, size, a.getClass());
1194     System.arraycopy(queue, 0, a, 0, size);
1195     if (a.length > size)
1196     a[size] = null;
1197     return a;
1198     } finally {
1199     lock.unlock();
1200     }
1201 dl 1.13 }
1202    
1203 jsr166 1.21 public Iterator<Runnable> iterator() {
1204 jsr166 1.45 return new Itr(Arrays.copyOf(queue, size));
1205 dl 1.40 }
1206 jsr166 1.42
1207 dl 1.40 /**
1208     * Snapshot iterator that works off copy of underlying q array.
1209     */
1210     private class Itr implements Iterator<Runnable> {
1211 jsr166 1.45 final RunnableScheduledFuture[] array;
1212 jsr166 1.48 int cursor = 0; // index of next element to return
1213     int lastRet = -1; // index of last element, or -1 if no such
1214 jsr166 1.42
1215 jsr166 1.45 Itr(RunnableScheduledFuture[] array) {
1216 dl 1.40 this.array = array;
1217     }
1218 jsr166 1.42
1219 dl 1.40 public boolean hasNext() {
1220     return cursor < array.length;
1221     }
1222 jsr166 1.42
1223 dl 1.40 public Runnable next() {
1224     if (cursor >= array.length)
1225     throw new NoSuchElementException();
1226     lastRet = cursor;
1227 jsr166 1.45 return array[cursor++];
1228 dl 1.40 }
1229 jsr166 1.42
1230 dl 1.40 public void remove() {
1231     if (lastRet < 0)
1232     throw new IllegalStateException();
1233     DelayedWorkQueue.this.remove(array[lastRet]);
1234     lastRet = -1;
1235     }
1236 dl 1.13 }
1237     }
1238 dl 1.1 }