ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.77
Committed: Wed Feb 19 20:33:25 2014 UTC (10 years, 3 months ago) by jsr166
Branch: MAIN
Changes since 1.76: +15 -11 lines
Log Message:
clarify nature of ScheduledFuture elements in queue

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.11 * Expert Group and released to the public domain, as explained at
4 jsr166 1.58 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.1 */
6    
7     package java.util.concurrent;
8 jsr166 1.62 import static java.util.concurrent.TimeUnit.NANOSECONDS;
9     import java.util.concurrent.atomic.AtomicLong;
10     import java.util.concurrent.locks.Condition;
11     import java.util.concurrent.locks.ReentrantLock;
12 dl 1.1 import java.util.*;
13    
14     /**
15 dl 1.7 * A {@link ThreadPoolExecutor} that can additionally schedule
16 jsr166 1.75 * commands to run after a given delay, or to execute periodically.
17     * This class is preferable to {@link java.util.Timer} when multiple
18     * worker threads are needed, or when the additional flexibility or
19     * capabilities of {@link ThreadPoolExecutor} (which this class
20     * extends) are required.
21 dl 1.1 *
22 jsr166 1.46 * <p>Delayed tasks execute no sooner than they are enabled, but
23 dl 1.18 * without any real-time guarantees about when, after they are
24     * enabled, they will commence. Tasks scheduled for exactly the same
25     * execution time are enabled in first-in-first-out (FIFO) order of
26 jsr166 1.46 * submission.
27     *
28     * <p>When a submitted task is cancelled before it is run, execution
29 jsr166 1.75 * is suppressed. By default, such a cancelled task is not
30     * automatically removed from the work queue until its delay elapses.
31     * While this enables further inspection and monitoring, it may also
32     * cause unbounded retention of cancelled tasks. To avoid this, use
33     * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately
34     * removed from the work queue at time of cancellation.
35 dl 1.1 *
36 jsr166 1.75 * <p>Successive executions of a periodic task scheduled via
37     * {@link #scheduleAtFixedRate} or
38     * {@link #scheduleWithFixedDelay} do not overlap. While different
39 dl 1.51 * executions may be performed by different threads, the effects of
40     * prior executions <a
41     * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
42     * those of subsequent ones.
43     *
44 dl 1.1 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
45 dl 1.8 * of the inherited tuning methods are not useful for it. In
46 jsr166 1.53 * particular, because it acts as a fixed-sized pool using
47     * {@code corePoolSize} threads and an unbounded queue, adjustments
48     * to {@code maximumPoolSize} have no useful effect. Additionally, it
49     * is almost never a good idea to set {@code corePoolSize} to zero or
50     * use {@code allowCoreThreadTimeOut} because this may leave the pool
51     * without threads to handle tasks once they become eligible to run.
52 dl 1.1 *
53 jsr166 1.39 * <p><b>Extension notes:</b> This class overrides the
54 jsr166 1.69 * {@link ThreadPoolExecutor#execute(Runnable) execute} and
55 jsr166 1.39 * {@link AbstractExecutorService#submit(Runnable) submit}
56     * methods to generate internal {@link ScheduledFuture} objects to
57     * control per-task delays and scheduling. To preserve
58     * functionality, any further overrides of these methods in
59 dl 1.32 * subclasses must invoke superclass versions, which effectively
60 jsr166 1.39 * disables additional task customization. However, this class
61 dl 1.32 * provides alternative protected extension method
62 jsr166 1.39 * {@code decorateTask} (one version each for {@code Runnable} and
63     * {@code Callable}) that can be used to customize the concrete task
64     * types used to execute commands entered via {@code execute},
65     * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
66     * and {@code scheduleWithFixedDelay}. By default, a
67     * {@code ScheduledThreadPoolExecutor} uses a task type extending
68 dl 1.32 * {@link FutureTask}. However, this may be modified or replaced using
69     * subclasses of the form:
70     *
71 jsr166 1.39 * <pre> {@code
72 dl 1.23 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
73     *
74 jsr166 1.39 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
75 dl 1.23 *
76 jsr166 1.39 * protected <V> RunnableScheduledFuture<V> decorateTask(
77     * Runnable r, RunnableScheduledFuture<V> task) {
78     * return new CustomTask<V>(r, task);
79 jsr166 1.29 * }
80 dl 1.23 *
81 jsr166 1.39 * protected <V> RunnableScheduledFuture<V> decorateTask(
82     * Callable<V> c, RunnableScheduledFuture<V> task) {
83     * return new CustomTask<V>(c, task);
84 jsr166 1.29 * }
85     * // ... add constructors, etc.
86 jsr166 1.39 * }}</pre>
87     *
88 dl 1.1 * @since 1.5
89     * @author Doug Lea
90     */
91 jsr166 1.21 public class ScheduledThreadPoolExecutor
92     extends ThreadPoolExecutor
93 tim 1.3 implements ScheduledExecutorService {
94 dl 1.1
95 dl 1.37 /*
96     * This class specializes ThreadPoolExecutor implementation by
97     *
98     * 1. Using a custom task type, ScheduledFutureTask for
99     * tasks, even those that don't require scheduling (i.e.,
100     * those submitted using ExecutorService execute, not
101     * ScheduledExecutorService methods) which are treated as
102     * delayed tasks with a delay of zero.
103     *
104 jsr166 1.46 * 2. Using a custom queue (DelayedWorkQueue), a variant of
105 dl 1.37 * unbounded DelayQueue. The lack of capacity constraint and
106     * the fact that corePoolSize and maximumPoolSize are
107     * effectively identical simplifies some execution mechanics
108 jsr166 1.46 * (see delayedExecute) compared to ThreadPoolExecutor.
109 dl 1.37 *
110     * 3. Supporting optional run-after-shutdown parameters, which
111     * leads to overrides of shutdown methods to remove and cancel
112     * tasks that should NOT be run after shutdown, as well as
113     * different recheck logic when task (re)submission overlaps
114     * with a shutdown.
115     *
116     * 4. Task decoration methods to allow interception and
117     * instrumentation, which are needed because subclasses cannot
118     * otherwise override submit methods to get this effect. These
119     * don't have any impact on pool control logic though.
120     */
121    
122 dl 1.1 /**
123     * False if should cancel/suppress periodic tasks on shutdown.
124     */
125     private volatile boolean continueExistingPeriodicTasksAfterShutdown;
126    
127     /**
128     * False if should cancel non-periodic tasks on shutdown.
129     */
130     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
131    
132     /**
133 jsr166 1.75 * True if ScheduledFutureTask.cancel should remove from queue.
134 dl 1.41 */
135     private volatile boolean removeOnCancel = false;
136    
137     /**
138 dl 1.1 * Sequence number to break scheduling ties, and in turn to
139     * guarantee FIFO order among tied entries.
140     */
141 jsr166 1.60 private static final AtomicLong sequencer = new AtomicLong();
142 dl 1.14
143     /**
144 jsr166 1.39 * Returns current nanosecond time.
145 dl 1.14 */
146 jsr166 1.54 final long now() {
147     return System.nanoTime();
148 dl 1.14 }
149    
150 jsr166 1.21 private class ScheduledFutureTask<V>
151 peierls 1.22 extends FutureTask<V> implements RunnableScheduledFuture<V> {
152 jsr166 1.21
153 dl 1.1 /** Sequence number to break ties FIFO */
154     private final long sequenceNumber;
155 jsr166 1.44
156 dl 1.1 /** The time the task is enabled to execute in nanoTime units */
157     private long time;
158 jsr166 1.44
159 dl 1.16 /**
160 jsr166 1.75 * Period in nanoseconds for repeating tasks.
161     * A positive value indicates fixed-rate execution.
162     * A negative value indicates fixed-delay execution.
163     * A value of 0 indicates a non-repeating (one-shot) task.
164 dl 1.16 */
165 dl 1.1 private final long period;
166    
167 jsr166 1.48 /** The actual task to be re-enqueued by reExecutePeriodic */
168     RunnableScheduledFuture<V> outerTask = this;
169 jsr166 1.44
170 dl 1.1 /**
171 dl 1.40 * Index into delay queue, to support faster cancellation.
172     */
173     int heapIndex;
174    
175     /**
176 jsr166 1.30 * Creates a one-shot action with given nanoTime-based trigger time.
177 dl 1.1 */
178 jsr166 1.75 ScheduledFutureTask(Runnable r, V result, long triggerTime) {
179 dl 1.1 super(r, result);
180 jsr166 1.75 this.time = triggerTime;
181 dl 1.1 this.period = 0;
182     this.sequenceNumber = sequencer.getAndIncrement();
183     }
184    
185     /**
186 jsr166 1.75 * Creates a periodic action with given nanoTime-based initial
187     * trigger time and period.
188 dl 1.1 */
189 jsr166 1.75 ScheduledFutureTask(Runnable r, V result, long triggerTime,
190     long period) {
191 dl 1.1 super(r, result);
192 jsr166 1.75 this.time = triggerTime;
193 dl 1.1 this.period = period;
194     this.sequenceNumber = sequencer.getAndIncrement();
195     }
196    
197     /**
198 jsr166 1.65 * Creates a one-shot action with given nanoTime-based trigger time.
199 dl 1.1 */
200 jsr166 1.75 ScheduledFutureTask(Callable<V> callable, long triggerTime) {
201 dl 1.1 super(callable);
202 jsr166 1.75 this.time = triggerTime;
203 dl 1.1 this.period = 0;
204     this.sequenceNumber = sequencer.getAndIncrement();
205     }
206    
207     public long getDelay(TimeUnit unit) {
208 jsr166 1.62 return unit.convert(time - now(), NANOSECONDS);
209 dl 1.1 }
210    
211 dl 1.20 public int compareTo(Delayed other) {
212 dl 1.59 if (other == this) // compare zero if same object
213 dl 1.1 return 0;
214 dl 1.34 if (other instanceof ScheduledFutureTask) {
215     ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
216     long diff = time - x.time;
217     if (diff < 0)
218     return -1;
219     else if (diff > 0)
220     return 1;
221     else if (sequenceNumber < x.sequenceNumber)
222     return -1;
223     else
224     return 1;
225     }
226 jsr166 1.64 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
227 jsr166 1.61 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
228 dl 1.1 }
229    
230     /**
231 jsr166 1.70 * Returns {@code true} if this is a periodic (not a one-shot) action.
232 jsr166 1.30 *
233 jsr166 1.70 * @return {@code true} if periodic
234 dl 1.1 */
235 dl 1.23 public boolean isPeriodic() {
236 dl 1.16 return period != 0;
237 dl 1.1 }
238    
239     /**
240 jsr166 1.39 * Sets the next time to run for a periodic task.
241 dl 1.13 */
242 dl 1.37 private void setNextRunTime() {
243     long p = period;
244     if (p > 0)
245     time += p;
246     else
247 jsr166 1.54 time = triggerTime(-p);
248 dl 1.13 }
249    
250 dl 1.40 public boolean cancel(boolean mayInterruptIfRunning) {
251 dl 1.41 boolean cancelled = super.cancel(mayInterruptIfRunning);
252     if (cancelled && removeOnCancel && heapIndex >= 0)
253 jsr166 1.42 remove(this);
254 dl 1.41 return cancelled;
255 dl 1.40 }
256    
257 dl 1.13 /**
258 dl 1.5 * Overrides FutureTask version so as to reset/requeue if periodic.
259 jsr166 1.21 */
260 dl 1.1 public void run() {
261 dl 1.37 boolean periodic = isPeriodic();
262     if (!canRunInCurrentRunState(periodic))
263     cancel(false);
264     else if (!periodic)
265 dl 1.5 ScheduledFutureTask.super.run();
266 dl 1.37 else if (ScheduledFutureTask.super.runAndReset()) {
267     setNextRunTime();
268 jsr166 1.44 reExecutePeriodic(outerTask);
269 dl 1.37 }
270 dl 1.1 }
271     }
272    
273     /**
274 dl 1.37 * Returns true if can run a task given current run state
275 jsr166 1.39 * and run-after-shutdown parameters.
276     *
277 dl 1.37 * @param periodic true if this task periodic, false if delayed
278     */
279     boolean canRunInCurrentRunState(boolean periodic) {
280 jsr166 1.38 return isRunningOrShutdown(periodic ?
281 dl 1.37 continueExistingPeriodicTasksAfterShutdown :
282     executeExistingDelayedTasksAfterShutdown);
283     }
284    
285     /**
286     * Main execution method for delayed or periodic tasks. If pool
287     * is shut down, rejects the task. Otherwise adds task to queue
288     * and starts a thread, if necessary, to run it. (We cannot
289     * prestart the thread to run the task because the task (probably)
290 jsr166 1.67 * shouldn't be run yet.) If the pool is shut down while the task
291 dl 1.37 * is being added, cancel and remove it if required by state and
292 jsr166 1.39 * run-after-shutdown parameters.
293     *
294 dl 1.37 * @param task the task
295     */
296     private void delayedExecute(RunnableScheduledFuture<?> task) {
297     if (isShutdown())
298     reject(task);
299     else {
300     super.getQueue().add(task);
301     if (isShutdown() &&
302     !canRunInCurrentRunState(task.isPeriodic()) &&
303     remove(task))
304     task.cancel(false);
305 jsr166 1.48 else
306 dl 1.63 ensurePrestart();
307 dl 1.37 }
308     }
309 jsr166 1.21
310 dl 1.37 /**
311 jsr166 1.39 * Requeues a periodic task unless current run state precludes it.
312     * Same idea as delayedExecute except drops task rather than rejecting.
313     *
314 dl 1.37 * @param task the task
315     */
316     void reExecutePeriodic(RunnableScheduledFuture<?> task) {
317     if (canRunInCurrentRunState(true)) {
318     super.getQueue().add(task);
319     if (!canRunInCurrentRunState(true) && remove(task))
320     task.cancel(false);
321 jsr166 1.48 else
322 dl 1.63 ensurePrestart();
323 dl 1.37 }
324 dl 1.13 }
325 dl 1.1
326 dl 1.13 /**
327 jsr166 1.21 * Cancels and clears the queue of all tasks that should not be run
328 jsr166 1.39 * due to shutdown policy. Invoked within super.shutdown.
329 dl 1.13 */
330 dl 1.37 @Override void onShutdown() {
331     BlockingQueue<Runnable> q = super.getQueue();
332     boolean keepDelayed =
333     getExecuteExistingDelayedTasksAfterShutdownPolicy();
334     boolean keepPeriodic =
335     getContinueExistingPeriodicTasksAfterShutdownPolicy();
336 jsr166 1.57 if (!keepDelayed && !keepPeriodic) {
337     for (Object e : q.toArray())
338     if (e instanceof RunnableScheduledFuture<?>)
339     ((RunnableScheduledFuture<?>) e).cancel(false);
340 dl 1.37 q.clear();
341 jsr166 1.57 }
342 dl 1.37 else {
343     // Traverse snapshot to avoid iterator exceptions
344 jsr166 1.39 for (Object e : q.toArray()) {
345 dl 1.23 if (e instanceof RunnableScheduledFuture) {
346 dl 1.37 RunnableScheduledFuture<?> t =
347     (RunnableScheduledFuture<?>)e;
348 dl 1.41 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
349     t.isCancelled()) { // also remove if already cancelled
350     if (q.remove(t))
351     t.cancel(false);
352     }
353 dl 1.13 }
354     }
355 dl 1.1 }
356 jsr166 1.48 tryTerminate();
357 dl 1.1 }
358    
359 dl 1.23 /**
360 jsr166 1.30 * Modifies or replaces the task used to execute a runnable.
361 jsr166 1.28 * This method can be used to override the concrete
362 dl 1.23 * class used for managing internal tasks.
363 jsr166 1.30 * The default implementation simply returns the given task.
364 jsr166 1.28 *
365 dl 1.23 * @param runnable the submitted Runnable
366     * @param task the task created to execute the runnable
367 jsr166 1.72 * @param <V> the type of the task's result
368 dl 1.23 * @return a task that can execute the runnable
369     * @since 1.6
370     */
371 peierls 1.22 protected <V> RunnableScheduledFuture<V> decorateTask(
372 dl 1.23 Runnable runnable, RunnableScheduledFuture<V> task) {
373     return task;
374 peierls 1.22 }
375    
376 dl 1.23 /**
377 jsr166 1.30 * Modifies or replaces the task used to execute a callable.
378 jsr166 1.28 * This method can be used to override the concrete
379 dl 1.23 * class used for managing internal tasks.
380 jsr166 1.30 * The default implementation simply returns the given task.
381 jsr166 1.28 *
382 dl 1.23 * @param callable the submitted Callable
383     * @param task the task created to execute the callable
384 jsr166 1.72 * @param <V> the type of the task's result
385 dl 1.23 * @return a task that can execute the callable
386     * @since 1.6
387     */
388 peierls 1.22 protected <V> RunnableScheduledFuture<V> decorateTask(
389 dl 1.23 Callable<V> callable, RunnableScheduledFuture<V> task) {
390     return task;
391 dl 1.19 }
392    
393 dl 1.1 /**
394 jsr166 1.39 * Creates a new {@code ScheduledThreadPoolExecutor} with the
395     * given core pool size.
396 jsr166 1.21 *
397 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
398     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
399     * @throws IllegalArgumentException if {@code corePoolSize < 0}
400 dl 1.1 */
401     public ScheduledThreadPoolExecutor(int corePoolSize) {
402 jsr166 1.62 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
403 dl 1.1 new DelayedWorkQueue());
404     }
405    
406     /**
407 jsr166 1.39 * Creates a new {@code ScheduledThreadPoolExecutor} with the
408     * given initial parameters.
409 jsr166 1.21 *
410 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
411     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
412 dl 1.1 * @param threadFactory the factory to use when the executor
413 jsr166 1.39 * creates a new thread
414     * @throws IllegalArgumentException if {@code corePoolSize < 0}
415     * @throws NullPointerException if {@code threadFactory} is null
416 dl 1.1 */
417     public ScheduledThreadPoolExecutor(int corePoolSize,
418 jsr166 1.56 ThreadFactory threadFactory) {
419 jsr166 1.62 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
420 dl 1.1 new DelayedWorkQueue(), threadFactory);
421     }
422    
423     /**
424 dl 1.13 * Creates a new ScheduledThreadPoolExecutor with the given
425     * initial parameters.
426 jsr166 1.21 *
427 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
428     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
429 dl 1.1 * @param handler the handler to use when execution is blocked
430 jsr166 1.39 * because the thread bounds and queue capacities are reached
431     * @throws IllegalArgumentException if {@code corePoolSize < 0}
432     * @throws NullPointerException if {@code handler} is null
433 dl 1.1 */
434     public ScheduledThreadPoolExecutor(int corePoolSize,
435 jsr166 1.56 RejectedExecutionHandler handler) {
436 jsr166 1.62 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
437 dl 1.1 new DelayedWorkQueue(), handler);
438     }
439    
440     /**
441 dl 1.13 * Creates a new ScheduledThreadPoolExecutor with the given
442     * initial parameters.
443 jsr166 1.21 *
444 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
445     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
446 dl 1.1 * @param threadFactory the factory to use when the executor
447 jsr166 1.39 * creates a new thread
448 dl 1.1 * @param handler the handler to use when execution is blocked
449 jsr166 1.39 * because the thread bounds and queue capacities are reached
450     * @throws IllegalArgumentException if {@code corePoolSize < 0}
451     * @throws NullPointerException if {@code threadFactory} or
452     * {@code handler} is null
453 dl 1.1 */
454     public ScheduledThreadPoolExecutor(int corePoolSize,
455 jsr166 1.56 ThreadFactory threadFactory,
456     RejectedExecutionHandler handler) {
457 jsr166 1.62 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
458 dl 1.1 new DelayedWorkQueue(), threadFactory, handler);
459     }
460    
461 dl 1.37 /**
462 jsr166 1.73 * Returns the nanoTime-based trigger time of a delayed action.
463 jsr166 1.54 */
464     private long triggerTime(long delay, TimeUnit unit) {
465     return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
466     }
467    
468     /**
469 jsr166 1.73 * Returns the nanoTime-based trigger time of a delayed action.
470 dl 1.50 */
471 jsr166 1.54 long triggerTime(long delay) {
472     return now() +
473     ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
474     }
475    
476     /**
477     * Constrains the values of all delays in the queue to be within
478     * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
479     * This may occur if a task is eligible to be dequeued, but has
480     * not yet been, while some other task is added with a delay of
481     * Long.MAX_VALUE.
482     */
483     private long overflowFree(long delay) {
484     Delayed head = (Delayed) super.getQueue().peek();
485     if (head != null) {
486 jsr166 1.62 long headDelay = head.getDelay(NANOSECONDS);
487 jsr166 1.54 if (headDelay < 0 && (delay - headDelay < 0))
488     delay = Long.MAX_VALUE + headDelay;
489     }
490     return delay;
491 dl 1.50 }
492    
493     /**
494 dl 1.37 * @throws RejectedExecutionException {@inheritDoc}
495     * @throws NullPointerException {@inheritDoc}
496     */
497 jsr166 1.21 public ScheduledFuture<?> schedule(Runnable command,
498     long delay,
499 dl 1.13 TimeUnit unit) {
500 dl 1.9 if (command == null || unit == null)
501 dl 1.1 throw new NullPointerException();
502 jsr166 1.76 RunnableScheduledFuture<Void> t = decorateTask(command,
503 jsr166 1.54 new ScheduledFutureTask<Void>(command, null,
504     triggerTime(delay, unit)));
505 dl 1.1 delayedExecute(t);
506     return t;
507     }
508 jsr166 1.52
509 dl 1.37 /**
510     * @throws RejectedExecutionException {@inheritDoc}
511     * @throws NullPointerException {@inheritDoc}
512     */
513 jsr166 1.21 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
514     long delay,
515 dl 1.13 TimeUnit unit) {
516 dl 1.9 if (callable == null || unit == null)
517 dl 1.1 throw new NullPointerException();
518 peierls 1.22 RunnableScheduledFuture<V> t = decorateTask(callable,
519 jsr166 1.54 new ScheduledFutureTask<V>(callable,
520     triggerTime(delay, unit)));
521 dl 1.1 delayedExecute(t);
522     return t;
523     }
524    
525 dl 1.37 /**
526     * @throws RejectedExecutionException {@inheritDoc}
527     * @throws NullPointerException {@inheritDoc}
528     * @throws IllegalArgumentException {@inheritDoc}
529     */
530 jsr166 1.21 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
531     long initialDelay,
532     long period,
533 dl 1.13 TimeUnit unit) {
534 dl 1.9 if (command == null || unit == null)
535 dl 1.1 throw new NullPointerException();
536     if (period <= 0)
537     throw new IllegalArgumentException();
538 jsr166 1.48 ScheduledFutureTask<Void> sft =
539     new ScheduledFutureTask<Void>(command,
540     null,
541 jsr166 1.54 triggerTime(initialDelay, unit),
542 jsr166 1.48 unit.toNanos(period));
543 jsr166 1.44 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
544 jsr166 1.48 sft.outerTask = t;
545 dl 1.1 delayedExecute(t);
546     return t;
547     }
548 jsr166 1.21
549 dl 1.37 /**
550     * @throws RejectedExecutionException {@inheritDoc}
551     * @throws NullPointerException {@inheritDoc}
552     * @throws IllegalArgumentException {@inheritDoc}
553     */
554 jsr166 1.21 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
555     long initialDelay,
556     long delay,
557 dl 1.13 TimeUnit unit) {
558 dl 1.9 if (command == null || unit == null)
559 dl 1.1 throw new NullPointerException();
560     if (delay <= 0)
561     throw new IllegalArgumentException();
562 jsr166 1.48 ScheduledFutureTask<Void> sft =
563     new ScheduledFutureTask<Void>(command,
564     null,
565 jsr166 1.54 triggerTime(initialDelay, unit),
566 jsr166 1.48 unit.toNanos(-delay));
567 jsr166 1.44 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
568 jsr166 1.48 sft.outerTask = t;
569 dl 1.1 delayedExecute(t);
570     return t;
571     }
572 jsr166 1.21
573 dl 1.1 /**
574 jsr166 1.39 * Executes {@code command} with zero required delay.
575     * This has effect equivalent to
576     * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
577     * Note that inspections of the queue and of the list returned by
578     * {@code shutdownNow} will access the zero-delayed
579     * {@link ScheduledFuture}, not the {@code command} itself.
580     *
581     * <p>A consequence of the use of {@code ScheduledFuture} objects is
582     * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
583     * called with a null second {@code Throwable} argument, even if the
584     * {@code command} terminated abruptly. Instead, the {@code Throwable}
585     * thrown by such a task can be obtained via {@link Future#get}.
586 dl 1.1 *
587     * @throws RejectedExecutionException at discretion of
588 jsr166 1.39 * {@code RejectedExecutionHandler}, if the task
589     * cannot be accepted for execution because the
590     * executor has been shut down
591     * @throws NullPointerException {@inheritDoc}
592 dl 1.1 */
593     public void execute(Runnable command) {
594 jsr166 1.62 schedule(command, 0, NANOSECONDS);
595 dl 1.1 }
596    
597 dl 1.13 // Override AbstractExecutorService methods
598    
599 dl 1.37 /**
600     * @throws RejectedExecutionException {@inheritDoc}
601     * @throws NullPointerException {@inheritDoc}
602     */
603 dl 1.7 public Future<?> submit(Runnable task) {
604 jsr166 1.62 return schedule(task, 0, NANOSECONDS);
605 dl 1.7 }
606    
607 dl 1.37 /**
608     * @throws RejectedExecutionException {@inheritDoc}
609     * @throws NullPointerException {@inheritDoc}
610     */
611 dl 1.7 public <T> Future<T> submit(Runnable task, T result) {
612 jsr166 1.62 return schedule(Executors.callable(task, result), 0, NANOSECONDS);
613 dl 1.7 }
614    
615 dl 1.37 /**
616     * @throws RejectedExecutionException {@inheritDoc}
617     * @throws NullPointerException {@inheritDoc}
618     */
619 dl 1.7 public <T> Future<T> submit(Callable<T> task) {
620 jsr166 1.62 return schedule(task, 0, NANOSECONDS);
621 dl 1.7 }
622 dl 1.1
623     /**
624 dl 1.37 * Sets the policy on whether to continue executing existing
625 jsr166 1.39 * periodic tasks even when this executor has been {@code shutdown}.
626     * In this case, these tasks will only terminate upon
627     * {@code shutdownNow} or after setting the policy to
628     * {@code false} when already shutdown.
629     * This value is by default {@code false}.
630 jsr166 1.30 *
631 jsr166 1.68 * @param value if {@code true}, continue after shutdown, else don't
632 jsr166 1.25 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
633 dl 1.1 */
634     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
635     continueExistingPeriodicTasksAfterShutdown = value;
636 jsr166 1.39 if (!value && isShutdown())
637 dl 1.37 onShutdown();
638 dl 1.1 }
639    
640     /**
641 jsr166 1.21 * Gets the policy on whether to continue executing existing
642 jsr166 1.39 * periodic tasks even when this executor has been {@code shutdown}.
643     * In this case, these tasks will only terminate upon
644     * {@code shutdownNow} or after setting the policy to
645     * {@code false} when already shutdown.
646     * This value is by default {@code false}.
647 jsr166 1.30 *
648 jsr166 1.39 * @return {@code true} if will continue after shutdown
649 dl 1.16 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
650 dl 1.1 */
651     public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
652     return continueExistingPeriodicTasksAfterShutdown;
653     }
654    
655     /**
656 jsr166 1.21 * Sets the policy on whether to execute existing delayed
657 jsr166 1.39 * tasks even when this executor has been {@code shutdown}.
658     * In this case, these tasks will only terminate upon
659     * {@code shutdownNow}, or after setting the policy to
660     * {@code false} when already shutdown.
661     * This value is by default {@code true}.
662 jsr166 1.30 *
663 jsr166 1.68 * @param value if {@code true}, execute after shutdown, else don't
664 dl 1.16 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
665 dl 1.1 */
666     public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
667     executeExistingDelayedTasksAfterShutdown = value;
668 jsr166 1.39 if (!value && isShutdown())
669 dl 1.37 onShutdown();
670 dl 1.1 }
671    
672     /**
673 jsr166 1.21 * Gets the policy on whether to execute existing delayed
674 jsr166 1.39 * tasks even when this executor has been {@code shutdown}.
675     * In this case, these tasks will only terminate upon
676     * {@code shutdownNow}, or after setting the policy to
677     * {@code false} when already shutdown.
678     * This value is by default {@code true}.
679 jsr166 1.30 *
680 jsr166 1.39 * @return {@code true} if will execute after shutdown
681 dl 1.16 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
682 dl 1.1 */
683     public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
684     return executeExistingDelayedTasksAfterShutdown;
685     }
686    
687     /**
688 jsr166 1.46 * Sets the policy on whether cancelled tasks should be immediately
689     * removed from the work queue at time of cancellation. This value is
690     * by default {@code false}.
691 dl 1.41 *
692 jsr166 1.42 * @param value if {@code true}, remove on cancellation, else don't
693 dl 1.41 * @see #getRemoveOnCancelPolicy
694 jsr166 1.43 * @since 1.7
695 dl 1.41 */
696     public void setRemoveOnCancelPolicy(boolean value) {
697     removeOnCancel = value;
698     }
699    
700     /**
701 jsr166 1.46 * Gets the policy on whether cancelled tasks should be immediately
702     * removed from the work queue at time of cancellation. This value is
703     * by default {@code false}.
704 dl 1.41 *
705 jsr166 1.46 * @return {@code true} if cancelled tasks are immediately removed
706     * from the queue
707 dl 1.41 * @see #setRemoveOnCancelPolicy
708 jsr166 1.43 * @since 1.7
709 dl 1.41 */
710     public boolean getRemoveOnCancelPolicy() {
711     return removeOnCancel;
712     }
713    
714     /**
715 dl 1.1 * Initiates an orderly shutdown in which previously submitted
716 jsr166 1.49 * tasks are executed, but no new tasks will be accepted.
717     * Invocation has no additional effect if already shut down.
718     *
719     * <p>This method does not wait for previously submitted tasks to
720     * complete execution. Use {@link #awaitTermination awaitTermination}
721     * to do that.
722     *
723     * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
724     * has been set {@code false}, existing delayed tasks whose delays
725     * have not yet elapsed are cancelled. And unless the {@code
726     * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
727     * {@code true}, future executions of existing periodic tasks will
728     * be cancelled.
729 jsr166 1.39 *
730     * @throws SecurityException {@inheritDoc}
731 dl 1.1 */
732     public void shutdown() {
733     super.shutdown();
734     }
735    
736     /**
737     * Attempts to stop all actively executing tasks, halts the
738 jsr166 1.30 * processing of waiting tasks, and returns a list of the tasks
739     * that were awaiting execution.
740 jsr166 1.21 *
741 jsr166 1.49 * <p>This method does not wait for actively executing tasks to
742     * terminate. Use {@link #awaitTermination awaitTermination} to
743     * do that.
744     *
745 dl 1.1 * <p>There are no guarantees beyond best-effort attempts to stop
746 dl 1.18 * processing actively executing tasks. This implementation
747 jsr166 1.31 * cancels tasks via {@link Thread#interrupt}, so any task that
748     * fails to respond to interrupts may never terminate.
749 dl 1.1 *
750 jsr166 1.39 * @return list of tasks that never commenced execution.
751 jsr166 1.77 * Each element of this list is a {@link ScheduledFuture}.
752     * For tasks submitted via one of the {@code schedule}
753     * methods, the element will be identical to the returned
754     * {@code ScheduledFuture}. For tasks submitted using
755     * {@link #execute}, the element will be a zero-delay {@code
756     * ScheduledFuture}.
757 jsr166 1.31 * @throws SecurityException {@inheritDoc}
758 dl 1.1 */
759 tim 1.4 public List<Runnable> shutdownNow() {
760 dl 1.1 return super.shutdownNow();
761     }
762    
763     /**
764 jsr166 1.77 * Returns the task queue used by this executor.
765     * Each element of this list is a {@link ScheduledFuture}.
766     * For tasks submitted via one of the {@code schedule} methods, the
767     * element will be identical to the returned {@code ScheduledFuture}.
768     * For tasks submitted using {@link #execute}, the element will be a
769     * zero-delay {@code ScheduledFuture}.
770     *
771     * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
772     * tasks in the order in which they will execute.
773 dl 1.1 *
774     * @return the task queue
775     */
776     public BlockingQueue<Runnable> getQueue() {
777     return super.getQueue();
778     }
779    
780 dl 1.13 /**
781 dl 1.40 * Specialized delay queue. To mesh with TPE declarations, this
782     * class must be declared as a BlockingQueue<Runnable> even though
783 jsr166 1.42 * it can only hold RunnableScheduledFutures.
784 jsr166 1.21 */
785 dl 1.40 static class DelayedWorkQueue extends AbstractQueue<Runnable>
786 dl 1.13 implements BlockingQueue<Runnable> {
787 jsr166 1.21
788 dl 1.40 /*
789     * A DelayedWorkQueue is based on a heap-based data structure
790     * like those in DelayQueue and PriorityQueue, except that
791     * every ScheduledFutureTask also records its index into the
792     * heap array. This eliminates the need to find a task upon
793     * cancellation, greatly speeding up removal (down from O(n)
794     * to O(log n)), and reducing garbage retention that would
795     * otherwise occur by waiting for the element to rise to top
796     * before clearing. But because the queue may also hold
797     * RunnableScheduledFutures that are not ScheduledFutureTasks,
798     * we are not guaranteed to have such indices available, in
799     * which case we fall back to linear search. (We expect that
800     * most tasks will not be decorated, and that the faster cases
801     * will be much more common.)
802     *
803     * All heap operations must record index changes -- mainly
804     * within siftUp and siftDown. Upon removal, a task's
805     * heapIndex is set to -1. Note that ScheduledFutureTasks can
806     * appear at most once in the queue (this need not be true for
807     * other kinds of tasks or work queues), so are uniquely
808     * identified by heapIndex.
809     */
810    
811 jsr166 1.46 private static final int INITIAL_CAPACITY = 16;
812 jsr166 1.61 private RunnableScheduledFuture<?>[] queue =
813     new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
814 jsr166 1.46 private final ReentrantLock lock = new ReentrantLock();
815 dl 1.40 private int size = 0;
816    
817 jsr166 1.48 /**
818     * Thread designated to wait for the task at the head of the
819     * queue. This variant of the Leader-Follower pattern
820     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
821     * minimize unnecessary timed waiting. When a thread becomes
822     * the leader, it waits only for the next delay to elapse, but
823     * other threads await indefinitely. The leader thread must
824     * signal some other thread before returning from take() or
825     * poll(...), unless some other thread becomes leader in the
826     * interim. Whenever the head of the queue is replaced with a
827     * task with an earlier expiration time, the leader field is
828     * invalidated by being reset to null, and some waiting
829     * thread, but not necessarily the current leader, is
830     * signalled. So waiting threads must be prepared to acquire
831     * and lose leadership while waiting.
832     */
833     private Thread leader = null;
834    
835     /**
836     * Condition signalled when a newer task becomes available at the
837     * head of the queue or a new thread may need to become leader.
838     */
839     private final Condition available = lock.newCondition();
840 dl 1.40
841     /**
842 jsr166 1.66 * Sets f's heapIndex if it is a ScheduledFutureTask.
843 dl 1.40 */
844 jsr166 1.61 private void setIndex(RunnableScheduledFuture<?> f, int idx) {
845 dl 1.40 if (f instanceof ScheduledFutureTask)
846     ((ScheduledFutureTask)f).heapIndex = idx;
847     }
848    
849     /**
850 jsr166 1.66 * Sifts element added at bottom up to its heap-ordered spot.
851 dl 1.40 * Call only when holding lock.
852     */
853 jsr166 1.61 private void siftUp(int k, RunnableScheduledFuture<?> key) {
854 dl 1.40 while (k > 0) {
855     int parent = (k - 1) >>> 1;
856 jsr166 1.61 RunnableScheduledFuture<?> e = queue[parent];
857 dl 1.40 if (key.compareTo(e) >= 0)
858     break;
859     queue[k] = e;
860     setIndex(e, k);
861     k = parent;
862     }
863     queue[k] = key;
864     setIndex(key, k);
865     }
866    
867     /**
868 jsr166 1.66 * Sifts element added at top down to its heap-ordered spot.
869 dl 1.40 * Call only when holding lock.
870     */
871 jsr166 1.61 private void siftDown(int k, RunnableScheduledFuture<?> key) {
872 jsr166 1.42 int half = size >>> 1;
873 dl 1.40 while (k < half) {
874 jsr166 1.42 int child = (k << 1) + 1;
875 jsr166 1.61 RunnableScheduledFuture<?> c = queue[child];
876 dl 1.40 int right = child + 1;
877     if (right < size && c.compareTo(queue[right]) > 0)
878     c = queue[child = right];
879     if (key.compareTo(c) <= 0)
880     break;
881     queue[k] = c;
882     setIndex(c, k);
883     k = child;
884     }
885     queue[k] = key;
886     setIndex(key, k);
887     }
888    
889     /**
890 jsr166 1.66 * Resizes the heap array. Call only when holding lock.
891 dl 1.40 */
892     private void grow() {
893     int oldCapacity = queue.length;
894     int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
895     if (newCapacity < 0) // overflow
896     newCapacity = Integer.MAX_VALUE;
897     queue = Arrays.copyOf(queue, newCapacity);
898     }
899    
900     /**
901 jsr166 1.66 * Finds index of given object, or -1 if absent.
902 dl 1.40 */
903     private int indexOf(Object x) {
904     if (x != null) {
905 jsr166 1.48 if (x instanceof ScheduledFutureTask) {
906     int i = ((ScheduledFutureTask) x).heapIndex;
907     // Sanity check; x could conceivably be a
908     // ScheduledFutureTask from some other pool.
909     if (i >= 0 && i < size && queue[i] == x)
910     return i;
911     } else {
912     for (int i = 0; i < size; i++)
913     if (x.equals(queue[i]))
914     return i;
915     }
916 dl 1.40 }
917     return -1;
918     }
919    
920 jsr166 1.48 public boolean contains(Object x) {
921     final ReentrantLock lock = this.lock;
922 jsr166 1.45 lock.lock();
923     try {
924 jsr166 1.48 return indexOf(x) != -1;
925 jsr166 1.45 } finally {
926     lock.unlock();
927     }
928 jsr166 1.48 }
929 jsr166 1.45
930 dl 1.40 public boolean remove(Object x) {
931     final ReentrantLock lock = this.lock;
932     lock.lock();
933     try {
934 jsr166 1.45 int i = indexOf(x);
935 jsr166 1.48 if (i < 0)
936     return false;
937 jsr166 1.45
938 jsr166 1.48 setIndex(queue[i], -1);
939     int s = --size;
940 jsr166 1.61 RunnableScheduledFuture<?> replacement = queue[s];
941 jsr166 1.48 queue[s] = null;
942     if (s != i) {
943     siftDown(i, replacement);
944     if (queue[i] == replacement)
945     siftUp(i, replacement);
946     }
947     return true;
948 dl 1.40 } finally {
949     lock.unlock();
950     }
951     }
952    
953     public int size() {
954     final ReentrantLock lock = this.lock;
955     lock.lock();
956     try {
957 jsr166 1.45 return size;
958 dl 1.40 } finally {
959     lock.unlock();
960     }
961     }
962    
963 jsr166 1.42 public boolean isEmpty() {
964     return size() == 0;
965 dl 1.40 }
966    
967     public int remainingCapacity() {
968     return Integer.MAX_VALUE;
969     }
970    
971 jsr166 1.61 public RunnableScheduledFuture<?> peek() {
972 dl 1.40 final ReentrantLock lock = this.lock;
973     lock.lock();
974     try {
975     return queue[0];
976     } finally {
977     lock.unlock();
978     }
979 dl 1.13 }
980    
981 dl 1.40 public boolean offer(Runnable x) {
982     if (x == null)
983     throw new NullPointerException();
984 jsr166 1.61 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
985 dl 1.40 final ReentrantLock lock = this.lock;
986     lock.lock();
987     try {
988     int i = size;
989     if (i >= queue.length)
990     grow();
991     size = i + 1;
992     if (i == 0) {
993     queue[0] = e;
994     setIndex(e, 0);
995 jsr166 1.45 } else {
996 dl 1.40 siftUp(i, e);
997     }
998 jsr166 1.46 if (queue[0] == e) {
999 jsr166 1.48 leader = null;
1000 jsr166 1.46 available.signal();
1001 jsr166 1.48 }
1002 dl 1.40 } finally {
1003     lock.unlock();
1004     }
1005     return true;
1006 jsr166 1.48 }
1007 dl 1.40
1008     public void put(Runnable e) {
1009     offer(e);
1010     }
1011    
1012     public boolean add(Runnable e) {
1013 jsr166 1.48 return offer(e);
1014     }
1015 dl 1.40
1016     public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1017     return offer(e);
1018     }
1019 jsr166 1.42
1020 jsr166 1.46 /**
1021     * Performs common bookkeeping for poll and take: Replaces
1022 jsr166 1.47 * first element with last and sifts it down. Call only when
1023     * holding lock.
1024 jsr166 1.46 * @param f the task to remove and return
1025     */
1026 jsr166 1.61 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1027 jsr166 1.46 int s = --size;
1028 jsr166 1.61 RunnableScheduledFuture<?> x = queue[s];
1029 jsr166 1.46 queue[s] = null;
1030     if (s != 0)
1031     siftDown(0, x);
1032     setIndex(f, -1);
1033     return f;
1034     }
1035    
1036 jsr166 1.61 public RunnableScheduledFuture<?> poll() {
1037 dl 1.40 final ReentrantLock lock = this.lock;
1038     lock.lock();
1039     try {
1040 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1041 jsr166 1.62 if (first == null || first.getDelay(NANOSECONDS) > 0)
1042 dl 1.40 return null;
1043 jsr166 1.42 else
1044 dl 1.40 return finishPoll(first);
1045     } finally {
1046     lock.unlock();
1047     }
1048     }
1049    
1050 jsr166 1.61 public RunnableScheduledFuture<?> take() throws InterruptedException {
1051 dl 1.40 final ReentrantLock lock = this.lock;
1052     lock.lockInterruptibly();
1053     try {
1054     for (;;) {
1055 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1056 jsr166 1.42 if (first == null)
1057 dl 1.40 available.await();
1058     else {
1059 jsr166 1.62 long delay = first.getDelay(NANOSECONDS);
1060 jsr166 1.48 if (delay <= 0)
1061     return finishPoll(first);
1062 jsr166 1.71 first = null; // don't retain ref while waiting
1063     if (leader != null)
1064 jsr166 1.48 available.await();
1065     else {
1066     Thread thisThread = Thread.currentThread();
1067     leader = thisThread;
1068     try {
1069     available.awaitNanos(delay);
1070     } finally {
1071     if (leader == thisThread)
1072     leader = null;
1073     }
1074     }
1075 dl 1.40 }
1076     }
1077     } finally {
1078 jsr166 1.48 if (leader == null && queue[0] != null)
1079     available.signal();
1080 dl 1.40 lock.unlock();
1081     }
1082     }
1083    
1084 jsr166 1.61 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1085 dl 1.40 throws InterruptedException {
1086     long nanos = unit.toNanos(timeout);
1087     final ReentrantLock lock = this.lock;
1088     lock.lockInterruptibly();
1089     try {
1090     for (;;) {
1091 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1092 dl 1.40 if (first == null) {
1093     if (nanos <= 0)
1094     return null;
1095     else
1096     nanos = available.awaitNanos(nanos);
1097     } else {
1098 jsr166 1.62 long delay = first.getDelay(NANOSECONDS);
1099 jsr166 1.48 if (delay <= 0)
1100 dl 1.40 return finishPoll(first);
1101 jsr166 1.48 if (nanos <= 0)
1102     return null;
1103 jsr166 1.71 first = null; // don't retain ref while waiting
1104 jsr166 1.48 if (nanos < delay || leader != null)
1105     nanos = available.awaitNanos(nanos);
1106     else {
1107     Thread thisThread = Thread.currentThread();
1108     leader = thisThread;
1109     try {
1110     long timeLeft = available.awaitNanos(delay);
1111     nanos -= delay - timeLeft;
1112     } finally {
1113     if (leader == thisThread)
1114     leader = null;
1115     }
1116     }
1117     }
1118     }
1119 dl 1.40 } finally {
1120 jsr166 1.48 if (leader == null && queue[0] != null)
1121     available.signal();
1122 dl 1.40 lock.unlock();
1123     }
1124     }
1125    
1126     public void clear() {
1127     final ReentrantLock lock = this.lock;
1128     lock.lock();
1129     try {
1130     for (int i = 0; i < size; i++) {
1131 jsr166 1.61 RunnableScheduledFuture<?> t = queue[i];
1132 dl 1.40 if (t != null) {
1133     queue[i] = null;
1134     setIndex(t, -1);
1135     }
1136     }
1137     size = 0;
1138     } finally {
1139     lock.unlock();
1140     }
1141 dl 1.13 }
1142 dl 1.40
1143     /**
1144 jsr166 1.66 * Returns first element only if it is expired.
1145 dl 1.40 * Used only by drainTo. Call only when holding lock.
1146     */
1147 jsr166 1.62 private RunnableScheduledFuture<?> peekExpired() {
1148     // assert lock.isHeldByCurrentThread();
1149 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1150 jsr166 1.62 return (first == null || first.getDelay(NANOSECONDS) > 0) ?
1151     null : first;
1152 dl 1.40 }
1153    
1154     public int drainTo(Collection<? super Runnable> c) {
1155     if (c == null)
1156     throw new NullPointerException();
1157     if (c == this)
1158     throw new IllegalArgumentException();
1159     final ReentrantLock lock = this.lock;
1160     lock.lock();
1161     try {
1162 jsr166 1.61 RunnableScheduledFuture<?> first;
1163 dl 1.40 int n = 0;
1164 jsr166 1.62 while ((first = peekExpired()) != null) {
1165     c.add(first); // In this order, in case add() throws.
1166     finishPoll(first);
1167 jsr166 1.48 ++n;
1168     }
1169 dl 1.40 return n;
1170     } finally {
1171     lock.unlock();
1172     }
1173 dl 1.13 }
1174    
1175 jsr166 1.21 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1176 dl 1.40 if (c == null)
1177     throw new NullPointerException();
1178     if (c == this)
1179     throw new IllegalArgumentException();
1180     if (maxElements <= 0)
1181     return 0;
1182     final ReentrantLock lock = this.lock;
1183     lock.lock();
1184     try {
1185 jsr166 1.61 RunnableScheduledFuture<?> first;
1186 dl 1.40 int n = 0;
1187 jsr166 1.62 while (n < maxElements && (first = peekExpired()) != null) {
1188     c.add(first); // In this order, in case add() throws.
1189     finishPoll(first);
1190 jsr166 1.48 ++n;
1191     }
1192 dl 1.40 return n;
1193     } finally {
1194     lock.unlock();
1195     }
1196     }
1197    
1198     public Object[] toArray() {
1199     final ReentrantLock lock = this.lock;
1200     lock.lock();
1201     try {
1202 jsr166 1.45 return Arrays.copyOf(queue, size, Object[].class);
1203 dl 1.40 } finally {
1204     lock.unlock();
1205     }
1206     }
1207    
1208 jsr166 1.48 @SuppressWarnings("unchecked")
1209 dl 1.40 public <T> T[] toArray(T[] a) {
1210     final ReentrantLock lock = this.lock;
1211     lock.lock();
1212     try {
1213     if (a.length < size)
1214     return (T[]) Arrays.copyOf(queue, size, a.getClass());
1215     System.arraycopy(queue, 0, a, 0, size);
1216     if (a.length > size)
1217     a[size] = null;
1218     return a;
1219     } finally {
1220     lock.unlock();
1221     }
1222 dl 1.13 }
1223    
1224 jsr166 1.21 public Iterator<Runnable> iterator() {
1225 jsr166 1.45 return new Itr(Arrays.copyOf(queue, size));
1226 dl 1.40 }
1227 jsr166 1.42
1228 dl 1.40 /**
1229     * Snapshot iterator that works off copy of underlying q array.
1230     */
1231     private class Itr implements Iterator<Runnable> {
1232 jsr166 1.74 final RunnableScheduledFuture<?>[] array;
1233 jsr166 1.48 int cursor = 0; // index of next element to return
1234     int lastRet = -1; // index of last element, or -1 if no such
1235 jsr166 1.42
1236 jsr166 1.74 Itr(RunnableScheduledFuture<?>[] array) {
1237 dl 1.40 this.array = array;
1238     }
1239 jsr166 1.42
1240 dl 1.40 public boolean hasNext() {
1241     return cursor < array.length;
1242     }
1243 jsr166 1.42
1244 dl 1.40 public Runnable next() {
1245     if (cursor >= array.length)
1246     throw new NoSuchElementException();
1247     lastRet = cursor;
1248 jsr166 1.45 return array[cursor++];
1249 dl 1.40 }
1250 jsr166 1.42
1251 dl 1.40 public void remove() {
1252     if (lastRet < 0)
1253     throw new IllegalStateException();
1254     DelayedWorkQueue.this.remove(array[lastRet]);
1255     lastRet = -1;
1256     }
1257 dl 1.13 }
1258     }
1259 dl 1.1 }