ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.75
Committed: Sun Feb 16 14:03:53 2014 UTC (10 years, 3 months ago) by jsr166
Branch: MAIN
Changes since 1.74: +28 -27 lines
Log Message:
tidy internal documentation

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.11 * Expert Group and released to the public domain, as explained at
4 jsr166 1.58 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.1 */
6    
7     package java.util.concurrent;
8 jsr166 1.62 import static java.util.concurrent.TimeUnit.NANOSECONDS;
9     import java.util.concurrent.atomic.AtomicLong;
10     import java.util.concurrent.locks.Condition;
11     import java.util.concurrent.locks.ReentrantLock;
12 dl 1.1 import java.util.*;
13    
14     /**
15 dl 1.7 * A {@link ThreadPoolExecutor} that can additionally schedule
16 jsr166 1.75 * commands to run after a given delay, or to execute periodically.
17     * This class is preferable to {@link java.util.Timer} when multiple
18     * worker threads are needed, or when the additional flexibility or
19     * capabilities of {@link ThreadPoolExecutor} (which this class
20     * extends) are required.
21 dl 1.1 *
22 jsr166 1.46 * <p>Delayed tasks execute no sooner than they are enabled, but
23 dl 1.18 * without any real-time guarantees about when, after they are
24     * enabled, they will commence. Tasks scheduled for exactly the same
25     * execution time are enabled in first-in-first-out (FIFO) order of
26 jsr166 1.46 * submission.
27     *
28     * <p>When a submitted task is cancelled before it is run, execution
29 jsr166 1.75 * is suppressed. By default, such a cancelled task is not
30     * automatically removed from the work queue until its delay elapses.
31     * While this enables further inspection and monitoring, it may also
32     * cause unbounded retention of cancelled tasks. To avoid this, use
33     * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately
34     * removed from the work queue at time of cancellation.
35 dl 1.1 *
36 jsr166 1.75 * <p>Successive executions of a periodic task scheduled via
37     * {@link #scheduleAtFixedRate} or
38     * {@link #scheduleWithFixedDelay} do not overlap. While different
39 dl 1.51 * executions may be performed by different threads, the effects of
40     * prior executions <a
41     * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
42     * those of subsequent ones.
43     *
44 dl 1.1 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
45 dl 1.8 * of the inherited tuning methods are not useful for it. In
46 jsr166 1.53 * particular, because it acts as a fixed-sized pool using
47     * {@code corePoolSize} threads and an unbounded queue, adjustments
48     * to {@code maximumPoolSize} have no useful effect. Additionally, it
49     * is almost never a good idea to set {@code corePoolSize} to zero or
50     * use {@code allowCoreThreadTimeOut} because this may leave the pool
51     * without threads to handle tasks once they become eligible to run.
52 dl 1.1 *
53 jsr166 1.39 * <p><b>Extension notes:</b> This class overrides the
54 jsr166 1.69 * {@link ThreadPoolExecutor#execute(Runnable) execute} and
55 jsr166 1.39 * {@link AbstractExecutorService#submit(Runnable) submit}
56     * methods to generate internal {@link ScheduledFuture} objects to
57     * control per-task delays and scheduling. To preserve
58     * functionality, any further overrides of these methods in
59 dl 1.32 * subclasses must invoke superclass versions, which effectively
60 jsr166 1.39 * disables additional task customization. However, this class
61 dl 1.32 * provides alternative protected extension method
62 jsr166 1.39 * {@code decorateTask} (one version each for {@code Runnable} and
63     * {@code Callable}) that can be used to customize the concrete task
64     * types used to execute commands entered via {@code execute},
65     * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
66     * and {@code scheduleWithFixedDelay}. By default, a
67     * {@code ScheduledThreadPoolExecutor} uses a task type extending
68 dl 1.32 * {@link FutureTask}. However, this may be modified or replaced using
69     * subclasses of the form:
70     *
71 jsr166 1.39 * <pre> {@code
72 dl 1.23 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
73     *
74 jsr166 1.39 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
75 dl 1.23 *
76 jsr166 1.39 * protected <V> RunnableScheduledFuture<V> decorateTask(
77     * Runnable r, RunnableScheduledFuture<V> task) {
78     * return new CustomTask<V>(r, task);
79 jsr166 1.29 * }
80 dl 1.23 *
81 jsr166 1.39 * protected <V> RunnableScheduledFuture<V> decorateTask(
82     * Callable<V> c, RunnableScheduledFuture<V> task) {
83     * return new CustomTask<V>(c, task);
84 jsr166 1.29 * }
85     * // ... add constructors, etc.
86 jsr166 1.39 * }}</pre>
87     *
88 dl 1.1 * @since 1.5
89     * @author Doug Lea
90     */
91 jsr166 1.21 public class ScheduledThreadPoolExecutor
92     extends ThreadPoolExecutor
93 tim 1.3 implements ScheduledExecutorService {
94 dl 1.1
95 dl 1.37 /*
96     * This class specializes ThreadPoolExecutor implementation by
97     *
98     * 1. Using a custom task type, ScheduledFutureTask for
99     * tasks, even those that don't require scheduling (i.e.,
100     * those submitted using ExecutorService execute, not
101     * ScheduledExecutorService methods) which are treated as
102     * delayed tasks with a delay of zero.
103     *
104 jsr166 1.46 * 2. Using a custom queue (DelayedWorkQueue), a variant of
105 dl 1.37 * unbounded DelayQueue. The lack of capacity constraint and
106     * the fact that corePoolSize and maximumPoolSize are
107     * effectively identical simplifies some execution mechanics
108 jsr166 1.46 * (see delayedExecute) compared to ThreadPoolExecutor.
109 dl 1.37 *
110     * 3. Supporting optional run-after-shutdown parameters, which
111     * leads to overrides of shutdown methods to remove and cancel
112     * tasks that should NOT be run after shutdown, as well as
113     * different recheck logic when task (re)submission overlaps
114     * with a shutdown.
115     *
116     * 4. Task decoration methods to allow interception and
117     * instrumentation, which are needed because subclasses cannot
118     * otherwise override submit methods to get this effect. These
119     * don't have any impact on pool control logic though.
120     */
121    
122 dl 1.1 /**
123     * False if should cancel/suppress periodic tasks on shutdown.
124     */
125     private volatile boolean continueExistingPeriodicTasksAfterShutdown;
126    
127     /**
128     * False if should cancel non-periodic tasks on shutdown.
129     */
130     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
131    
132     /**
133 jsr166 1.75 * True if ScheduledFutureTask.cancel should remove from queue.
134 dl 1.41 */
135     private volatile boolean removeOnCancel = false;
136    
137     /**
138 dl 1.1 * Sequence number to break scheduling ties, and in turn to
139     * guarantee FIFO order among tied entries.
140     */
141 jsr166 1.60 private static final AtomicLong sequencer = new AtomicLong();
142 dl 1.14
143     /**
144 jsr166 1.39 * Returns current nanosecond time.
145 dl 1.14 */
146 jsr166 1.54 final long now() {
147     return System.nanoTime();
148 dl 1.14 }
149    
150 jsr166 1.21 private class ScheduledFutureTask<V>
151 peierls 1.22 extends FutureTask<V> implements RunnableScheduledFuture<V> {
152 jsr166 1.21
153 dl 1.1 /** Sequence number to break ties FIFO */
154     private final long sequenceNumber;
155 jsr166 1.44
156 dl 1.1 /** The time the task is enabled to execute in nanoTime units */
157     private long time;
158 jsr166 1.44
159 dl 1.16 /**
160 jsr166 1.75 * Period in nanoseconds for repeating tasks.
161     * A positive value indicates fixed-rate execution.
162     * A negative value indicates fixed-delay execution.
163     * A value of 0 indicates a non-repeating (one-shot) task.
164 dl 1.16 */
165 dl 1.1 private final long period;
166    
167 jsr166 1.48 /** The actual task to be re-enqueued by reExecutePeriodic */
168     RunnableScheduledFuture<V> outerTask = this;
169 jsr166 1.44
170 dl 1.1 /**
171 dl 1.40 * Index into delay queue, to support faster cancellation.
172     */
173     int heapIndex;
174    
175     /**
176 jsr166 1.30 * Creates a one-shot action with given nanoTime-based trigger time.
177 dl 1.1 */
178 jsr166 1.75 ScheduledFutureTask(Runnable r, V result, long triggerTime) {
179 dl 1.1 super(r, result);
180 jsr166 1.75 this.time = triggerTime;
181 dl 1.1 this.period = 0;
182     this.sequenceNumber = sequencer.getAndIncrement();
183     }
184    
185     /**
186 jsr166 1.75 * Creates a periodic action with given nanoTime-based initial
187     * trigger time and period.
188 dl 1.1 */
189 jsr166 1.75 ScheduledFutureTask(Runnable r, V result, long triggerTime,
190     long period) {
191 dl 1.1 super(r, result);
192 jsr166 1.75 this.time = triggerTime;
193 dl 1.1 this.period = period;
194     this.sequenceNumber = sequencer.getAndIncrement();
195     }
196    
197     /**
198 jsr166 1.65 * Creates a one-shot action with given nanoTime-based trigger time.
199 dl 1.1 */
200 jsr166 1.75 ScheduledFutureTask(Callable<V> callable, long triggerTime) {
201 dl 1.1 super(callable);
202 jsr166 1.75 this.time = triggerTime;
203 dl 1.1 this.period = 0;
204     this.sequenceNumber = sequencer.getAndIncrement();
205     }
206    
207     public long getDelay(TimeUnit unit) {
208 jsr166 1.62 return unit.convert(time - now(), NANOSECONDS);
209 dl 1.1 }
210    
211 dl 1.20 public int compareTo(Delayed other) {
212 dl 1.59 if (other == this) // compare zero if same object
213 dl 1.1 return 0;
214 dl 1.34 if (other instanceof ScheduledFutureTask) {
215     ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
216     long diff = time - x.time;
217     if (diff < 0)
218     return -1;
219     else if (diff > 0)
220     return 1;
221     else if (sequenceNumber < x.sequenceNumber)
222     return -1;
223     else
224     return 1;
225     }
226 jsr166 1.64 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
227 jsr166 1.61 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
228 dl 1.1 }
229    
230     /**
231 jsr166 1.70 * Returns {@code true} if this is a periodic (not a one-shot) action.
232 jsr166 1.30 *
233 jsr166 1.70 * @return {@code true} if periodic
234 dl 1.1 */
235 dl 1.23 public boolean isPeriodic() {
236 dl 1.16 return period != 0;
237 dl 1.1 }
238    
239     /**
240 jsr166 1.39 * Sets the next time to run for a periodic task.
241 dl 1.13 */
242 dl 1.37 private void setNextRunTime() {
243     long p = period;
244     if (p > 0)
245     time += p;
246     else
247 jsr166 1.54 time = triggerTime(-p);
248 dl 1.13 }
249    
250 dl 1.40 public boolean cancel(boolean mayInterruptIfRunning) {
251 dl 1.41 boolean cancelled = super.cancel(mayInterruptIfRunning);
252     if (cancelled && removeOnCancel && heapIndex >= 0)
253 jsr166 1.42 remove(this);
254 dl 1.41 return cancelled;
255 dl 1.40 }
256    
257 dl 1.13 /**
258 dl 1.5 * Overrides FutureTask version so as to reset/requeue if periodic.
259 jsr166 1.21 */
260 dl 1.1 public void run() {
261 dl 1.37 boolean periodic = isPeriodic();
262     if (!canRunInCurrentRunState(periodic))
263     cancel(false);
264     else if (!periodic)
265 dl 1.5 ScheduledFutureTask.super.run();
266 dl 1.37 else if (ScheduledFutureTask.super.runAndReset()) {
267     setNextRunTime();
268 jsr166 1.44 reExecutePeriodic(outerTask);
269 dl 1.37 }
270 dl 1.1 }
271     }
272    
273     /**
274 dl 1.37 * Returns true if can run a task given current run state
275 jsr166 1.39 * and run-after-shutdown parameters.
276     *
277 dl 1.37 * @param periodic true if this task periodic, false if delayed
278     */
279     boolean canRunInCurrentRunState(boolean periodic) {
280 jsr166 1.38 return isRunningOrShutdown(periodic ?
281 dl 1.37 continueExistingPeriodicTasksAfterShutdown :
282     executeExistingDelayedTasksAfterShutdown);
283     }
284    
285     /**
286     * Main execution method for delayed or periodic tasks. If pool
287     * is shut down, rejects the task. Otherwise adds task to queue
288     * and starts a thread, if necessary, to run it. (We cannot
289     * prestart the thread to run the task because the task (probably)
290 jsr166 1.67 * shouldn't be run yet.) If the pool is shut down while the task
291 dl 1.37 * is being added, cancel and remove it if required by state and
292 jsr166 1.39 * run-after-shutdown parameters.
293     *
294 dl 1.37 * @param task the task
295     */
296     private void delayedExecute(RunnableScheduledFuture<?> task) {
297     if (isShutdown())
298     reject(task);
299     else {
300     super.getQueue().add(task);
301     if (isShutdown() &&
302     !canRunInCurrentRunState(task.isPeriodic()) &&
303     remove(task))
304     task.cancel(false);
305 jsr166 1.48 else
306 dl 1.63 ensurePrestart();
307 dl 1.37 }
308     }
309 jsr166 1.21
310 dl 1.37 /**
311 jsr166 1.39 * Requeues a periodic task unless current run state precludes it.
312     * Same idea as delayedExecute except drops task rather than rejecting.
313     *
314 dl 1.37 * @param task the task
315     */
316     void reExecutePeriodic(RunnableScheduledFuture<?> task) {
317     if (canRunInCurrentRunState(true)) {
318     super.getQueue().add(task);
319     if (!canRunInCurrentRunState(true) && remove(task))
320     task.cancel(false);
321 jsr166 1.48 else
322 dl 1.63 ensurePrestart();
323 dl 1.37 }
324 dl 1.13 }
325 dl 1.1
326 dl 1.13 /**
327 jsr166 1.21 * Cancels and clears the queue of all tasks that should not be run
328 jsr166 1.39 * due to shutdown policy. Invoked within super.shutdown.
329 dl 1.13 */
330 dl 1.37 @Override void onShutdown() {
331     BlockingQueue<Runnable> q = super.getQueue();
332     boolean keepDelayed =
333     getExecuteExistingDelayedTasksAfterShutdownPolicy();
334     boolean keepPeriodic =
335     getContinueExistingPeriodicTasksAfterShutdownPolicy();
336 jsr166 1.57 if (!keepDelayed && !keepPeriodic) {
337     for (Object e : q.toArray())
338     if (e instanceof RunnableScheduledFuture<?>)
339     ((RunnableScheduledFuture<?>) e).cancel(false);
340 dl 1.37 q.clear();
341 jsr166 1.57 }
342 dl 1.37 else {
343     // Traverse snapshot to avoid iterator exceptions
344 jsr166 1.39 for (Object e : q.toArray()) {
345 dl 1.23 if (e instanceof RunnableScheduledFuture) {
346 dl 1.37 RunnableScheduledFuture<?> t =
347     (RunnableScheduledFuture<?>)e;
348 dl 1.41 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
349     t.isCancelled()) { // also remove if already cancelled
350     if (q.remove(t))
351     t.cancel(false);
352     }
353 dl 1.13 }
354     }
355 dl 1.1 }
356 jsr166 1.48 tryTerminate();
357 dl 1.1 }
358    
359 dl 1.23 /**
360 jsr166 1.30 * Modifies or replaces the task used to execute a runnable.
361 jsr166 1.28 * This method can be used to override the concrete
362 dl 1.23 * class used for managing internal tasks.
363 jsr166 1.30 * The default implementation simply returns the given task.
364 jsr166 1.28 *
365 dl 1.23 * @param runnable the submitted Runnable
366     * @param task the task created to execute the runnable
367 jsr166 1.72 * @param <V> the type of the task's result
368 dl 1.23 * @return a task that can execute the runnable
369     * @since 1.6
370     */
371 peierls 1.22 protected <V> RunnableScheduledFuture<V> decorateTask(
372 dl 1.23 Runnable runnable, RunnableScheduledFuture<V> task) {
373     return task;
374 peierls 1.22 }
375    
376 dl 1.23 /**
377 jsr166 1.30 * Modifies or replaces the task used to execute a callable.
378 jsr166 1.28 * This method can be used to override the concrete
379 dl 1.23 * class used for managing internal tasks.
380 jsr166 1.30 * The default implementation simply returns the given task.
381 jsr166 1.28 *
382 dl 1.23 * @param callable the submitted Callable
383     * @param task the task created to execute the callable
384 jsr166 1.72 * @param <V> the type of the task's result
385 dl 1.23 * @return a task that can execute the callable
386     * @since 1.6
387     */
388 peierls 1.22 protected <V> RunnableScheduledFuture<V> decorateTask(
389 dl 1.23 Callable<V> callable, RunnableScheduledFuture<V> task) {
390     return task;
391 dl 1.19 }
392    
/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the given
 * core pool size. The superclass is configured with a maximum pool
 * size of {@code Integer.MAX_VALUE}, a zero keep-alive time and a
 * fresh {@code DelayedWorkQueue}.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0L, NANOSECONDS,
          new DelayedWorkQueue());
}
405    
/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the given
 * core pool size and thread factory. The superclass is configured
 * with a maximum pool size of {@code Integer.MAX_VALUE}, a zero
 * keep-alive time and a fresh {@code DelayedWorkQueue}.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code threadFactory} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0L, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
}
422    
/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the given
 * core pool size and rejection handler. The superclass is configured
 * with a maximum pool size of {@code Integer.MAX_VALUE}, a zero
 * keep-alive time and a fresh {@code DelayedWorkQueue}.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code handler} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0L, NANOSECONDS,
          new DelayedWorkQueue(), handler);
}
439    
/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the given
 * core pool size, thread factory and rejection handler. The
 * superclass is configured with a maximum pool size of
 * {@code Integer.MAX_VALUE}, a zero keep-alive time and a fresh
 * {@code DelayedWorkQueue}.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code threadFactory} or
 *         {@code handler} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0L, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}
460    
461 dl 1.37 /**
462 jsr166 1.73 * Returns the nanoTime-based trigger time of a delayed action.
463 jsr166 1.54 */
464     private long triggerTime(long delay, TimeUnit unit) {
465     return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
466     }
467    
468     /**
469 jsr166 1.73 * Returns the nanoTime-based trigger time of a delayed action.
470 dl 1.50 */
471 jsr166 1.54 long triggerTime(long delay) {
472     return now() +
473     ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
474     }
475    
476     /**
477     * Constrains the values of all delays in the queue to be within
478     * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
479     * This may occur if a task is eligible to be dequeued, but has
480     * not yet been, while some other task is added with a delay of
481     * Long.MAX_VALUE.
482     */
483     private long overflowFree(long delay) {
484     Delayed head = (Delayed) super.getQueue().peek();
485     if (head != null) {
486 jsr166 1.62 long headDelay = head.getDelay(NANOSECONDS);
487 jsr166 1.54 if (headDelay < 0 && (delay - headDelay < 0))
488     delay = Long.MAX_VALUE + headDelay;
489     }
490     return delay;
491 dl 1.50 }
492    
493     /**
494 dl 1.37 * @throws RejectedExecutionException {@inheritDoc}
495     * @throws NullPointerException {@inheritDoc}
496     */
497 jsr166 1.21 public ScheduledFuture<?> schedule(Runnable command,
498     long delay,
499 dl 1.13 TimeUnit unit) {
500 dl 1.9 if (command == null || unit == null)
501 dl 1.1 throw new NullPointerException();
502 peierls 1.22 RunnableScheduledFuture<?> t = decorateTask(command,
503 jsr166 1.54 new ScheduledFutureTask<Void>(command, null,
504     triggerTime(delay, unit)));
505 dl 1.1 delayedExecute(t);
506     return t;
507     }
508 jsr166 1.52
509 dl 1.37 /**
510     * @throws RejectedExecutionException {@inheritDoc}
511     * @throws NullPointerException {@inheritDoc}
512     */
513 jsr166 1.21 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
514     long delay,
515 dl 1.13 TimeUnit unit) {
516 dl 1.9 if (callable == null || unit == null)
517 dl 1.1 throw new NullPointerException();
518 peierls 1.22 RunnableScheduledFuture<V> t = decorateTask(callable,
519 jsr166 1.54 new ScheduledFutureTask<V>(callable,
520     triggerTime(delay, unit)));
521 dl 1.1 delayedExecute(t);
522     return t;
523     }
524    
525 dl 1.37 /**
526     * @throws RejectedExecutionException {@inheritDoc}
527     * @throws NullPointerException {@inheritDoc}
528     * @throws IllegalArgumentException {@inheritDoc}
529     */
530 jsr166 1.21 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
531     long initialDelay,
532     long period,
533 dl 1.13 TimeUnit unit) {
534 dl 1.9 if (command == null || unit == null)
535 dl 1.1 throw new NullPointerException();
536     if (period <= 0)
537     throw new IllegalArgumentException();
538 jsr166 1.48 ScheduledFutureTask<Void> sft =
539     new ScheduledFutureTask<Void>(command,
540     null,
541 jsr166 1.54 triggerTime(initialDelay, unit),
542 jsr166 1.48 unit.toNanos(period));
543 jsr166 1.44 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
544 jsr166 1.48 sft.outerTask = t;
545 dl 1.1 delayedExecute(t);
546     return t;
547     }
548 jsr166 1.21
549 dl 1.37 /**
550     * @throws RejectedExecutionException {@inheritDoc}
551     * @throws NullPointerException {@inheritDoc}
552     * @throws IllegalArgumentException {@inheritDoc}
553     */
554 jsr166 1.21 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
555     long initialDelay,
556     long delay,
557 dl 1.13 TimeUnit unit) {
558 dl 1.9 if (command == null || unit == null)
559 dl 1.1 throw new NullPointerException();
560     if (delay <= 0)
561     throw new IllegalArgumentException();
562 jsr166 1.48 ScheduledFutureTask<Void> sft =
563     new ScheduledFutureTask<Void>(command,
564     null,
565 jsr166 1.54 triggerTime(initialDelay, unit),
566 jsr166 1.48 unit.toNanos(-delay));
567 jsr166 1.44 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
568 jsr166 1.48 sft.outerTask = t;
569 dl 1.1 delayedExecute(t);
570     return t;
571     }
572 jsr166 1.21
573 dl 1.1 /**
574 jsr166 1.39 * Executes {@code command} with zero required delay.
575     * This has effect equivalent to
576     * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
577     * Note that inspections of the queue and of the list returned by
578     * {@code shutdownNow} will access the zero-delayed
579     * {@link ScheduledFuture}, not the {@code command} itself.
580     *
581     * <p>A consequence of the use of {@code ScheduledFuture} objects is
582     * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
583     * called with a null second {@code Throwable} argument, even if the
584     * {@code command} terminated abruptly. Instead, the {@code Throwable}
585     * thrown by such a task can be obtained via {@link Future#get}.
586 dl 1.1 *
587     * @throws RejectedExecutionException at discretion of
588 jsr166 1.39 * {@code RejectedExecutionHandler}, if the task
589     * cannot be accepted for execution because the
590     * executor has been shut down
591     * @throws NullPointerException {@inheritDoc}
592 dl 1.1 */
593     public void execute(Runnable command) {
594 jsr166 1.62 schedule(command, 0, NANOSECONDS);
595 dl 1.1 }
596    
597 dl 1.13 // Override AbstractExecutorService methods
598    
599 dl 1.37 /**
600     * @throws RejectedExecutionException {@inheritDoc}
601     * @throws NullPointerException {@inheritDoc}
602     */
603 dl 1.7 public Future<?> submit(Runnable task) {
604 jsr166 1.62 return schedule(task, 0, NANOSECONDS);
605 dl 1.7 }
606    
607 dl 1.37 /**
608     * @throws RejectedExecutionException {@inheritDoc}
609     * @throws NullPointerException {@inheritDoc}
610     */
611 dl 1.7 public <T> Future<T> submit(Runnable task, T result) {
612 jsr166 1.62 return schedule(Executors.callable(task, result), 0, NANOSECONDS);
613 dl 1.7 }
614    
615 dl 1.37 /**
616     * @throws RejectedExecutionException {@inheritDoc}
617     * @throws NullPointerException {@inheritDoc}
618     */
619 dl 1.7 public <T> Future<T> submit(Callable<T> task) {
620 jsr166 1.62 return schedule(task, 0, NANOSECONDS);
621 dl 1.7 }
622 dl 1.1
623     /**
624 dl 1.37 * Sets the policy on whether to continue executing existing
625 jsr166 1.39 * periodic tasks even when this executor has been {@code shutdown}.
626     * In this case, these tasks will only terminate upon
627     * {@code shutdownNow} or after setting the policy to
628     * {@code false} when already shutdown.
629     * This value is by default {@code false}.
630 jsr166 1.30 *
631 jsr166 1.68 * @param value if {@code true}, continue after shutdown, else don't
632 jsr166 1.25 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
633 dl 1.1 */
634     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
635     continueExistingPeriodicTasksAfterShutdown = value;
636 jsr166 1.39 if (!value && isShutdown())
637 dl 1.37 onShutdown();
638 dl 1.1 }
639    
640     /**
641 jsr166 1.21 * Gets the policy on whether to continue executing existing
642 jsr166 1.39 * periodic tasks even when this executor has been {@code shutdown}.
643     * In this case, these tasks will only terminate upon
644     * {@code shutdownNow} or after setting the policy to
645     * {@code false} when already shutdown.
646     * This value is by default {@code false}.
647 jsr166 1.30 *
648 jsr166 1.39 * @return {@code true} if will continue after shutdown
649 dl 1.16 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
650 dl 1.1 */
651     public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
652     return continueExistingPeriodicTasksAfterShutdown;
653     }
654    
655     /**
656 jsr166 1.21 * Sets the policy on whether to execute existing delayed
657 jsr166 1.39 * tasks even when this executor has been {@code shutdown}.
658     * In this case, these tasks will only terminate upon
659     * {@code shutdownNow}, or after setting the policy to
660     * {@code false} when already shutdown.
661     * This value is by default {@code true}.
662 jsr166 1.30 *
663 jsr166 1.68 * @param value if {@code true}, execute after shutdown, else don't
664 dl 1.16 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
665 dl 1.1 */
666     public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
667     executeExistingDelayedTasksAfterShutdown = value;
668 jsr166 1.39 if (!value && isShutdown())
669 dl 1.37 onShutdown();
670 dl 1.1 }
671    
672     /**
673 jsr166 1.21 * Gets the policy on whether to execute existing delayed
674 jsr166 1.39 * tasks even when this executor has been {@code shutdown}.
675     * In this case, these tasks will only terminate upon
676     * {@code shutdownNow}, or after setting the policy to
677     * {@code false} when already shutdown.
678     * This value is by default {@code true}.
679 jsr166 1.30 *
680 jsr166 1.39 * @return {@code true} if will execute after shutdown
681 dl 1.16 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
682 dl 1.1 */
683     public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
684     return executeExistingDelayedTasksAfterShutdown;
685     }
686    
687     /**
688 jsr166 1.46 * Sets the policy on whether cancelled tasks should be immediately
689     * removed from the work queue at time of cancellation. This value is
690     * by default {@code false}.
691 dl 1.41 *
692 jsr166 1.42 * @param value if {@code true}, remove on cancellation, else don't
693 dl 1.41 * @see #getRemoveOnCancelPolicy
694 jsr166 1.43 * @since 1.7
695 dl 1.41 */
696     public void setRemoveOnCancelPolicy(boolean value) {
697     removeOnCancel = value;
698     }
699    
700     /**
701 jsr166 1.46 * Gets the policy on whether cancelled tasks should be immediately
702     * removed from the work queue at time of cancellation. This value is
703     * by default {@code false}.
704 dl 1.41 *
705 jsr166 1.46 * @return {@code true} if cancelled tasks are immediately removed
706     * from the queue
707 dl 1.41 * @see #setRemoveOnCancelPolicy
708 jsr166 1.43 * @since 1.7
709 dl 1.41 */
710     public boolean getRemoveOnCancelPolicy() {
711     return removeOnCancel;
712     }
713    
714     /**
715 dl 1.1 * Initiates an orderly shutdown in which previously submitted
716 jsr166 1.49 * tasks are executed, but no new tasks will be accepted.
717     * Invocation has no additional effect if already shut down.
718     *
719     * <p>This method does not wait for previously submitted tasks to
720     * complete execution. Use {@link #awaitTermination awaitTermination}
721     * to do that.
722     *
723     * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
724     * has been set {@code false}, existing delayed tasks whose delays
725     * have not yet elapsed are cancelled. And unless the {@code
726     * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
727     * {@code true}, future executions of existing periodic tasks will
728     * be cancelled.
729 jsr166 1.39 *
730     * @throws SecurityException {@inheritDoc}
731 dl 1.1 */
732     public void shutdown() {
733     super.shutdown();
734     }
735    
/**
 * Attempts to stop all actively executing tasks, halts the
 * processing of waiting tasks, and returns a list of the tasks
 * that were awaiting execution.
 *
 * <p>This method returns without waiting for actively executing
 * tasks to terminate.  Use
 * {@link #awaitTermination awaitTermination} to wait for them.
 *
 * <p>There are no guarantees beyond best-effort attempts to stop
 * processing actively executing tasks.  This implementation
 * cancels tasks via {@link Thread#interrupt}, so any task that
 * fails to respond to interrupts may never terminate.
 *
 * @return list of tasks that never commenced execution.
 *         Each element of this list is a {@link ScheduledFuture},
 *         including those tasks submitted using {@code execute},
 *         which are for scheduling purposes used as the basis of a
 *         zero-delay {@code ScheduledFuture}.
 * @throws SecurityException {@inheritDoc}
 */
public List<Runnable> shutdownNow() {
    // Pure delegation to the superclass implementation.
    return super.shutdownNow();
}
760    
/**
 * Returns the task queue used by this executor.  Each element of
 * this queue is a {@link ScheduledFuture}, including those tasks
 * submitted using {@code execute}, which are for scheduling
 * purposes used as the basis of a zero-delay
 * {@code ScheduledFuture}.  Iteration over this queue is
 * <em>not</em> guaranteed to traverse tasks in the order in
 * which they will execute.
 *
 * @return the task queue
 */
public BlockingQueue<Runnable> getQueue() {
    // Pure delegation to the superclass implementation.
    return super.getQueue();
}
775    
776 dl 1.13 /**
777 dl 1.40 * Specialized delay queue. To mesh with TPE declarations, this
778     * class must be declared as a BlockingQueue<Runnable> even though
779 jsr166 1.42 * it can only hold RunnableScheduledFutures.
780 jsr166 1.21 */
781 dl 1.40 static class DelayedWorkQueue extends AbstractQueue<Runnable>
782 dl 1.13 implements BlockingQueue<Runnable> {
783 jsr166 1.21
784 dl 1.40 /*
785     * A DelayedWorkQueue is based on a heap-based data structure
786     * like those in DelayQueue and PriorityQueue, except that
787     * every ScheduledFutureTask also records its index into the
788     * heap array. This eliminates the need to find a task upon
789     * cancellation, greatly speeding up removal (down from O(n)
790     * to O(log n)), and reducing garbage retention that would
791     * otherwise occur by waiting for the element to rise to top
792     * before clearing. But because the queue may also hold
793     * RunnableScheduledFutures that are not ScheduledFutureTasks,
794     * we are not guaranteed to have such indices available, in
795     * which case we fall back to linear search. (We expect that
796     * most tasks will not be decorated, and that the faster cases
797     * will be much more common.)
798     *
799     * All heap operations must record index changes -- mainly
800     * within siftUp and siftDown. Upon removal, a task's
801     * heapIndex is set to -1. Note that ScheduledFutureTasks can
802     * appear at most once in the queue (this need not be true for
803     * other kinds of tasks or work queues), so are uniquely
804     * identified by heapIndex.
805     */
806    
/** Length of the heap array allocated when the queue is created. */
private static final int INITIAL_CAPACITY = 16;

/**
 * Heap array holding the queued tasks in slots
 * {@code [0, size)}; slot 0 is the head.  Replaced by a larger
 * copy in grow() when full.
 */
private RunnableScheduledFuture<?>[] queue =
    new RunnableScheduledFuture<?>[INITIAL_CAPACITY];

/**
 * Lock acquired by the queue operations (offer, poll, take,
 * remove, ...) before they touch the heap array, size or leader.
 */
private final ReentrantLock lock = new ReentrantLock();

/** Number of tasks currently stored in the heap array. */
private int size = 0;

/**
 * Thread designated to wait for the task at the head of the
 * queue.  This variant of the Leader-Follower pattern
 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
 * minimize unnecessary timed waiting.  When a thread becomes
 * the leader, it waits only for the next delay to elapse, but
 * other threads await indefinitely.  The leader thread must
 * signal some other thread before returning from take() or
 * poll(...), unless some other thread becomes leader in the
 * interim.  Whenever the head of the queue is replaced with a
 * task with an earlier expiration time, the leader field is
 * invalidated by being reset to null, and some waiting
 * thread, but not necessarily the current leader, is
 * signalled.  So waiting threads must be prepared to acquire
 * and lose leadership while waiting.
 */
private Thread leader = null;

/**
 * Condition signalled when a newer task becomes available at the
 * head of the queue or a new thread may need to become leader.
 */
private final Condition available = lock.newCondition();
836 dl 1.40
837     /**
838 jsr166 1.66 * Sets f's heapIndex if it is a ScheduledFutureTask.
839 dl 1.40 */
840 jsr166 1.61 private void setIndex(RunnableScheduledFuture<?> f, int idx) {
841 dl 1.40 if (f instanceof ScheduledFutureTask)
842     ((ScheduledFutureTask)f).heapIndex = idx;
843     }
844    
845     /**
846 jsr166 1.66 * Sifts element added at bottom up to its heap-ordered spot.
847 dl 1.40 * Call only when holding lock.
848     */
849 jsr166 1.61 private void siftUp(int k, RunnableScheduledFuture<?> key) {
850 dl 1.40 while (k > 0) {
851     int parent = (k - 1) >>> 1;
852 jsr166 1.61 RunnableScheduledFuture<?> e = queue[parent];
853 dl 1.40 if (key.compareTo(e) >= 0)
854     break;
855     queue[k] = e;
856     setIndex(e, k);
857     k = parent;
858     }
859     queue[k] = key;
860     setIndex(key, k);
861     }
862    
863     /**
864 jsr166 1.66 * Sifts element added at top down to its heap-ordered spot.
865 dl 1.40 * Call only when holding lock.
866     */
867 jsr166 1.61 private void siftDown(int k, RunnableScheduledFuture<?> key) {
868 jsr166 1.42 int half = size >>> 1;
869 dl 1.40 while (k < half) {
870 jsr166 1.42 int child = (k << 1) + 1;
871 jsr166 1.61 RunnableScheduledFuture<?> c = queue[child];
872 dl 1.40 int right = child + 1;
873     if (right < size && c.compareTo(queue[right]) > 0)
874     c = queue[child = right];
875     if (key.compareTo(c) <= 0)
876     break;
877     queue[k] = c;
878     setIndex(c, k);
879     k = child;
880     }
881     queue[k] = key;
882     setIndex(key, k);
883     }
884    
885     /**
886 jsr166 1.66 * Resizes the heap array. Call only when holding lock.
887 dl 1.40 */
888     private void grow() {
889     int oldCapacity = queue.length;
890     int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
891     if (newCapacity < 0) // overflow
892     newCapacity = Integer.MAX_VALUE;
893     queue = Arrays.copyOf(queue, newCapacity);
894     }
895    
896     /**
897 jsr166 1.66 * Finds index of given object, or -1 if absent.
898 dl 1.40 */
899     private int indexOf(Object x) {
900     if (x != null) {
901 jsr166 1.48 if (x instanceof ScheduledFutureTask) {
902     int i = ((ScheduledFutureTask) x).heapIndex;
903     // Sanity check; x could conceivably be a
904     // ScheduledFutureTask from some other pool.
905     if (i >= 0 && i < size && queue[i] == x)
906     return i;
907     } else {
908     for (int i = 0; i < size; i++)
909     if (x.equals(queue[i]))
910     return i;
911     }
912 dl 1.40 }
913     return -1;
914     }
915    
916 jsr166 1.48 public boolean contains(Object x) {
917     final ReentrantLock lock = this.lock;
918 jsr166 1.45 lock.lock();
919     try {
920 jsr166 1.48 return indexOf(x) != -1;
921 jsr166 1.45 } finally {
922     lock.unlock();
923     }
924 jsr166 1.48 }
925 jsr166 1.45
926 dl 1.40 public boolean remove(Object x) {
927     final ReentrantLock lock = this.lock;
928     lock.lock();
929     try {
930 jsr166 1.45 int i = indexOf(x);
931 jsr166 1.48 if (i < 0)
932     return false;
933 jsr166 1.45
934 jsr166 1.48 setIndex(queue[i], -1);
935     int s = --size;
936 jsr166 1.61 RunnableScheduledFuture<?> replacement = queue[s];
937 jsr166 1.48 queue[s] = null;
938     if (s != i) {
939     siftDown(i, replacement);
940     if (queue[i] == replacement)
941     siftUp(i, replacement);
942     }
943     return true;
944 dl 1.40 } finally {
945     lock.unlock();
946     }
947     }
948    
949     public int size() {
950     final ReentrantLock lock = this.lock;
951     lock.lock();
952     try {
953 jsr166 1.45 return size;
954 dl 1.40 } finally {
955     lock.unlock();
956     }
957     }
958    
/**
 * Returns {@code true} if this queue holds no tasks, as reported
 * by {@link #size()}.
 */
public boolean isEmpty() {
    return size() == 0;
}
962    
/**
 * Always returns {@code Integer.MAX_VALUE}; this queue imposes no
 * capacity limit of its own.
 */
public int remainingCapacity() {
    return Integer.MAX_VALUE;
}
966    
967 jsr166 1.61 public RunnableScheduledFuture<?> peek() {
968 dl 1.40 final ReentrantLock lock = this.lock;
969     lock.lock();
970     try {
971     return queue[0];
972     } finally {
973     lock.unlock();
974     }
975 dl 1.13 }
976    
977 dl 1.40 public boolean offer(Runnable x) {
978     if (x == null)
979     throw new NullPointerException();
980 jsr166 1.61 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
981 dl 1.40 final ReentrantLock lock = this.lock;
982     lock.lock();
983     try {
984     int i = size;
985     if (i >= queue.length)
986     grow();
987     size = i + 1;
988     if (i == 0) {
989     queue[0] = e;
990     setIndex(e, 0);
991 jsr166 1.45 } else {
992 dl 1.40 siftUp(i, e);
993     }
994 jsr166 1.46 if (queue[0] == e) {
995 jsr166 1.48 leader = null;
996 jsr166 1.46 available.signal();
997 jsr166 1.48 }
998 dl 1.40 } finally {
999     lock.unlock();
1000     }
1001     return true;
1002 jsr166 1.48 }
1003 dl 1.40
/**
 * Inserts the given task by delegating to {@link #offer(Runnable)}.
 */
public void put(Runnable e) {
    offer(e);
}
1007    
/**
 * Inserts the given task by delegating to {@link #offer(Runnable)}
 * and returns its result.
 */
public boolean add(Runnable e) {
    return offer(e);
}
1011 dl 1.40
/**
 * Inserts the given task by delegating to {@link #offer(Runnable)}.
 * The {@code timeout} and {@code unit} arguments are ignored.
 */
public boolean offer(Runnable e, long timeout, TimeUnit unit) {
    return offer(e);
}
1015 jsr166 1.42
1016 jsr166 1.46 /**
1017     * Performs common bookkeeping for poll and take: Replaces
1018 jsr166 1.47 * first element with last and sifts it down. Call only when
1019     * holding lock.
1020 jsr166 1.46 * @param f the task to remove and return
1021     */
1022 jsr166 1.61 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1023 jsr166 1.46 int s = --size;
1024 jsr166 1.61 RunnableScheduledFuture<?> x = queue[s];
1025 jsr166 1.46 queue[s] = null;
1026     if (s != 0)
1027     siftDown(0, x);
1028     setIndex(f, -1);
1029     return f;
1030     }
1031    
1032 jsr166 1.61 public RunnableScheduledFuture<?> poll() {
1033 dl 1.40 final ReentrantLock lock = this.lock;
1034     lock.lock();
1035     try {
1036 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1037 jsr166 1.62 if (first == null || first.getDelay(NANOSECONDS) > 0)
1038 dl 1.40 return null;
1039 jsr166 1.42 else
1040 dl 1.40 return finishPoll(first);
1041     } finally {
1042     lock.unlock();
1043     }
1044     }
1045    
1046 jsr166 1.61 public RunnableScheduledFuture<?> take() throws InterruptedException {
1047 dl 1.40 final ReentrantLock lock = this.lock;
1048     lock.lockInterruptibly();
1049     try {
1050     for (;;) {
1051 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1052 jsr166 1.42 if (first == null)
1053 dl 1.40 available.await();
1054     else {
1055 jsr166 1.62 long delay = first.getDelay(NANOSECONDS);
1056 jsr166 1.48 if (delay <= 0)
1057     return finishPoll(first);
1058 jsr166 1.71 first = null; // don't retain ref while waiting
1059     if (leader != null)
1060 jsr166 1.48 available.await();
1061     else {
1062     Thread thisThread = Thread.currentThread();
1063     leader = thisThread;
1064     try {
1065     available.awaitNanos(delay);
1066     } finally {
1067     if (leader == thisThread)
1068     leader = null;
1069     }
1070     }
1071 dl 1.40 }
1072     }
1073     } finally {
1074 jsr166 1.48 if (leader == null && queue[0] != null)
1075     available.signal();
1076 dl 1.40 lock.unlock();
1077     }
1078     }
1079    
1080 jsr166 1.61 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1081 dl 1.40 throws InterruptedException {
1082     long nanos = unit.toNanos(timeout);
1083     final ReentrantLock lock = this.lock;
1084     lock.lockInterruptibly();
1085     try {
1086     for (;;) {
1087 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1088 dl 1.40 if (first == null) {
1089     if (nanos <= 0)
1090     return null;
1091     else
1092     nanos = available.awaitNanos(nanos);
1093     } else {
1094 jsr166 1.62 long delay = first.getDelay(NANOSECONDS);
1095 jsr166 1.48 if (delay <= 0)
1096 dl 1.40 return finishPoll(first);
1097 jsr166 1.48 if (nanos <= 0)
1098     return null;
1099 jsr166 1.71 first = null; // don't retain ref while waiting
1100 jsr166 1.48 if (nanos < delay || leader != null)
1101     nanos = available.awaitNanos(nanos);
1102     else {
1103     Thread thisThread = Thread.currentThread();
1104     leader = thisThread;
1105     try {
1106     long timeLeft = available.awaitNanos(delay);
1107     nanos -= delay - timeLeft;
1108     } finally {
1109     if (leader == thisThread)
1110     leader = null;
1111     }
1112     }
1113     }
1114     }
1115 dl 1.40 } finally {
1116 jsr166 1.48 if (leader == null && queue[0] != null)
1117     available.signal();
1118 dl 1.40 lock.unlock();
1119     }
1120     }
1121    
1122     public void clear() {
1123     final ReentrantLock lock = this.lock;
1124     lock.lock();
1125     try {
1126     for (int i = 0; i < size; i++) {
1127 jsr166 1.61 RunnableScheduledFuture<?> t = queue[i];
1128 dl 1.40 if (t != null) {
1129     queue[i] = null;
1130     setIndex(t, -1);
1131     }
1132     }
1133     size = 0;
1134     } finally {
1135     lock.unlock();
1136     }
1137 dl 1.13 }
1138 dl 1.40
1139     /**
1140 jsr166 1.66 * Returns first element only if it is expired.
1141 dl 1.40 * Used only by drainTo. Call only when holding lock.
1142     */
1143 jsr166 1.62 private RunnableScheduledFuture<?> peekExpired() {
1144     // assert lock.isHeldByCurrentThread();
1145 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1146 jsr166 1.62 return (first == null || first.getDelay(NANOSECONDS) > 0) ?
1147     null : first;
1148 dl 1.40 }
1149    
1150     public int drainTo(Collection<? super Runnable> c) {
1151     if (c == null)
1152     throw new NullPointerException();
1153     if (c == this)
1154     throw new IllegalArgumentException();
1155     final ReentrantLock lock = this.lock;
1156     lock.lock();
1157     try {
1158 jsr166 1.61 RunnableScheduledFuture<?> first;
1159 dl 1.40 int n = 0;
1160 jsr166 1.62 while ((first = peekExpired()) != null) {
1161     c.add(first); // In this order, in case add() throws.
1162     finishPoll(first);
1163 jsr166 1.48 ++n;
1164     }
1165 dl 1.40 return n;
1166     } finally {
1167     lock.unlock();
1168     }
1169 dl 1.13 }
1170    
1171 jsr166 1.21 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1172 dl 1.40 if (c == null)
1173     throw new NullPointerException();
1174     if (c == this)
1175     throw new IllegalArgumentException();
1176     if (maxElements <= 0)
1177     return 0;
1178     final ReentrantLock lock = this.lock;
1179     lock.lock();
1180     try {
1181 jsr166 1.61 RunnableScheduledFuture<?> first;
1182 dl 1.40 int n = 0;
1183 jsr166 1.62 while (n < maxElements && (first = peekExpired()) != null) {
1184     c.add(first); // In this order, in case add() throws.
1185     finishPoll(first);
1186 jsr166 1.48 ++n;
1187     }
1188 dl 1.40 return n;
1189     } finally {
1190     lock.unlock();
1191     }
1192     }
1193    
1194     public Object[] toArray() {
1195     final ReentrantLock lock = this.lock;
1196     lock.lock();
1197     try {
1198 jsr166 1.45 return Arrays.copyOf(queue, size, Object[].class);
1199 dl 1.40 } finally {
1200     lock.unlock();
1201     }
1202     }
1203    
1204 jsr166 1.48 @SuppressWarnings("unchecked")
1205 dl 1.40 public <T> T[] toArray(T[] a) {
1206     final ReentrantLock lock = this.lock;
1207     lock.lock();
1208     try {
1209     if (a.length < size)
1210     return (T[]) Arrays.copyOf(queue, size, a.getClass());
1211     System.arraycopy(queue, 0, a, 0, size);
1212     if (a.length > size)
1213     a[size] = null;
1214     return a;
1215     } finally {
1216     lock.unlock();
1217     }
1218 dl 1.13 }
1219    
1220 jsr166 1.21 public Iterator<Runnable> iterator() {
1221 jsr166 1.45 return new Itr(Arrays.copyOf(queue, size));
1222 dl 1.40 }
1223 jsr166 1.42
1224 dl 1.40 /**
1225     * Snapshot iterator that works off copy of underlying q array.
1226     */
1227     private class Itr implements Iterator<Runnable> {
1228 jsr166 1.74 final RunnableScheduledFuture<?>[] array;
1229 jsr166 1.48 int cursor = 0; // index of next element to return
1230     int lastRet = -1; // index of last element, or -1 if no such
1231 jsr166 1.42
1232 jsr166 1.74 Itr(RunnableScheduledFuture<?>[] array) {
1233 dl 1.40 this.array = array;
1234     }
1235 jsr166 1.42
1236 dl 1.40 public boolean hasNext() {
1237     return cursor < array.length;
1238     }
1239 jsr166 1.42
1240 dl 1.40 public Runnable next() {
1241     if (cursor >= array.length)
1242     throw new NoSuchElementException();
1243     lastRet = cursor;
1244 jsr166 1.45 return array[cursor++];
1245 dl 1.40 }
1246 jsr166 1.42
1247 dl 1.40 public void remove() {
1248     if (lastRet < 0)
1249     throw new IllegalStateException();
1250     DelayedWorkQueue.this.remove(array[lastRet]);
1251     lastRet = -1;
1252     }
1253 dl 1.13 }
1254     }
1255 dl 1.1 }