ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.73
Committed: Fri Sep 27 01:10:10 2013 UTC (10 years, 8 months ago) by jsr166
Branch: MAIN
Changes since 1.72: +2 -2 lines
Log Message:
small internal doc improvement

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.11 * Expert Group and released to the public domain, as explained at
4 jsr166 1.58 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.1 */
6    
7     package java.util.concurrent;
8 jsr166 1.62 import static java.util.concurrent.TimeUnit.NANOSECONDS;
9     import java.util.concurrent.atomic.AtomicLong;
10     import java.util.concurrent.locks.Condition;
11     import java.util.concurrent.locks.ReentrantLock;
12 dl 1.1 import java.util.*;
13    
14     /**
15 dl 1.7 * A {@link ThreadPoolExecutor} that can additionally schedule
16     * commands to run after a given delay, or to execute
17     * periodically. This class is preferable to {@link java.util.Timer}
18     * when multiple worker threads are needed, or when the additional
19     * flexibility or capabilities of {@link ThreadPoolExecutor} (which
20     * this class extends) are required.
21 dl 1.1 *
22 jsr166 1.46 * <p>Delayed tasks execute no sooner than they are enabled, but
23 dl 1.18 * without any real-time guarantees about when, after they are
24     * enabled, they will commence. Tasks scheduled for exactly the same
25     * execution time are enabled in first-in-first-out (FIFO) order of
26 jsr166 1.46 * submission.
27     *
28     * <p>When a submitted task is cancelled before it is run, execution
29     * is suppressed. By default, such a cancelled task is not
30     * automatically removed from the work queue until its delay
31     * elapses. While this enables further inspection and monitoring, it
32     * may also cause unbounded retention of cancelled tasks. To avoid
33     * this, set {@link #setRemoveOnCancelPolicy} to {@code true}, which
34     * causes tasks to be immediately removed from the work queue at
35     * time of cancellation.
36 dl 1.1 *
37 dl 1.51 * <p>Successive executions of a task scheduled via
38 jsr166 1.55 * {@code scheduleAtFixedRate} or
39     * {@code scheduleWithFixedDelay} do not overlap. While different
40 dl 1.51 * executions may be performed by different threads, the effects of
41     * prior executions <a
42     * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
43     * those of subsequent ones.
44     *
45 dl 1.1 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
46 dl 1.8 * of the inherited tuning methods are not useful for it. In
47 jsr166 1.53 * particular, because it acts as a fixed-sized pool using
48     * {@code corePoolSize} threads and an unbounded queue, adjustments
49     * to {@code maximumPoolSize} have no useful effect. Additionally, it
50     * is almost never a good idea to set {@code corePoolSize} to zero or
51     * use {@code allowCoreThreadTimeOut} because this may leave the pool
52     * without threads to handle tasks once they become eligible to run.
53 dl 1.1 *
54 jsr166 1.39 * <p><b>Extension notes:</b> This class overrides the
55 jsr166 1.69 * {@link ThreadPoolExecutor#execute(Runnable) execute} and
56 jsr166 1.39 * {@link AbstractExecutorService#submit(Runnable) submit}
57     * methods to generate internal {@link ScheduledFuture} objects to
58     * control per-task delays and scheduling. To preserve
59     * functionality, any further overrides of these methods in
60 dl 1.32 * subclasses must invoke superclass versions, which effectively
61 jsr166 1.39 * disables additional task customization. However, this class
62 dl 1.32 * provides alternative protected extension method
63 jsr166 1.39 * {@code decorateTask} (one version each for {@code Runnable} and
64     * {@code Callable}) that can be used to customize the concrete task
65     * types used to execute commands entered via {@code execute},
66     * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
67     * and {@code scheduleWithFixedDelay}. By default, a
68     * {@code ScheduledThreadPoolExecutor} uses a task type extending
69 dl 1.32 * {@link FutureTask}. However, this may be modified or replaced using
70     * subclasses of the form:
71     *
72 jsr166 1.39 * <pre> {@code
73 dl 1.23 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
74     *
75 jsr166 1.39 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
76 dl 1.23 *
77 jsr166 1.39 * protected <V> RunnableScheduledFuture<V> decorateTask(
78     * Runnable r, RunnableScheduledFuture<V> task) {
79     * return new CustomTask<V>(r, task);
80 jsr166 1.29 * }
81 dl 1.23 *
82 jsr166 1.39 * protected <V> RunnableScheduledFuture<V> decorateTask(
83     * Callable<V> c, RunnableScheduledFuture<V> task) {
84     * return new CustomTask<V>(c, task);
85 jsr166 1.29 * }
86     * // ... add constructors, etc.
87 jsr166 1.39 * }}</pre>
88     *
89 dl 1.1 * @since 1.5
90     * @author Doug Lea
91     */
92 jsr166 1.21 public class ScheduledThreadPoolExecutor
93     extends ThreadPoolExecutor
94 tim 1.3 implements ScheduledExecutorService {
95 dl 1.1
96 dl 1.37 /*
97     * This class specializes ThreadPoolExecutor implementation by
98     *
99     * 1. Using a custom task type, ScheduledFutureTask for
100     * tasks, even those that don't require scheduling (i.e.,
101     * those submitted using ExecutorService execute, not
102     * ScheduledExecutorService methods) which are treated as
103     * delayed tasks with a delay of zero.
104     *
105 jsr166 1.46 * 2. Using a custom queue (DelayedWorkQueue), a variant of
106 dl 1.37 * unbounded DelayQueue. The lack of capacity constraint and
107     * the fact that corePoolSize and maximumPoolSize are
108     * effectively identical simplifies some execution mechanics
109 jsr166 1.46 * (see delayedExecute) compared to ThreadPoolExecutor.
110 dl 1.37 *
111     * 3. Supporting optional run-after-shutdown parameters, which
112     * leads to overrides of shutdown methods to remove and cancel
113     * tasks that should NOT be run after shutdown, as well as
114     * different recheck logic when task (re)submission overlaps
115     * with a shutdown.
116     *
117     * 4. Task decoration methods to allow interception and
118     * instrumentation, which are needed because subclasses cannot
119     * otherwise override submit methods to get this effect. These
120     * don't have any impact on pool control logic though.
121     */
122    
123 dl 1.1 /**
124     * False if should cancel/suppress periodic tasks on shutdown.
125     */
126     private volatile boolean continueExistingPeriodicTasksAfterShutdown;
127    
128     /**
129     * False if should cancel non-periodic tasks on shutdown.
130     */
131     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
132    
133     /**
134 dl 1.41 * True if ScheduledFutureTask.cancel should remove from queue
135     */
136     private volatile boolean removeOnCancel = false;
137    
138     /**
139 dl 1.1 * Sequence number to break scheduling ties, and in turn to
140     * guarantee FIFO order among tied entries.
141     */
142 jsr166 1.60 private static final AtomicLong sequencer = new AtomicLong();
143 dl 1.14
144     /**
145 jsr166 1.39 * Returns current nanosecond time.
146 dl 1.14 */
147 jsr166 1.54 final long now() {
148     return System.nanoTime();
149 dl 1.14 }
150    
151 jsr166 1.21 private class ScheduledFutureTask<V>
152 peierls 1.22 extends FutureTask<V> implements RunnableScheduledFuture<V> {
153 jsr166 1.21
154 dl 1.1 /** Sequence number to break ties FIFO */
155     private final long sequenceNumber;
156 jsr166 1.44
157 dl 1.1 /** The time the task is enabled to execute in nanoTime units */
158     private long time;
159 jsr166 1.44
160 dl 1.16 /**
161     * Period in nanoseconds for repeating tasks. A positive
162     * value indicates fixed-rate execution. A negative value
163     * indicates fixed-delay execution. A value of 0 indicates a
164     * non-repeating task.
165     */
166 dl 1.1 private final long period;
167    
168 jsr166 1.48 /** The actual task to be re-enqueued by reExecutePeriodic */
169     RunnableScheduledFuture<V> outerTask = this;
170 jsr166 1.44
171 dl 1.1 /**
172 dl 1.40 * Index into delay queue, to support faster cancellation.
173     */
174     int heapIndex;
175    
176     /**
177 jsr166 1.30 * Creates a one-shot action with given nanoTime-based trigger time.
178 dl 1.1 */
179     ScheduledFutureTask(Runnable r, V result, long ns) {
180     super(r, result);
181     this.time = ns;
182     this.period = 0;
183     this.sequenceNumber = sequencer.getAndIncrement();
184     }
185    
186     /**
187 jsr166 1.30 * Creates a periodic action with given nano time and period.
188 dl 1.1 */
189 jsr166 1.30 ScheduledFutureTask(Runnable r, V result, long ns, long period) {
190 dl 1.1 super(r, result);
191     this.time = ns;
192     this.period = period;
193     this.sequenceNumber = sequencer.getAndIncrement();
194     }
195    
196     /**
197 jsr166 1.65 * Creates a one-shot action with given nanoTime-based trigger time.
198 dl 1.1 */
199     ScheduledFutureTask(Callable<V> callable, long ns) {
200     super(callable);
201     this.time = ns;
202     this.period = 0;
203     this.sequenceNumber = sequencer.getAndIncrement();
204     }
205    
206     public long getDelay(TimeUnit unit) {
207 jsr166 1.62 return unit.convert(time - now(), NANOSECONDS);
208 dl 1.1 }
209    
210 dl 1.20 public int compareTo(Delayed other) {
211 dl 1.59 if (other == this) // compare zero if same object
212 dl 1.1 return 0;
213 dl 1.34 if (other instanceof ScheduledFutureTask) {
214     ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
215     long diff = time - x.time;
216     if (diff < 0)
217     return -1;
218     else if (diff > 0)
219     return 1;
220     else if (sequenceNumber < x.sequenceNumber)
221     return -1;
222     else
223     return 1;
224     }
225 jsr166 1.64 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
226 jsr166 1.61 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
227 dl 1.1 }
228    
229     /**
230 jsr166 1.70 * Returns {@code true} if this is a periodic (not a one-shot) action.
231 jsr166 1.30 *
232 jsr166 1.70 * @return {@code true} if periodic
233 dl 1.1 */
234 dl 1.23 public boolean isPeriodic() {
235 dl 1.16 return period != 0;
236 dl 1.1 }
237    
238     /**
239 jsr166 1.39 * Sets the next time to run for a periodic task.
240 dl 1.13 */
241 dl 1.37 private void setNextRunTime() {
242     long p = period;
243     if (p > 0)
244     time += p;
245     else
246 jsr166 1.54 time = triggerTime(-p);
247 dl 1.13 }
248    
249 dl 1.40 public boolean cancel(boolean mayInterruptIfRunning) {
250 dl 1.41 boolean cancelled = super.cancel(mayInterruptIfRunning);
251     if (cancelled && removeOnCancel && heapIndex >= 0)
252 jsr166 1.42 remove(this);
253 dl 1.41 return cancelled;
254 dl 1.40 }
255    
256 dl 1.13 /**
257 dl 1.5 * Overrides FutureTask version so as to reset/requeue if periodic.
258 jsr166 1.21 */
259 dl 1.1 public void run() {
260 dl 1.37 boolean periodic = isPeriodic();
261     if (!canRunInCurrentRunState(periodic))
262     cancel(false);
263     else if (!periodic)
264 dl 1.5 ScheduledFutureTask.super.run();
265 dl 1.37 else if (ScheduledFutureTask.super.runAndReset()) {
266     setNextRunTime();
267 jsr166 1.44 reExecutePeriodic(outerTask);
268 dl 1.37 }
269 dl 1.1 }
270     }
271    
272     /**
273 dl 1.37 * Returns true if can run a task given current run state
274 jsr166 1.39 * and run-after-shutdown parameters.
275     *
276 dl 1.37 * @param periodic true if this task periodic, false if delayed
277     */
278     boolean canRunInCurrentRunState(boolean periodic) {
279 jsr166 1.38 return isRunningOrShutdown(periodic ?
280 dl 1.37 continueExistingPeriodicTasksAfterShutdown :
281     executeExistingDelayedTasksAfterShutdown);
282     }
283    
284     /**
285     * Main execution method for delayed or periodic tasks. If pool
286     * is shut down, rejects the task. Otherwise adds task to queue
287     * and starts a thread, if necessary, to run it. (We cannot
288     * prestart the thread to run the task because the task (probably)
289 jsr166 1.67 * shouldn't be run yet.) If the pool is shut down while the task
290 dl 1.37 * is being added, cancel and remove it if required by state and
291 jsr166 1.39 * run-after-shutdown parameters.
292     *
293 dl 1.37 * @param task the task
294     */
295     private void delayedExecute(RunnableScheduledFuture<?> task) {
296     if (isShutdown())
297     reject(task);
298     else {
299     super.getQueue().add(task);
300     if (isShutdown() &&
301     !canRunInCurrentRunState(task.isPeriodic()) &&
302     remove(task))
303     task.cancel(false);
304 jsr166 1.48 else
305 dl 1.63 ensurePrestart();
306 dl 1.37 }
307     }
308 jsr166 1.21
309 dl 1.37 /**
310 jsr166 1.39 * Requeues a periodic task unless current run state precludes it.
311     * Same idea as delayedExecute except drops task rather than rejecting.
312     *
313 dl 1.37 * @param task the task
314     */
315     void reExecutePeriodic(RunnableScheduledFuture<?> task) {
316     if (canRunInCurrentRunState(true)) {
317     super.getQueue().add(task);
318     if (!canRunInCurrentRunState(true) && remove(task))
319     task.cancel(false);
320 jsr166 1.48 else
321 dl 1.63 ensurePrestart();
322 dl 1.37 }
323 dl 1.13 }
324 dl 1.1
325 dl 1.13 /**
326 jsr166 1.21 * Cancels and clears the queue of all tasks that should not be run
327 jsr166 1.39 * due to shutdown policy. Invoked within super.shutdown.
328 dl 1.13 */
329 dl 1.37 @Override void onShutdown() {
330     BlockingQueue<Runnable> q = super.getQueue();
331     boolean keepDelayed =
332     getExecuteExistingDelayedTasksAfterShutdownPolicy();
333     boolean keepPeriodic =
334     getContinueExistingPeriodicTasksAfterShutdownPolicy();
335 jsr166 1.57 if (!keepDelayed && !keepPeriodic) {
336     for (Object e : q.toArray())
337     if (e instanceof RunnableScheduledFuture<?>)
338     ((RunnableScheduledFuture<?>) e).cancel(false);
339 dl 1.37 q.clear();
340 jsr166 1.57 }
341 dl 1.37 else {
342     // Traverse snapshot to avoid iterator exceptions
343 jsr166 1.39 for (Object e : q.toArray()) {
344 dl 1.23 if (e instanceof RunnableScheduledFuture) {
345 dl 1.37 RunnableScheduledFuture<?> t =
346     (RunnableScheduledFuture<?>)e;
347 dl 1.41 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
348     t.isCancelled()) { // also remove if already cancelled
349     if (q.remove(t))
350     t.cancel(false);
351     }
352 dl 1.13 }
353     }
354 dl 1.1 }
355 jsr166 1.48 tryTerminate();
356 dl 1.1 }
357    
358 dl 1.23 /**
359 jsr166 1.30 * Modifies or replaces the task used to execute a runnable.
360 jsr166 1.28 * This method can be used to override the concrete
361 dl 1.23 * class used for managing internal tasks.
362 jsr166 1.30 * The default implementation simply returns the given task.
363 jsr166 1.28 *
364 dl 1.23 * @param runnable the submitted Runnable
365     * @param task the task created to execute the runnable
366 jsr166 1.72 * @param <V> the type of the task's result
367 dl 1.23 * @return a task that can execute the runnable
368     * @since 1.6
369     */
370 peierls 1.22 protected <V> RunnableScheduledFuture<V> decorateTask(
371 dl 1.23 Runnable runnable, RunnableScheduledFuture<V> task) {
372     return task;
373 peierls 1.22 }
374    
375 dl 1.23 /**
376 jsr166 1.30 * Modifies or replaces the task used to execute a callable.
377 jsr166 1.28 * This method can be used to override the concrete
378 dl 1.23 * class used for managing internal tasks.
379 jsr166 1.30 * The default implementation simply returns the given task.
380 jsr166 1.28 *
381 dl 1.23 * @param callable the submitted Callable
382     * @param task the task created to execute the callable
383 jsr166 1.72 * @param <V> the type of the task's result
384 dl 1.23 * @return a task that can execute the callable
385     * @since 1.6
386     */
387 peierls 1.22 protected <V> RunnableScheduledFuture<V> decorateTask(
388 dl 1.23 Callable<V> callable, RunnableScheduledFuture<V> task) {
389     return task;
390 dl 1.19 }
391    
/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the
 * given core pool size. The pool is built on a fresh
 * {@code DelayedWorkQueue}, with a maximum pool size of
 * {@code Integer.MAX_VALUE} and a keep-alive time of zero.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          NANOSECONDS,
          new DelayedWorkQueue());
}
404    
/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the
 * given core pool size and thread factory. The pool is built on
 * a fresh {@code DelayedWorkQueue}, with a maximum pool size of
 * {@code Integer.MAX_VALUE} and a keep-alive time of zero.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code threadFactory} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          NANOSECONDS,
          new DelayedWorkQueue(),
          threadFactory);
}
421    
/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the
 * given core pool size and rejection handler. The pool is built
 * on a fresh {@code DelayedWorkQueue}, with a maximum pool size
 * of {@code Integer.MAX_VALUE} and a keep-alive time of zero.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code handler} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          NANOSECONDS,
          new DelayedWorkQueue(),
          handler);
}
438    
/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the
 * given core pool size, thread factory and rejection handler.
 * The pool is built on a fresh {@code DelayedWorkQueue}, with a
 * maximum pool size of {@code Integer.MAX_VALUE} and a keep-alive
 * time of zero.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code threadFactory} or
 *         {@code handler} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          NANOSECONDS,
          new DelayedWorkQueue(),
          threadFactory,
          handler);
}
459    
460 dl 1.37 /**
461 jsr166 1.73 * Returns the nanoTime-based trigger time of a delayed action.
462 jsr166 1.54 */
463     private long triggerTime(long delay, TimeUnit unit) {
464     return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
465     }
466    
467     /**
468 jsr166 1.73 * Returns the nanoTime-based trigger time of a delayed action.
469 dl 1.50 */
470 jsr166 1.54 long triggerTime(long delay) {
471     return now() +
472     ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
473     }
474    
475     /**
476     * Constrains the values of all delays in the queue to be within
477     * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
478     * This may occur if a task is eligible to be dequeued, but has
479     * not yet been, while some other task is added with a delay of
480     * Long.MAX_VALUE.
481     */
482     private long overflowFree(long delay) {
483     Delayed head = (Delayed) super.getQueue().peek();
484     if (head != null) {
485 jsr166 1.62 long headDelay = head.getDelay(NANOSECONDS);
486 jsr166 1.54 if (headDelay < 0 && (delay - headDelay < 0))
487     delay = Long.MAX_VALUE + headDelay;
488     }
489     return delay;
490 dl 1.50 }
491    
492     /**
493 dl 1.37 * @throws RejectedExecutionException {@inheritDoc}
494     * @throws NullPointerException {@inheritDoc}
495     */
496 jsr166 1.21 public ScheduledFuture<?> schedule(Runnable command,
497     long delay,
498 dl 1.13 TimeUnit unit) {
499 dl 1.9 if (command == null || unit == null)
500 dl 1.1 throw new NullPointerException();
501 peierls 1.22 RunnableScheduledFuture<?> t = decorateTask(command,
502 jsr166 1.54 new ScheduledFutureTask<Void>(command, null,
503     triggerTime(delay, unit)));
504 dl 1.1 delayedExecute(t);
505     return t;
506     }
507 jsr166 1.52
508 dl 1.37 /**
509     * @throws RejectedExecutionException {@inheritDoc}
510     * @throws NullPointerException {@inheritDoc}
511     */
512 jsr166 1.21 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
513     long delay,
514 dl 1.13 TimeUnit unit) {
515 dl 1.9 if (callable == null || unit == null)
516 dl 1.1 throw new NullPointerException();
517 peierls 1.22 RunnableScheduledFuture<V> t = decorateTask(callable,
518 jsr166 1.54 new ScheduledFutureTask<V>(callable,
519     triggerTime(delay, unit)));
520 dl 1.1 delayedExecute(t);
521     return t;
522     }
523    
524 dl 1.37 /**
525     * @throws RejectedExecutionException {@inheritDoc}
526     * @throws NullPointerException {@inheritDoc}
527     * @throws IllegalArgumentException {@inheritDoc}
528     */
529 jsr166 1.21 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
530     long initialDelay,
531     long period,
532 dl 1.13 TimeUnit unit) {
533 dl 1.9 if (command == null || unit == null)
534 dl 1.1 throw new NullPointerException();
535     if (period <= 0)
536     throw new IllegalArgumentException();
537 jsr166 1.48 ScheduledFutureTask<Void> sft =
538     new ScheduledFutureTask<Void>(command,
539     null,
540 jsr166 1.54 triggerTime(initialDelay, unit),
541 jsr166 1.48 unit.toNanos(period));
542 jsr166 1.44 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
543 jsr166 1.48 sft.outerTask = t;
544 dl 1.1 delayedExecute(t);
545     return t;
546     }
547 jsr166 1.21
548 dl 1.37 /**
549     * @throws RejectedExecutionException {@inheritDoc}
550     * @throws NullPointerException {@inheritDoc}
551     * @throws IllegalArgumentException {@inheritDoc}
552     */
553 jsr166 1.21 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
554     long initialDelay,
555     long delay,
556 dl 1.13 TimeUnit unit) {
557 dl 1.9 if (command == null || unit == null)
558 dl 1.1 throw new NullPointerException();
559     if (delay <= 0)
560     throw new IllegalArgumentException();
561 jsr166 1.48 ScheduledFutureTask<Void> sft =
562     new ScheduledFutureTask<Void>(command,
563     null,
564 jsr166 1.54 triggerTime(initialDelay, unit),
565 jsr166 1.48 unit.toNanos(-delay));
566 jsr166 1.44 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
567 jsr166 1.48 sft.outerTask = t;
568 dl 1.1 delayedExecute(t);
569     return t;
570     }
571 jsr166 1.21
572 dl 1.1 /**
573 jsr166 1.39 * Executes {@code command} with zero required delay.
574     * This has effect equivalent to
575     * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
576     * Note that inspections of the queue and of the list returned by
577     * {@code shutdownNow} will access the zero-delayed
578     * {@link ScheduledFuture}, not the {@code command} itself.
579     *
580     * <p>A consequence of the use of {@code ScheduledFuture} objects is
581     * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
582     * called with a null second {@code Throwable} argument, even if the
583     * {@code command} terminated abruptly. Instead, the {@code Throwable}
584     * thrown by such a task can be obtained via {@link Future#get}.
585 dl 1.1 *
586     * @throws RejectedExecutionException at discretion of
587 jsr166 1.39 * {@code RejectedExecutionHandler}, if the task
588     * cannot be accepted for execution because the
589     * executor has been shut down
590     * @throws NullPointerException {@inheritDoc}
591 dl 1.1 */
592     public void execute(Runnable command) {
593 jsr166 1.62 schedule(command, 0, NANOSECONDS);
594 dl 1.1 }
595    
596 dl 1.13 // Override AbstractExecutorService methods
597    
598 dl 1.37 /**
599     * @throws RejectedExecutionException {@inheritDoc}
600     * @throws NullPointerException {@inheritDoc}
601     */
602 dl 1.7 public Future<?> submit(Runnable task) {
603 jsr166 1.62 return schedule(task, 0, NANOSECONDS);
604 dl 1.7 }
605    
606 dl 1.37 /**
607     * @throws RejectedExecutionException {@inheritDoc}
608     * @throws NullPointerException {@inheritDoc}
609     */
610 dl 1.7 public <T> Future<T> submit(Runnable task, T result) {
611 jsr166 1.62 return schedule(Executors.callable(task, result), 0, NANOSECONDS);
612 dl 1.7 }
613    
614 dl 1.37 /**
615     * @throws RejectedExecutionException {@inheritDoc}
616     * @throws NullPointerException {@inheritDoc}
617     */
618 dl 1.7 public <T> Future<T> submit(Callable<T> task) {
619 jsr166 1.62 return schedule(task, 0, NANOSECONDS);
620 dl 1.7 }
621 dl 1.1
622     /**
623 dl 1.37 * Sets the policy on whether to continue executing existing
624 jsr166 1.39 * periodic tasks even when this executor has been {@code shutdown}.
625     * In this case, these tasks will only terminate upon
626     * {@code shutdownNow} or after setting the policy to
627     * {@code false} when already shutdown.
628     * This value is by default {@code false}.
629 jsr166 1.30 *
630 jsr166 1.68 * @param value if {@code true}, continue after shutdown, else don't
631 jsr166 1.25 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
632 dl 1.1 */
633     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
634     continueExistingPeriodicTasksAfterShutdown = value;
635 jsr166 1.39 if (!value && isShutdown())
636 dl 1.37 onShutdown();
637 dl 1.1 }
638    
639     /**
640 jsr166 1.21 * Gets the policy on whether to continue executing existing
641 jsr166 1.39 * periodic tasks even when this executor has been {@code shutdown}.
642     * In this case, these tasks will only terminate upon
643     * {@code shutdownNow} or after setting the policy to
644     * {@code false} when already shutdown.
645     * This value is by default {@code false}.
646 jsr166 1.30 *
647 jsr166 1.39 * @return {@code true} if will continue after shutdown
648 dl 1.16 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
649 dl 1.1 */
650     public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
651     return continueExistingPeriodicTasksAfterShutdown;
652     }
653    
654     /**
655 jsr166 1.21 * Sets the policy on whether to execute existing delayed
656 jsr166 1.39 * tasks even when this executor has been {@code shutdown}.
657     * In this case, these tasks will only terminate upon
658     * {@code shutdownNow}, or after setting the policy to
659     * {@code false} when already shutdown.
660     * This value is by default {@code true}.
661 jsr166 1.30 *
662 jsr166 1.68 * @param value if {@code true}, execute after shutdown, else don't
663 dl 1.16 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
664 dl 1.1 */
665     public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
666     executeExistingDelayedTasksAfterShutdown = value;
667 jsr166 1.39 if (!value && isShutdown())
668 dl 1.37 onShutdown();
669 dl 1.1 }
670    
671     /**
672 jsr166 1.21 * Gets the policy on whether to execute existing delayed
673 jsr166 1.39 * tasks even when this executor has been {@code shutdown}.
674     * In this case, these tasks will only terminate upon
675     * {@code shutdownNow}, or after setting the policy to
676     * {@code false} when already shutdown.
677     * This value is by default {@code true}.
678 jsr166 1.30 *
679 jsr166 1.39 * @return {@code true} if will execute after shutdown
680 dl 1.16 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
681 dl 1.1 */
682     public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
683     return executeExistingDelayedTasksAfterShutdown;
684     }
685    
686     /**
687 jsr166 1.46 * Sets the policy on whether cancelled tasks should be immediately
688     * removed from the work queue at time of cancellation. This value is
689     * by default {@code false}.
690 dl 1.41 *
691 jsr166 1.42 * @param value if {@code true}, remove on cancellation, else don't
692 dl 1.41 * @see #getRemoveOnCancelPolicy
693 jsr166 1.43 * @since 1.7
694 dl 1.41 */
695     public void setRemoveOnCancelPolicy(boolean value) {
696     removeOnCancel = value;
697     }
698    
699     /**
700 jsr166 1.46 * Gets the policy on whether cancelled tasks should be immediately
701     * removed from the work queue at time of cancellation. This value is
702     * by default {@code false}.
703 dl 1.41 *
704 jsr166 1.46 * @return {@code true} if cancelled tasks are immediately removed
705     * from the queue
706 dl 1.41 * @see #setRemoveOnCancelPolicy
707 jsr166 1.43 * @since 1.7
708 dl 1.41 */
709     public boolean getRemoveOnCancelPolicy() {
710     return removeOnCancel;
711     }
712    
713     /**
714 dl 1.1 * Initiates an orderly shutdown in which previously submitted
715 jsr166 1.49 * tasks are executed, but no new tasks will be accepted.
716     * Invocation has no additional effect if already shut down.
717     *
718     * <p>This method does not wait for previously submitted tasks to
719     * complete execution. Use {@link #awaitTermination awaitTermination}
720     * to do that.
721     *
722     * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
723     * has been set {@code false}, existing delayed tasks whose delays
724     * have not yet elapsed are cancelled. And unless the {@code
725     * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
726     * {@code true}, future executions of existing periodic tasks will
727     * be cancelled.
728 jsr166 1.39 *
729     * @throws SecurityException {@inheritDoc}
730 dl 1.1 */
731     public void shutdown() {
732     super.shutdown();
733     }
734    
/**
 * Attempts to stop all actively executing tasks, halts the
 * processing of waiting tasks, and returns a list of the tasks
 * that were awaiting execution.
 *
 * <p>This method does not wait for actively executing tasks to
 * terminate.  Use {@link #awaitTermination awaitTermination} to
 * do that.
 *
 * <p>There are no guarantees beyond best-effort attempts to stop
 * processing actively executing tasks.  This implementation
 * cancels tasks via {@link Thread#interrupt}, so any task that
 * fails to respond to interrupts may never terminate.
 *
 * @return list of tasks that never commenced execution.
 *         Each element of this list is a {@link ScheduledFuture},
 *         including those tasks submitted using {@code execute},
 *         which are for scheduling purposes used as the basis of a
 *         zero-delay {@code ScheduledFuture}.
 * @throws SecurityException {@inheritDoc}
 */
public List<Runnable> shutdownNow() {
    // This override only delegates to the superclass; the returned
    // list is whatever the superclass produces.
    return super.shutdownNow();
}
759    
/**
 * Returns the task queue used by this executor.  Each element of
 * this queue is a {@link ScheduledFuture}, including those
 * tasks submitted using {@code execute}, which are for scheduling
 * purposes used as the basis of a zero-delay
 * {@code ScheduledFuture}.  Iteration over this queue is
 * <em>not</em> guaranteed to traverse tasks in the order in
 * which they will execute.
 *
 * @return the task queue
 */
public BlockingQueue<Runnable> getQueue() {
    // This override only delegates to the superclass; it returns the
    // superclass's queue unchanged.
    return super.getQueue();
}
774    
775 dl 1.13 /**
776 dl 1.40 * Specialized delay queue. To mesh with TPE declarations, this
777     * class must be declared as a BlockingQueue<Runnable> even though
778 jsr166 1.42 * it can only hold RunnableScheduledFutures.
779 jsr166 1.21 */
780 dl 1.40 static class DelayedWorkQueue extends AbstractQueue<Runnable>
781 dl 1.13 implements BlockingQueue<Runnable> {
782 jsr166 1.21
783 dl 1.40 /*
784     * A DelayedWorkQueue is based on a heap-based data structure
785     * like those in DelayQueue and PriorityQueue, except that
786     * every ScheduledFutureTask also records its index into the
787     * heap array. This eliminates the need to find a task upon
788     * cancellation, greatly speeding up removal (down from O(n)
789     * to O(log n)), and reducing garbage retention that would
790     * otherwise occur by waiting for the element to rise to top
791     * before clearing. But because the queue may also hold
792     * RunnableScheduledFutures that are not ScheduledFutureTasks,
793     * we are not guaranteed to have such indices available, in
794     * which case we fall back to linear search. (We expect that
795     * most tasks will not be decorated, and that the faster cases
796     * will be much more common.)
797     *
798     * All heap operations must record index changes -- mainly
799     * within siftUp and siftDown. Upon removal, a task's
800     * heapIndex is set to -1. Note that ScheduledFutureTasks can
801     * appear at most once in the queue (this need not be true for
802     * other kinds of tasks or work queues), so are uniquely
803     * identified by heapIndex.
804     */
805    
806 jsr166 1.46 private static final int INITIAL_CAPACITY = 16;
807 jsr166 1.61 private RunnableScheduledFuture<?>[] queue =
808     new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
809 jsr166 1.46 private final ReentrantLock lock = new ReentrantLock();
810 dl 1.40 private int size = 0;
811    
812 jsr166 1.48 /**
813     * Thread designated to wait for the task at the head of the
814     * queue. This variant of the Leader-Follower pattern
815     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
816     * minimize unnecessary timed waiting. When a thread becomes
817     * the leader, it waits only for the next delay to elapse, but
818     * other threads await indefinitely. The leader thread must
819     * signal some other thread before returning from take() or
820     * poll(...), unless some other thread becomes leader in the
821     * interim. Whenever the head of the queue is replaced with a
822     * task with an earlier expiration time, the leader field is
823     * invalidated by being reset to null, and some waiting
824     * thread, but not necessarily the current leader, is
825     * signalled. So waiting threads must be prepared to acquire
826     * and lose leadership while waiting.
827     */
828     private Thread leader = null;
829    
830     /**
831     * Condition signalled when a newer task becomes available at the
832     * head of the queue or a new thread may need to become leader.
833     */
834     private final Condition available = lock.newCondition();
835 dl 1.40
836     /**
837 jsr166 1.66 * Sets f's heapIndex if it is a ScheduledFutureTask.
838 dl 1.40 */
839 jsr166 1.61 private void setIndex(RunnableScheduledFuture<?> f, int idx) {
840 dl 1.40 if (f instanceof ScheduledFutureTask)
841     ((ScheduledFutureTask)f).heapIndex = idx;
842     }
843    
844     /**
845 jsr166 1.66 * Sifts element added at bottom up to its heap-ordered spot.
846 dl 1.40 * Call only when holding lock.
847     */
848 jsr166 1.61 private void siftUp(int k, RunnableScheduledFuture<?> key) {
849 dl 1.40 while (k > 0) {
850     int parent = (k - 1) >>> 1;
851 jsr166 1.61 RunnableScheduledFuture<?> e = queue[parent];
852 dl 1.40 if (key.compareTo(e) >= 0)
853     break;
854     queue[k] = e;
855     setIndex(e, k);
856     k = parent;
857     }
858     queue[k] = key;
859     setIndex(key, k);
860     }
861    
862     /**
863 jsr166 1.66 * Sifts element added at top down to its heap-ordered spot.
864 dl 1.40 * Call only when holding lock.
865     */
866 jsr166 1.61 private void siftDown(int k, RunnableScheduledFuture<?> key) {
867 jsr166 1.42 int half = size >>> 1;
868 dl 1.40 while (k < half) {
869 jsr166 1.42 int child = (k << 1) + 1;
870 jsr166 1.61 RunnableScheduledFuture<?> c = queue[child];
871 dl 1.40 int right = child + 1;
872     if (right < size && c.compareTo(queue[right]) > 0)
873     c = queue[child = right];
874     if (key.compareTo(c) <= 0)
875     break;
876     queue[k] = c;
877     setIndex(c, k);
878     k = child;
879     }
880     queue[k] = key;
881     setIndex(key, k);
882     }
883    
884     /**
885 jsr166 1.66 * Resizes the heap array. Call only when holding lock.
886 dl 1.40 */
887     private void grow() {
888     int oldCapacity = queue.length;
889     int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
890     if (newCapacity < 0) // overflow
891     newCapacity = Integer.MAX_VALUE;
892     queue = Arrays.copyOf(queue, newCapacity);
893     }
894    
895     /**
896 jsr166 1.66 * Finds index of given object, or -1 if absent.
897 dl 1.40 */
898     private int indexOf(Object x) {
899     if (x != null) {
900 jsr166 1.48 if (x instanceof ScheduledFutureTask) {
901     int i = ((ScheduledFutureTask) x).heapIndex;
902     // Sanity check; x could conceivably be a
903     // ScheduledFutureTask from some other pool.
904     if (i >= 0 && i < size && queue[i] == x)
905     return i;
906     } else {
907     for (int i = 0; i < size; i++)
908     if (x.equals(queue[i]))
909     return i;
910     }
911 dl 1.40 }
912     return -1;
913     }
914    
915 jsr166 1.48 public boolean contains(Object x) {
916     final ReentrantLock lock = this.lock;
917 jsr166 1.45 lock.lock();
918     try {
919 jsr166 1.48 return indexOf(x) != -1;
920 jsr166 1.45 } finally {
921     lock.unlock();
922     }
923 jsr166 1.48 }
924 jsr166 1.45
925 dl 1.40 public boolean remove(Object x) {
926     final ReentrantLock lock = this.lock;
927     lock.lock();
928     try {
929 jsr166 1.45 int i = indexOf(x);
930 jsr166 1.48 if (i < 0)
931     return false;
932 jsr166 1.45
933 jsr166 1.48 setIndex(queue[i], -1);
934     int s = --size;
935 jsr166 1.61 RunnableScheduledFuture<?> replacement = queue[s];
936 jsr166 1.48 queue[s] = null;
937     if (s != i) {
938     siftDown(i, replacement);
939     if (queue[i] == replacement)
940     siftUp(i, replacement);
941     }
942     return true;
943 dl 1.40 } finally {
944     lock.unlock();
945     }
946     }
947    
948     public int size() {
949     final ReentrantLock lock = this.lock;
950     lock.lock();
951     try {
952 jsr166 1.45 return size;
953 dl 1.40 } finally {
954     lock.unlock();
955     }
956     }
957    
958 jsr166 1.42 public boolean isEmpty() {
959     return size() == 0;
960 dl 1.40 }
961    
962     public int remainingCapacity() {
963     return Integer.MAX_VALUE;
964     }
965    
966 jsr166 1.61 public RunnableScheduledFuture<?> peek() {
967 dl 1.40 final ReentrantLock lock = this.lock;
968     lock.lock();
969     try {
970     return queue[0];
971     } finally {
972     lock.unlock();
973     }
974 dl 1.13 }
975    
976 dl 1.40 public boolean offer(Runnable x) {
977     if (x == null)
978     throw new NullPointerException();
979 jsr166 1.61 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
980 dl 1.40 final ReentrantLock lock = this.lock;
981     lock.lock();
982     try {
983     int i = size;
984     if (i >= queue.length)
985     grow();
986     size = i + 1;
987     if (i == 0) {
988     queue[0] = e;
989     setIndex(e, 0);
990 jsr166 1.45 } else {
991 dl 1.40 siftUp(i, e);
992     }
993 jsr166 1.46 if (queue[0] == e) {
994 jsr166 1.48 leader = null;
995 jsr166 1.46 available.signal();
996 jsr166 1.48 }
997 dl 1.40 } finally {
998     lock.unlock();
999     }
1000     return true;
1001 jsr166 1.48 }
1002 dl 1.40
1003     public void put(Runnable e) {
1004     offer(e);
1005     }
1006    
1007     public boolean add(Runnable e) {
1008 jsr166 1.48 return offer(e);
1009     }
1010 dl 1.40
1011     public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1012     return offer(e);
1013     }
1014 jsr166 1.42
1015 jsr166 1.46 /**
1016     * Performs common bookkeeping for poll and take: Replaces
1017 jsr166 1.47 * first element with last and sifts it down. Call only when
1018     * holding lock.
1019 jsr166 1.46 * @param f the task to remove and return
1020     */
1021 jsr166 1.61 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1022 jsr166 1.46 int s = --size;
1023 jsr166 1.61 RunnableScheduledFuture<?> x = queue[s];
1024 jsr166 1.46 queue[s] = null;
1025     if (s != 0)
1026     siftDown(0, x);
1027     setIndex(f, -1);
1028     return f;
1029     }
1030    
1031 jsr166 1.61 public RunnableScheduledFuture<?> poll() {
1032 dl 1.40 final ReentrantLock lock = this.lock;
1033     lock.lock();
1034     try {
1035 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1036 jsr166 1.62 if (first == null || first.getDelay(NANOSECONDS) > 0)
1037 dl 1.40 return null;
1038 jsr166 1.42 else
1039 dl 1.40 return finishPoll(first);
1040     } finally {
1041     lock.unlock();
1042     }
1043     }
1044    
1045 jsr166 1.61 public RunnableScheduledFuture<?> take() throws InterruptedException {
1046 dl 1.40 final ReentrantLock lock = this.lock;
1047     lock.lockInterruptibly();
1048     try {
1049     for (;;) {
1050 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1051 jsr166 1.42 if (first == null)
1052 dl 1.40 available.await();
1053     else {
1054 jsr166 1.62 long delay = first.getDelay(NANOSECONDS);
1055 jsr166 1.48 if (delay <= 0)
1056     return finishPoll(first);
1057 jsr166 1.71 first = null; // don't retain ref while waiting
1058     if (leader != null)
1059 jsr166 1.48 available.await();
1060     else {
1061     Thread thisThread = Thread.currentThread();
1062     leader = thisThread;
1063     try {
1064     available.awaitNanos(delay);
1065     } finally {
1066     if (leader == thisThread)
1067     leader = null;
1068     }
1069     }
1070 dl 1.40 }
1071     }
1072     } finally {
1073 jsr166 1.48 if (leader == null && queue[0] != null)
1074     available.signal();
1075 dl 1.40 lock.unlock();
1076     }
1077     }
1078    
1079 jsr166 1.61 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1080 dl 1.40 throws InterruptedException {
1081     long nanos = unit.toNanos(timeout);
1082     final ReentrantLock lock = this.lock;
1083     lock.lockInterruptibly();
1084     try {
1085     for (;;) {
1086 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1087 dl 1.40 if (first == null) {
1088     if (nanos <= 0)
1089     return null;
1090     else
1091     nanos = available.awaitNanos(nanos);
1092     } else {
1093 jsr166 1.62 long delay = first.getDelay(NANOSECONDS);
1094 jsr166 1.48 if (delay <= 0)
1095 dl 1.40 return finishPoll(first);
1096 jsr166 1.48 if (nanos <= 0)
1097     return null;
1098 jsr166 1.71 first = null; // don't retain ref while waiting
1099 jsr166 1.48 if (nanos < delay || leader != null)
1100     nanos = available.awaitNanos(nanos);
1101     else {
1102     Thread thisThread = Thread.currentThread();
1103     leader = thisThread;
1104     try {
1105     long timeLeft = available.awaitNanos(delay);
1106     nanos -= delay - timeLeft;
1107     } finally {
1108     if (leader == thisThread)
1109     leader = null;
1110     }
1111     }
1112     }
1113     }
1114 dl 1.40 } finally {
1115 jsr166 1.48 if (leader == null && queue[0] != null)
1116     available.signal();
1117 dl 1.40 lock.unlock();
1118     }
1119     }
1120    
1121     public void clear() {
1122     final ReentrantLock lock = this.lock;
1123     lock.lock();
1124     try {
1125     for (int i = 0; i < size; i++) {
1126 jsr166 1.61 RunnableScheduledFuture<?> t = queue[i];
1127 dl 1.40 if (t != null) {
1128     queue[i] = null;
1129     setIndex(t, -1);
1130     }
1131     }
1132     size = 0;
1133     } finally {
1134     lock.unlock();
1135     }
1136 dl 1.13 }
1137 dl 1.40
1138     /**
1139 jsr166 1.66 * Returns first element only if it is expired.
1140 dl 1.40 * Used only by drainTo. Call only when holding lock.
1141     */
1142 jsr166 1.62 private RunnableScheduledFuture<?> peekExpired() {
1143     // assert lock.isHeldByCurrentThread();
1144 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1145 jsr166 1.62 return (first == null || first.getDelay(NANOSECONDS) > 0) ?
1146     null : first;
1147 dl 1.40 }
1148    
1149     public int drainTo(Collection<? super Runnable> c) {
1150     if (c == null)
1151     throw new NullPointerException();
1152     if (c == this)
1153     throw new IllegalArgumentException();
1154     final ReentrantLock lock = this.lock;
1155     lock.lock();
1156     try {
1157 jsr166 1.61 RunnableScheduledFuture<?> first;
1158 dl 1.40 int n = 0;
1159 jsr166 1.62 while ((first = peekExpired()) != null) {
1160     c.add(first); // In this order, in case add() throws.
1161     finishPoll(first);
1162 jsr166 1.48 ++n;
1163     }
1164 dl 1.40 return n;
1165     } finally {
1166     lock.unlock();
1167     }
1168 dl 1.13 }
1169    
1170 jsr166 1.21 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1171 dl 1.40 if (c == null)
1172     throw new NullPointerException();
1173     if (c == this)
1174     throw new IllegalArgumentException();
1175     if (maxElements <= 0)
1176     return 0;
1177     final ReentrantLock lock = this.lock;
1178     lock.lock();
1179     try {
1180 jsr166 1.61 RunnableScheduledFuture<?> first;
1181 dl 1.40 int n = 0;
1182 jsr166 1.62 while (n < maxElements && (first = peekExpired()) != null) {
1183     c.add(first); // In this order, in case add() throws.
1184     finishPoll(first);
1185 jsr166 1.48 ++n;
1186     }
1187 dl 1.40 return n;
1188     } finally {
1189     lock.unlock();
1190     }
1191     }
1192    
1193     public Object[] toArray() {
1194     final ReentrantLock lock = this.lock;
1195     lock.lock();
1196     try {
1197 jsr166 1.45 return Arrays.copyOf(queue, size, Object[].class);
1198 dl 1.40 } finally {
1199     lock.unlock();
1200     }
1201     }
1202    
1203 jsr166 1.48 @SuppressWarnings("unchecked")
1204 dl 1.40 public <T> T[] toArray(T[] a) {
1205     final ReentrantLock lock = this.lock;
1206     lock.lock();
1207     try {
1208     if (a.length < size)
1209     return (T[]) Arrays.copyOf(queue, size, a.getClass());
1210     System.arraycopy(queue, 0, a, 0, size);
1211     if (a.length > size)
1212     a[size] = null;
1213     return a;
1214     } finally {
1215     lock.unlock();
1216     }
1217 dl 1.13 }
1218    
1219 jsr166 1.21 public Iterator<Runnable> iterator() {
1220 jsr166 1.45 return new Itr(Arrays.copyOf(queue, size));
1221 dl 1.40 }
1222 jsr166 1.42
1223 dl 1.40 /**
1224     * Snapshot iterator that works off copy of underlying q array.
1225     */
1226     private class Itr implements Iterator<Runnable> {
1227 jsr166 1.45 final RunnableScheduledFuture[] array;
1228 jsr166 1.48 int cursor = 0; // index of next element to return
1229     int lastRet = -1; // index of last element, or -1 if no such
1230 jsr166 1.42
1231 jsr166 1.45 Itr(RunnableScheduledFuture[] array) {
1232 dl 1.40 this.array = array;
1233     }
1234 jsr166 1.42
1235 dl 1.40 public boolean hasNext() {
1236     return cursor < array.length;
1237     }
1238 jsr166 1.42
1239 dl 1.40 public Runnable next() {
1240     if (cursor >= array.length)
1241     throw new NoSuchElementException();
1242     lastRet = cursor;
1243 jsr166 1.45 return array[cursor++];
1244 dl 1.40 }
1245 jsr166 1.42
1246 dl 1.40 public void remove() {
1247     if (lastRet < 0)
1248     throw new IllegalStateException();
1249     DelayedWorkQueue.this.remove(array[lastRet]);
1250     lastRet = -1;
1251     }
1252 dl 1.13 }
1253     }
1254 dl 1.1 }