ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.84
Committed: Thu Jan 15 19:29:22 2015 UTC (9 years, 4 months ago) by jsr166
Branch: MAIN
Changes since 1.83: +9 -9 lines
Log Message:
generate more readable html for @links

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.11 * Expert Group and released to the public domain, as explained at
4 jsr166 1.58 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.1 */
6    
7     package java.util.concurrent;
8 jsr166 1.82
9 jsr166 1.83 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 jsr166 1.62 import static java.util.concurrent.TimeUnit.NANOSECONDS;
11 jsr166 1.83
12 jsr166 1.82 import java.util.AbstractQueue;
13     import java.util.Arrays;
14     import java.util.Collection;
15     import java.util.Iterator;
16     import java.util.List;
17     import java.util.NoSuchElementException;
18 jsr166 1.83 import java.util.concurrent.atomic.AtomicLong;
19     import java.util.concurrent.locks.Condition;
20     import java.util.concurrent.locks.ReentrantLock;
21 dl 1.1
22     /**
23 dl 1.7 * A {@link ThreadPoolExecutor} that can additionally schedule
24 jsr166 1.75 * commands to run after a given delay, or to execute periodically.
25     * This class is preferable to {@link java.util.Timer} when multiple
26     * worker threads are needed, or when the additional flexibility or
27     * capabilities of {@link ThreadPoolExecutor} (which this class
28     * extends) are required.
29 dl 1.1 *
30 jsr166 1.46 * <p>Delayed tasks execute no sooner than they are enabled, but
31 dl 1.18 * without any real-time guarantees about when, after they are
32     * enabled, they will commence. Tasks scheduled for exactly the same
33     * execution time are enabled in first-in-first-out (FIFO) order of
34 jsr166 1.46 * submission.
35     *
36     * <p>When a submitted task is cancelled before it is run, execution
37 jsr166 1.75 * is suppressed. By default, such a cancelled task is not
38     * automatically removed from the work queue until its delay elapses.
39     * While this enables further inspection and monitoring, it may also
40     * cause unbounded retention of cancelled tasks. To avoid this, use
41     * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately
42     * removed from the work queue at time of cancellation.
43 dl 1.1 *
44 jsr166 1.75 * <p>Successive executions of a periodic task scheduled via
45 jsr166 1.84 * {@link #scheduleAtFixedRate scheduleAtFixedRate} or
46     * {@link #scheduleWithFixedDelay scheduleWithFixedDelay}
47     * do not overlap. While different executions may be performed by
48     * different threads, the effects of prior executions
49     * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
50 dl 1.51 * those of subsequent ones.
51     *
52 dl 1.1 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
53 dl 1.8 * of the inherited tuning methods are not useful for it. In
54 jsr166 1.53 * particular, because it acts as a fixed-sized pool using
55     * {@code corePoolSize} threads and an unbounded queue, adjustments
56     * to {@code maximumPoolSize} have no useful effect. Additionally, it
57     * is almost never a good idea to set {@code corePoolSize} to zero or
58     * use {@code allowCoreThreadTimeOut} because this may leave the pool
59     * without threads to handle tasks once they become eligible to run.
60 dl 1.1 *
61 jsr166 1.39 * <p><b>Extension notes:</b> This class overrides the
62 jsr166 1.69 * {@link ThreadPoolExecutor#execute(Runnable) execute} and
63 jsr166 1.39 * {@link AbstractExecutorService#submit(Runnable) submit}
64     * methods to generate internal {@link ScheduledFuture} objects to
65     * control per-task delays and scheduling. To preserve
66     * functionality, any further overrides of these methods in
67 dl 1.32 * subclasses must invoke superclass versions, which effectively
68 jsr166 1.39 * disables additional task customization. However, this class
69 dl 1.32 * provides alternative protected extension method
70 jsr166 1.39 * {@code decorateTask} (one version each for {@code Runnable} and
71     * {@code Callable}) that can be used to customize the concrete task
72     * types used to execute commands entered via {@code execute},
73     * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
74     * and {@code scheduleWithFixedDelay}. By default, a
75     * {@code ScheduledThreadPoolExecutor} uses a task type extending
76 dl 1.32 * {@link FutureTask}. However, this may be modified or replaced using
77     * subclasses of the form:
78     *
79 jsr166 1.39 * <pre> {@code
80 dl 1.23 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
81     *
82 jsr166 1.39 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
83 dl 1.23 *
84 jsr166 1.39 * protected <V> RunnableScheduledFuture<V> decorateTask(
85     * Runnable r, RunnableScheduledFuture<V> task) {
86     * return new CustomTask<V>(r, task);
87 jsr166 1.29 * }
88 dl 1.23 *
89 jsr166 1.39 * protected <V> RunnableScheduledFuture<V> decorateTask(
90     * Callable<V> c, RunnableScheduledFuture<V> task) {
91     * return new CustomTask<V>(c, task);
92 jsr166 1.29 * }
93     * // ... add constructors, etc.
94 jsr166 1.39 * }}</pre>
95     *
96 dl 1.1 * @since 1.5
97     * @author Doug Lea
98     */
99 jsr166 1.21 public class ScheduledThreadPoolExecutor
100     extends ThreadPoolExecutor
101 tim 1.3 implements ScheduledExecutorService {
102 dl 1.1
103 dl 1.37 /*
104     * This class specializes ThreadPoolExecutor implementation by
105     *
106     * 1. Using a custom task type, ScheduledFutureTask for
107     * tasks, even those that don't require scheduling (i.e.,
108     * those submitted using ExecutorService execute, not
109     * ScheduledExecutorService methods) which are treated as
110     * delayed tasks with a delay of zero.
111     *
112 jsr166 1.46 * 2. Using a custom queue (DelayedWorkQueue), a variant of
113 dl 1.37 * unbounded DelayQueue. The lack of capacity constraint and
114     * the fact that corePoolSize and maximumPoolSize are
115     * effectively identical simplifies some execution mechanics
116 jsr166 1.46 * (see delayedExecute) compared to ThreadPoolExecutor.
117 dl 1.37 *
118     * 3. Supporting optional run-after-shutdown parameters, which
119     * leads to overrides of shutdown methods to remove and cancel
120     * tasks that should NOT be run after shutdown, as well as
121     * different recheck logic when task (re)submission overlaps
122     * with a shutdown.
123     *
124     * 4. Task decoration methods to allow interception and
125     * instrumentation, which are needed because subclasses cannot
126     * otherwise override submit methods to get this effect. These
127     * don't have any impact on pool control logic though.
128     */
129    
130 dl 1.1 /**
131     * False if should cancel/suppress periodic tasks on shutdown.
132     */
133     private volatile boolean continueExistingPeriodicTasksAfterShutdown;
134    
135     /**
136     * False if should cancel non-periodic tasks on shutdown.
137     */
138     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
139    
140     /**
141 jsr166 1.75 * True if ScheduledFutureTask.cancel should remove from queue.
142 dl 1.41 */
143     private volatile boolean removeOnCancel = false;
144    
145     /**
146 dl 1.1 * Sequence number to break scheduling ties, and in turn to
147     * guarantee FIFO order among tied entries.
148     */
149 jsr166 1.60 private static final AtomicLong sequencer = new AtomicLong();
150 dl 1.14
151     /**
152 jsr166 1.39 * Returns current nanosecond time.
153 dl 1.14 */
154 jsr166 1.81 static final long now() {
155 jsr166 1.54 return System.nanoTime();
156 dl 1.14 }
157    
158 jsr166 1.21 private class ScheduledFutureTask<V>
159 peierls 1.22 extends FutureTask<V> implements RunnableScheduledFuture<V> {
160 jsr166 1.21
161 dl 1.1 /** Sequence number to break ties FIFO */
162     private final long sequenceNumber;
163 jsr166 1.44
164 dl 1.1 /** The time the task is enabled to execute in nanoTime units */
165     private long time;
166 jsr166 1.44
167 dl 1.16 /**
168 jsr166 1.75 * Period in nanoseconds for repeating tasks.
169     * A positive value indicates fixed-rate execution.
170     * A negative value indicates fixed-delay execution.
171     * A value of 0 indicates a non-repeating (one-shot) task.
172 dl 1.16 */
173 dl 1.1 private final long period;
174    
175 jsr166 1.48 /** The actual task to be re-enqueued by reExecutePeriodic */
176     RunnableScheduledFuture<V> outerTask = this;
177 jsr166 1.44
178 dl 1.1 /**
179 dl 1.40 * Index into delay queue, to support faster cancellation.
180     */
181     int heapIndex;
182    
183     /**
184 jsr166 1.30 * Creates a one-shot action with given nanoTime-based trigger time.
185 dl 1.1 */
186 jsr166 1.75 ScheduledFutureTask(Runnable r, V result, long triggerTime) {
187 dl 1.1 super(r, result);
188 jsr166 1.75 this.time = triggerTime;
189 dl 1.1 this.period = 0;
190     this.sequenceNumber = sequencer.getAndIncrement();
191     }
192    
193     /**
194 jsr166 1.75 * Creates a periodic action with given nanoTime-based initial
195     * trigger time and period.
196 dl 1.1 */
197 jsr166 1.75 ScheduledFutureTask(Runnable r, V result, long triggerTime,
198     long period) {
199 dl 1.1 super(r, result);
200 jsr166 1.75 this.time = triggerTime;
201 dl 1.1 this.period = period;
202     this.sequenceNumber = sequencer.getAndIncrement();
203     }
204    
205     /**
206 jsr166 1.65 * Creates a one-shot action with given nanoTime-based trigger time.
207 dl 1.1 */
208 jsr166 1.75 ScheduledFutureTask(Callable<V> callable, long triggerTime) {
209 dl 1.1 super(callable);
210 jsr166 1.75 this.time = triggerTime;
211 dl 1.1 this.period = 0;
212     this.sequenceNumber = sequencer.getAndIncrement();
213     }
214    
215     public long getDelay(TimeUnit unit) {
216 jsr166 1.62 return unit.convert(time - now(), NANOSECONDS);
217 dl 1.1 }
218    
219 dl 1.20 public int compareTo(Delayed other) {
220 dl 1.59 if (other == this) // compare zero if same object
221 dl 1.1 return 0;
222 dl 1.34 if (other instanceof ScheduledFutureTask) {
223     ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
224     long diff = time - x.time;
225     if (diff < 0)
226     return -1;
227     else if (diff > 0)
228     return 1;
229     else if (sequenceNumber < x.sequenceNumber)
230     return -1;
231     else
232     return 1;
233     }
234 jsr166 1.64 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
235 jsr166 1.61 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
236 dl 1.1 }
237    
238     /**
239 jsr166 1.70 * Returns {@code true} if this is a periodic (not a one-shot) action.
240 jsr166 1.30 *
241 jsr166 1.70 * @return {@code true} if periodic
242 dl 1.1 */
243 dl 1.23 public boolean isPeriodic() {
244 dl 1.16 return period != 0;
245 dl 1.1 }
246    
247     /**
248 jsr166 1.39 * Sets the next time to run for a periodic task.
249 dl 1.13 */
250 dl 1.37 private void setNextRunTime() {
251     long p = period;
252     if (p > 0)
253     time += p;
254     else
255 jsr166 1.54 time = triggerTime(-p);
256 dl 1.13 }
257    
258 dl 1.40 public boolean cancel(boolean mayInterruptIfRunning) {
259 dl 1.41 boolean cancelled = super.cancel(mayInterruptIfRunning);
260     if (cancelled && removeOnCancel && heapIndex >= 0)
261 jsr166 1.42 remove(this);
262 dl 1.41 return cancelled;
263 dl 1.40 }
264    
265 dl 1.13 /**
266 dl 1.5 * Overrides FutureTask version so as to reset/requeue if periodic.
267 jsr166 1.21 */
268 dl 1.1 public void run() {
269 dl 1.37 boolean periodic = isPeriodic();
270     if (!canRunInCurrentRunState(periodic))
271     cancel(false);
272     else if (!periodic)
273 dl 1.5 ScheduledFutureTask.super.run();
274 dl 1.37 else if (ScheduledFutureTask.super.runAndReset()) {
275     setNextRunTime();
276 jsr166 1.44 reExecutePeriodic(outerTask);
277 dl 1.37 }
278 dl 1.1 }
279     }
280    
281     /**
282 dl 1.37 * Returns true if can run a task given current run state
283 jsr166 1.39 * and run-after-shutdown parameters.
284     *
285 dl 1.37 * @param periodic true if this task periodic, false if delayed
286     */
287     boolean canRunInCurrentRunState(boolean periodic) {
288 jsr166 1.38 return isRunningOrShutdown(periodic ?
289 dl 1.37 continueExistingPeriodicTasksAfterShutdown :
290     executeExistingDelayedTasksAfterShutdown);
291     }
292    
293     /**
294     * Main execution method for delayed or periodic tasks. If pool
295     * is shut down, rejects the task. Otherwise adds task to queue
296     * and starts a thread, if necessary, to run it. (We cannot
297     * prestart the thread to run the task because the task (probably)
298 jsr166 1.67 * shouldn't be run yet.) If the pool is shut down while the task
299 dl 1.37 * is being added, cancel and remove it if required by state and
300 jsr166 1.39 * run-after-shutdown parameters.
301     *
302 dl 1.37 * @param task the task
303     */
304     private void delayedExecute(RunnableScheduledFuture<?> task) {
305     if (isShutdown())
306     reject(task);
307     else {
308     super.getQueue().add(task);
309     if (isShutdown() &&
310     !canRunInCurrentRunState(task.isPeriodic()) &&
311     remove(task))
312     task.cancel(false);
313 jsr166 1.48 else
314 dl 1.63 ensurePrestart();
315 dl 1.37 }
316     }
317 jsr166 1.21
318 dl 1.37 /**
319 jsr166 1.39 * Requeues a periodic task unless current run state precludes it.
320     * Same idea as delayedExecute except drops task rather than rejecting.
321     *
322 dl 1.37 * @param task the task
323     */
324     void reExecutePeriodic(RunnableScheduledFuture<?> task) {
325     if (canRunInCurrentRunState(true)) {
326     super.getQueue().add(task);
327     if (!canRunInCurrentRunState(true) && remove(task))
328     task.cancel(false);
329 jsr166 1.48 else
330 dl 1.63 ensurePrestart();
331 dl 1.37 }
332 dl 1.13 }
333 dl 1.1
334 dl 1.13 /**
335 jsr166 1.21 * Cancels and clears the queue of all tasks that should not be run
336 jsr166 1.39 * due to shutdown policy. Invoked within super.shutdown.
337 dl 1.13 */
338 dl 1.37 @Override void onShutdown() {
339     BlockingQueue<Runnable> q = super.getQueue();
340     boolean keepDelayed =
341     getExecuteExistingDelayedTasksAfterShutdownPolicy();
342     boolean keepPeriodic =
343     getContinueExistingPeriodicTasksAfterShutdownPolicy();
344 jsr166 1.57 if (!keepDelayed && !keepPeriodic) {
345     for (Object e : q.toArray())
346     if (e instanceof RunnableScheduledFuture<?>)
347     ((RunnableScheduledFuture<?>) e).cancel(false);
348 dl 1.37 q.clear();
349 jsr166 1.57 }
350 dl 1.37 else {
351     // Traverse snapshot to avoid iterator exceptions
352 jsr166 1.39 for (Object e : q.toArray()) {
353 dl 1.23 if (e instanceof RunnableScheduledFuture) {
354 dl 1.37 RunnableScheduledFuture<?> t =
355     (RunnableScheduledFuture<?>)e;
356 dl 1.41 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
357     t.isCancelled()) { // also remove if already cancelled
358     if (q.remove(t))
359     t.cancel(false);
360     }
361 dl 1.13 }
362     }
363 dl 1.1 }
364 jsr166 1.48 tryTerminate();
365 dl 1.1 }
366    
367 dl 1.23 /**
368 jsr166 1.30 * Modifies or replaces the task used to execute a runnable.
369 jsr166 1.28 * This method can be used to override the concrete
370 dl 1.23 * class used for managing internal tasks.
371 jsr166 1.30 * The default implementation simply returns the given task.
372 jsr166 1.28 *
373 dl 1.23 * @param runnable the submitted Runnable
374     * @param task the task created to execute the runnable
375 jsr166 1.72 * @param <V> the type of the task's result
376 dl 1.23 * @return a task that can execute the runnable
377     * @since 1.6
378     */
379 peierls 1.22 protected <V> RunnableScheduledFuture<V> decorateTask(
380 dl 1.23 Runnable runnable, RunnableScheduledFuture<V> task) {
381     return task;
382 peierls 1.22 }
383    
384 dl 1.23 /**
385 jsr166 1.30 * Modifies or replaces the task used to execute a callable.
386 jsr166 1.28 * This method can be used to override the concrete
387 dl 1.23 * class used for managing internal tasks.
388 jsr166 1.30 * The default implementation simply returns the given task.
389 jsr166 1.28 *
390 dl 1.23 * @param callable the submitted Callable
391     * @param task the task created to execute the callable
392 jsr166 1.72 * @param <V> the type of the task's result
393 dl 1.23 * @return a task that can execute the callable
394     * @since 1.6
395     */
396 peierls 1.22 protected <V> RunnableScheduledFuture<V> decorateTask(
397 dl 1.23 Callable<V> callable, RunnableScheduledFuture<V> task) {
398     return task;
399 dl 1.19 }
400    
401 dl 1.1 /**
402 jsr166 1.78 * The default keep-alive time for pool threads.
403     *
404     * Normally, this value is unused because all pool threads will be
405     * core threads, but if a user creates a pool with a corePoolSize
406     * of zero (against our advice), we keep a thread alive as long as
407     * there are queued tasks. If the keep alive time is zero (the
408     * historic value), we end up hot-spinning in getTask, wasting a
409     * CPU. But on the other hand, if we set the value too high, and
410     * users create a one-shot pool which they don't cleanly shutdown,
411     * the pool's non-daemon threads will prevent JVM termination. A
412     * small but non-zero value (relative to a JVM's lifetime) seems
413     * best.
414     */
415     private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
416    
417     /**
418 jsr166 1.39 * Creates a new {@code ScheduledThreadPoolExecutor} with the
419     * given core pool size.
420 jsr166 1.21 *
421 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
422     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
423     * @throws IllegalArgumentException if {@code corePoolSize < 0}
424 dl 1.1 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    // Maximum pool size is Integer.MAX_VALUE, the keep-alive is the class
    // default, and the work queue is a fresh DelayedWorkQueue.
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue());
}
430    
431     /**
432 jsr166 1.39 * Creates a new {@code ScheduledThreadPoolExecutor} with the
433     * given initial parameters.
434 jsr166 1.21 *
435 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
436     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
437 dl 1.1 * @param threadFactory the factory to use when the executor
438 jsr166 1.39 * creates a new thread
439     * @throws IllegalArgumentException if {@code corePoolSize < 0}
440     * @throws NullPointerException if {@code threadFactory} is null
441 dl 1.1 */
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
    // Same fixed settings as the single-argument constructor, plus the
    // caller's thread factory.
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue(),
          threadFactory);
}
448    
449     /**
450 jsr166 1.79 * Creates a new {@code ScheduledThreadPoolExecutor} with the
451     * given initial parameters.
452 jsr166 1.21 *
453 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
454     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
455 dl 1.1 * @param handler the handler to use when execution is blocked
456 jsr166 1.39 * because the thread bounds and queue capacities are reached
457     * @throws IllegalArgumentException if {@code corePoolSize < 0}
458     * @throws NullPointerException if {@code handler} is null
459 dl 1.1 */
public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
    // Same fixed settings as the single-argument constructor, plus the
    // caller's rejection handler.
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue(),
          handler);
}
466    
467     /**
468 jsr166 1.79 * Creates a new {@code ScheduledThreadPoolExecutor} with the
469     * given initial parameters.
470 jsr166 1.21 *
471 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
472     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
473 dl 1.1 * @param threadFactory the factory to use when the executor
474 jsr166 1.39 * creates a new thread
475 dl 1.1 * @param handler the handler to use when execution is blocked
476 jsr166 1.39 * because the thread bounds and queue capacities are reached
477     * @throws IllegalArgumentException if {@code corePoolSize < 0}
478     * @throws NullPointerException if {@code threadFactory} or
479     * {@code handler} is null
480 dl 1.1 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    // Same fixed settings as the single-argument constructor, plus the
    // caller's thread factory and rejection handler.
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue(),
          threadFactory,
          handler);
}
488    
489 dl 1.37 /**
490 jsr166 1.73 * Returns the nanoTime-based trigger time of a delayed action.
491 jsr166 1.54 */
492     private long triggerTime(long delay, TimeUnit unit) {
493     return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
494     }
495    
496     /**
497 jsr166 1.73 * Returns the nanoTime-based trigger time of a delayed action.
498 dl 1.50 */
499 jsr166 1.54 long triggerTime(long delay) {
500     return now() +
501     ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
502     }
503    
504     /**
505     * Constrains the values of all delays in the queue to be within
506     * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
507     * This may occur if a task is eligible to be dequeued, but has
508     * not yet been, while some other task is added with a delay of
509     * Long.MAX_VALUE.
510     */
511     private long overflowFree(long delay) {
512     Delayed head = (Delayed) super.getQueue().peek();
513     if (head != null) {
514 jsr166 1.62 long headDelay = head.getDelay(NANOSECONDS);
515 jsr166 1.54 if (headDelay < 0 && (delay - headDelay < 0))
516     delay = Long.MAX_VALUE + headDelay;
517     }
518     return delay;
519 dl 1.50 }
520    
521     /**
522 dl 1.37 * @throws RejectedExecutionException {@inheritDoc}
523     * @throws NullPointerException {@inheritDoc}
524     */
525 jsr166 1.21 public ScheduledFuture<?> schedule(Runnable command,
526     long delay,
527 dl 1.13 TimeUnit unit) {
528 dl 1.9 if (command == null || unit == null)
529 dl 1.1 throw new NullPointerException();
530 jsr166 1.76 RunnableScheduledFuture<Void> t = decorateTask(command,
531 jsr166 1.54 new ScheduledFutureTask<Void>(command, null,
532     triggerTime(delay, unit)));
533 dl 1.1 delayedExecute(t);
534     return t;
535     }
536 jsr166 1.52
537 dl 1.37 /**
538     * @throws RejectedExecutionException {@inheritDoc}
539     * @throws NullPointerException {@inheritDoc}
540     */
541 jsr166 1.21 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
542     long delay,
543 dl 1.13 TimeUnit unit) {
544 dl 1.9 if (callable == null || unit == null)
545 dl 1.1 throw new NullPointerException();
546 peierls 1.22 RunnableScheduledFuture<V> t = decorateTask(callable,
547 jsr166 1.54 new ScheduledFutureTask<V>(callable,
548     triggerTime(delay, unit)));
549 dl 1.1 delayedExecute(t);
550     return t;
551     }
552    
553 dl 1.37 /**
554     * @throws RejectedExecutionException {@inheritDoc}
555     * @throws NullPointerException {@inheritDoc}
556     * @throws IllegalArgumentException {@inheritDoc}
557     */
558 jsr166 1.21 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
559     long initialDelay,
560     long period,
561 dl 1.13 TimeUnit unit) {
562 dl 1.9 if (command == null || unit == null)
563 dl 1.1 throw new NullPointerException();
564     if (period <= 0)
565     throw new IllegalArgumentException();
566 jsr166 1.48 ScheduledFutureTask<Void> sft =
567     new ScheduledFutureTask<Void>(command,
568     null,
569 jsr166 1.54 triggerTime(initialDelay, unit),
570 jsr166 1.48 unit.toNanos(period));
571 jsr166 1.44 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
572 jsr166 1.48 sft.outerTask = t;
573 dl 1.1 delayedExecute(t);
574     return t;
575     }
576 jsr166 1.21
577 dl 1.37 /**
578     * @throws RejectedExecutionException {@inheritDoc}
579     * @throws NullPointerException {@inheritDoc}
580     * @throws IllegalArgumentException {@inheritDoc}
581     */
582 jsr166 1.21 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
583     long initialDelay,
584     long delay,
585 dl 1.13 TimeUnit unit) {
586 dl 1.9 if (command == null || unit == null)
587 dl 1.1 throw new NullPointerException();
588     if (delay <= 0)
589     throw new IllegalArgumentException();
590 jsr166 1.48 ScheduledFutureTask<Void> sft =
591     new ScheduledFutureTask<Void>(command,
592     null,
593 jsr166 1.54 triggerTime(initialDelay, unit),
594 jsr166 1.48 unit.toNanos(-delay));
595 jsr166 1.44 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
596 jsr166 1.48 sft.outerTask = t;
597 dl 1.1 delayedExecute(t);
598     return t;
599     }
600 jsr166 1.21
601 dl 1.1 /**
602 jsr166 1.39 * Executes {@code command} with zero required delay.
603     * This has effect equivalent to
604     * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
605     * Note that inspections of the queue and of the list returned by
606     * {@code shutdownNow} will access the zero-delayed
607     * {@link ScheduledFuture}, not the {@code command} itself.
608     *
609     * <p>A consequence of the use of {@code ScheduledFuture} objects is
610     * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
611     * called with a null second {@code Throwable} argument, even if the
612     * {@code command} terminated abruptly. Instead, the {@code Throwable}
613     * thrown by such a task can be obtained via {@link Future#get}.
614 dl 1.1 *
615     * @throws RejectedExecutionException at discretion of
616 jsr166 1.39 * {@code RejectedExecutionHandler}, if the task
617     * cannot be accepted for execution because the
618     * executor has been shut down
619     * @throws NullPointerException {@inheritDoc}
620 dl 1.1 */
621     public void execute(Runnable command) {
622 jsr166 1.62 schedule(command, 0, NANOSECONDS);
623 dl 1.1 }
624    
625 dl 1.13 // Override AbstractExecutorService methods
626    
627 dl 1.37 /**
628     * @throws RejectedExecutionException {@inheritDoc}
629     * @throws NullPointerException {@inheritDoc}
630     */
631 dl 1.7 public Future<?> submit(Runnable task) {
632 jsr166 1.62 return schedule(task, 0, NANOSECONDS);
633 dl 1.7 }
634    
635 dl 1.37 /**
636     * @throws RejectedExecutionException {@inheritDoc}
637     * @throws NullPointerException {@inheritDoc}
638     */
639 dl 1.7 public <T> Future<T> submit(Runnable task, T result) {
640 jsr166 1.62 return schedule(Executors.callable(task, result), 0, NANOSECONDS);
641 dl 1.7 }
642    
643 dl 1.37 /**
644     * @throws RejectedExecutionException {@inheritDoc}
645     * @throws NullPointerException {@inheritDoc}
646     */
647 dl 1.7 public <T> Future<T> submit(Callable<T> task) {
648 jsr166 1.62 return schedule(task, 0, NANOSECONDS);
649 dl 1.7 }
650 dl 1.1
651     /**
652 dl 1.37 * Sets the policy on whether to continue executing existing
653 jsr166 1.39 * periodic tasks even when this executor has been {@code shutdown}.
654     * In this case, these tasks will only terminate upon
655     * {@code shutdownNow} or after setting the policy to
656     * {@code false} when already shutdown.
657     * This value is by default {@code false}.
658 jsr166 1.30 *
659 jsr166 1.68 * @param value if {@code true}, continue after shutdown, else don't
660 jsr166 1.25 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
661 dl 1.1 */
662     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
663     continueExistingPeriodicTasksAfterShutdown = value;
664 jsr166 1.39 if (!value && isShutdown())
665 dl 1.37 onShutdown();
666 dl 1.1 }
667    
668     /**
669 jsr166 1.21 * Gets the policy on whether to continue executing existing
670 jsr166 1.39 * periodic tasks even when this executor has been {@code shutdown}.
671     * In this case, these tasks will only terminate upon
672     * {@code shutdownNow} or after setting the policy to
673     * {@code false} when already shutdown.
674     * This value is by default {@code false}.
675 jsr166 1.30 *
676 jsr166 1.39 * @return {@code true} if will continue after shutdown
677 dl 1.16 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
678 dl 1.1 */
679     public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
680     return continueExistingPeriodicTasksAfterShutdown;
681     }
682    
683     /**
684 jsr166 1.21 * Sets the policy on whether to execute existing delayed
685 jsr166 1.39 * tasks even when this executor has been {@code shutdown}.
686     * In this case, these tasks will only terminate upon
687     * {@code shutdownNow}, or after setting the policy to
688     * {@code false} when already shutdown.
689     * This value is by default {@code true}.
690 jsr166 1.30 *
691 jsr166 1.68 * @param value if {@code true}, execute after shutdown, else don't
692 dl 1.16 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
693 dl 1.1 */
694     public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
695     executeExistingDelayedTasksAfterShutdown = value;
696 jsr166 1.39 if (!value && isShutdown())
697 dl 1.37 onShutdown();
698 dl 1.1 }
699    
700     /**
701 jsr166 1.21 * Gets the policy on whether to execute existing delayed
702 jsr166 1.39 * tasks even when this executor has been {@code shutdown}.
703     * In this case, these tasks will only terminate upon
704     * {@code shutdownNow}, or after setting the policy to
705     * {@code false} when already shutdown.
706     * This value is by default {@code true}.
707 jsr166 1.30 *
708 jsr166 1.39 * @return {@code true} if will execute after shutdown
709 dl 1.16 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
710 dl 1.1 */
711     public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
712     return executeExistingDelayedTasksAfterShutdown;
713     }
714    
715     /**
716 jsr166 1.46 * Sets the policy on whether cancelled tasks should be immediately
717     * removed from the work queue at time of cancellation. This value is
718     * by default {@code false}.
719 dl 1.41 *
720 jsr166 1.42 * @param value if {@code true}, remove on cancellation, else don't
721 dl 1.41 * @see #getRemoveOnCancelPolicy
722 jsr166 1.43 * @since 1.7
723 dl 1.41 */
724     public void setRemoveOnCancelPolicy(boolean value) {
725     removeOnCancel = value;
726     }
727    
728     /**
729 jsr166 1.46 * Gets the policy on whether cancelled tasks should be immediately
730     * removed from the work queue at time of cancellation. This value is
731     * by default {@code false}.
732 dl 1.41 *
733 jsr166 1.46 * @return {@code true} if cancelled tasks are immediately removed
734     * from the queue
735 dl 1.41 * @see #setRemoveOnCancelPolicy
736 jsr166 1.43 * @since 1.7
737 dl 1.41 */
738     public boolean getRemoveOnCancelPolicy() {
739     return removeOnCancel;
740     }
741    
742     /**
743 dl 1.1 * Initiates an orderly shutdown in which previously submitted
744 jsr166 1.49 * tasks are executed, but no new tasks will be accepted.
745     * Invocation has no additional effect if already shut down.
746     *
747     * <p>This method does not wait for previously submitted tasks to
748     * complete execution. Use {@link #awaitTermination awaitTermination}
749     * to do that.
750     *
751     * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
752     * has been set {@code false}, existing delayed tasks whose delays
753     * have not yet elapsed are cancelled. And unless the {@code
754     * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
755     * {@code true}, future executions of existing periodic tasks will
756     * be cancelled.
757 jsr166 1.39 *
758     * @throws SecurityException {@inheritDoc}
759 dl 1.1 */
760     public void shutdown() {
761     super.shutdown();
762     }
763    
764     /**
765     * Attempts to stop all actively executing tasks, halts the
766 jsr166 1.30 * processing of waiting tasks, and returns a list of the tasks
767     * that were awaiting execution.
768 jsr166 1.21 *
769 jsr166 1.49 * <p>This method does not wait for actively executing tasks to
770     * terminate. Use {@link #awaitTermination awaitTermination} to
771     * do that.
772     *
773 dl 1.1 * <p>There are no guarantees beyond best-effort attempts to stop
774 dl 1.18 * processing actively executing tasks. This implementation
775 jsr166 1.31 * cancels tasks via {@link Thread#interrupt}, so any task that
776     * fails to respond to interrupts may never terminate.
777 dl 1.1 *
778 jsr166 1.39 * @return list of tasks that never commenced execution.
779 jsr166 1.77 * Each element of this list is a {@link ScheduledFuture}.
780     * For tasks submitted via one of the {@code schedule}
781     * methods, the element will be identical to the returned
782     * {@code ScheduledFuture}. For tasks submitted using
783 jsr166 1.84 * {@link #execute execute}, the element will be a
784     * zero-delay {@code ScheduledFuture}.
785 jsr166 1.31 * @throws SecurityException {@inheritDoc}
786 dl 1.1 */
787 tim 1.4 public List<Runnable> shutdownNow() {
788 dl 1.1 return super.shutdownNow();
789     }
790    
791     /**
792 jsr166 1.77 * Returns the task queue used by this executor.
793     * Each element of this list is a {@link ScheduledFuture}.
794     * For tasks submitted via one of the {@code schedule} methods, the
795     * element will be identical to the returned {@code ScheduledFuture}.
796 jsr166 1.84 * For tasks submitted using {@link #execute execute}, the element
797     * will be a zero-delay {@code ScheduledFuture}.
798 jsr166 1.77 *
799     * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
800     * tasks in the order in which they will execute.
801 dl 1.1 *
802     * @return the task queue
803     */
804     public BlockingQueue<Runnable> getQueue() {
805     return super.getQueue();
806     }
807    
808 dl 1.13 /**
809 dl 1.40 * Specialized delay queue. To mesh with TPE declarations, this
810     * class must be declared as a BlockingQueue<Runnable> even though
811 jsr166 1.42 * it can only hold RunnableScheduledFutures.
812 jsr166 1.21 */
813 dl 1.40 static class DelayedWorkQueue extends AbstractQueue<Runnable>
814 dl 1.13 implements BlockingQueue<Runnable> {
815 jsr166 1.21
816 dl 1.40 /*
817     * A DelayedWorkQueue is based on a heap-based data structure
818     * like those in DelayQueue and PriorityQueue, except that
819     * every ScheduledFutureTask also records its index into the
820     * heap array. This eliminates the need to find a task upon
821     * cancellation, greatly speeding up removal (down from O(n)
822     * to O(log n)), and reducing garbage retention that would
823     * otherwise occur by waiting for the element to rise to top
824     * before clearing. But because the queue may also hold
825     * RunnableScheduledFutures that are not ScheduledFutureTasks,
826     * we are not guaranteed to have such indices available, in
827     * which case we fall back to linear search. (We expect that
828     * most tasks will not be decorated, and that the faster cases
829     * will be much more common.)
830     *
831     * All heap operations must record index changes -- mainly
832     * within siftUp and siftDown. Upon removal, a task's
833     * heapIndex is set to -1. Note that ScheduledFutureTasks can
834     * appear at most once in the queue (this need not be true for
835     * other kinds of tasks or work queues), so are uniquely
836     * identified by heapIndex.
837     */
838    
839 jsr166 1.46 private static final int INITIAL_CAPACITY = 16;
840 jsr166 1.61 private RunnableScheduledFuture<?>[] queue =
841     new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
842 jsr166 1.46 private final ReentrantLock lock = new ReentrantLock();
843 jsr166 1.80 private int size;
844 dl 1.40
845 jsr166 1.48 /**
846     * Thread designated to wait for the task at the head of the
847     * queue. This variant of the Leader-Follower pattern
848     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
849     * minimize unnecessary timed waiting. When a thread becomes
850     * the leader, it waits only for the next delay to elapse, but
851     * other threads await indefinitely. The leader thread must
852     * signal some other thread before returning from take() or
853     * poll(...), unless some other thread becomes leader in the
854     * interim. Whenever the head of the queue is replaced with a
855     * task with an earlier expiration time, the leader field is
856     * invalidated by being reset to null, and some waiting
857     * thread, but not necessarily the current leader, is
858     * signalled. So waiting threads must be prepared to acquire
859     * and lose leadership while waiting.
860     */
861 jsr166 1.80 private Thread leader;
862 jsr166 1.48
863     /**
864     * Condition signalled when a newer task becomes available at the
865     * head of the queue or a new thread may need to become leader.
866     */
867     private final Condition available = lock.newCondition();
868 dl 1.40
869     /**
870 jsr166 1.66 * Sets f's heapIndex if it is a ScheduledFutureTask.
871 dl 1.40 */
872 jsr166 1.61 private void setIndex(RunnableScheduledFuture<?> f, int idx) {
873 dl 1.40 if (f instanceof ScheduledFutureTask)
874     ((ScheduledFutureTask)f).heapIndex = idx;
875     }
876    
877     /**
878 jsr166 1.66 * Sifts element added at bottom up to its heap-ordered spot.
879 dl 1.40 * Call only when holding lock.
880     */
881 jsr166 1.61 private void siftUp(int k, RunnableScheduledFuture<?> key) {
882 dl 1.40 while (k > 0) {
883     int parent = (k - 1) >>> 1;
884 jsr166 1.61 RunnableScheduledFuture<?> e = queue[parent];
885 dl 1.40 if (key.compareTo(e) >= 0)
886     break;
887     queue[k] = e;
888     setIndex(e, k);
889     k = parent;
890     }
891     queue[k] = key;
892     setIndex(key, k);
893     }
894    
895     /**
896 jsr166 1.66 * Sifts element added at top down to its heap-ordered spot.
897 dl 1.40 * Call only when holding lock.
898     */
899 jsr166 1.61 private void siftDown(int k, RunnableScheduledFuture<?> key) {
900 jsr166 1.42 int half = size >>> 1;
901 dl 1.40 while (k < half) {
902 jsr166 1.42 int child = (k << 1) + 1;
903 jsr166 1.61 RunnableScheduledFuture<?> c = queue[child];
904 dl 1.40 int right = child + 1;
905     if (right < size && c.compareTo(queue[right]) > 0)
906     c = queue[child = right];
907     if (key.compareTo(c) <= 0)
908     break;
909     queue[k] = c;
910     setIndex(c, k);
911     k = child;
912     }
913     queue[k] = key;
914     setIndex(key, k);
915     }
916    
917     /**
918 jsr166 1.66 * Resizes the heap array. Call only when holding lock.
919 dl 1.40 */
920     private void grow() {
921     int oldCapacity = queue.length;
922     int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
923     if (newCapacity < 0) // overflow
924     newCapacity = Integer.MAX_VALUE;
925     queue = Arrays.copyOf(queue, newCapacity);
926     }
927    
928     /**
929 jsr166 1.66 * Finds index of given object, or -1 if absent.
930 dl 1.40 */
931     private int indexOf(Object x) {
932     if (x != null) {
933 jsr166 1.48 if (x instanceof ScheduledFutureTask) {
934     int i = ((ScheduledFutureTask) x).heapIndex;
935     // Sanity check; x could conceivably be a
936     // ScheduledFutureTask from some other pool.
937     if (i >= 0 && i < size && queue[i] == x)
938     return i;
939     } else {
940     for (int i = 0; i < size; i++)
941     if (x.equals(queue[i]))
942     return i;
943     }
944 dl 1.40 }
945     return -1;
946     }
947    
948 jsr166 1.48 public boolean contains(Object x) {
949     final ReentrantLock lock = this.lock;
950 jsr166 1.45 lock.lock();
951     try {
952 jsr166 1.48 return indexOf(x) != -1;
953 jsr166 1.45 } finally {
954     lock.unlock();
955     }
956 jsr166 1.48 }
957 jsr166 1.45
958 dl 1.40 public boolean remove(Object x) {
959     final ReentrantLock lock = this.lock;
960     lock.lock();
961     try {
962 jsr166 1.45 int i = indexOf(x);
963 jsr166 1.48 if (i < 0)
964     return false;
965 jsr166 1.45
966 jsr166 1.48 setIndex(queue[i], -1);
967     int s = --size;
968 jsr166 1.61 RunnableScheduledFuture<?> replacement = queue[s];
969 jsr166 1.48 queue[s] = null;
970     if (s != i) {
971     siftDown(i, replacement);
972     if (queue[i] == replacement)
973     siftUp(i, replacement);
974     }
975     return true;
976 dl 1.40 } finally {
977     lock.unlock();
978     }
979     }
980    
981     public int size() {
982     final ReentrantLock lock = this.lock;
983     lock.lock();
984     try {
985 jsr166 1.45 return size;
986 dl 1.40 } finally {
987     lock.unlock();
988     }
989     }
990    
991 jsr166 1.42 public boolean isEmpty() {
992     return size() == 0;
993 dl 1.40 }
994    
995     public int remainingCapacity() {
996     return Integer.MAX_VALUE;
997     }
998    
999 jsr166 1.61 public RunnableScheduledFuture<?> peek() {
1000 dl 1.40 final ReentrantLock lock = this.lock;
1001     lock.lock();
1002     try {
1003     return queue[0];
1004     } finally {
1005     lock.unlock();
1006     }
1007 dl 1.13 }
1008    
1009 dl 1.40 public boolean offer(Runnable x) {
1010     if (x == null)
1011     throw new NullPointerException();
1012 jsr166 1.61 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
1013 dl 1.40 final ReentrantLock lock = this.lock;
1014     lock.lock();
1015     try {
1016     int i = size;
1017     if (i >= queue.length)
1018     grow();
1019     size = i + 1;
1020     if (i == 0) {
1021     queue[0] = e;
1022     setIndex(e, 0);
1023 jsr166 1.45 } else {
1024 dl 1.40 siftUp(i, e);
1025     }
1026 jsr166 1.46 if (queue[0] == e) {
1027 jsr166 1.48 leader = null;
1028 jsr166 1.46 available.signal();
1029 jsr166 1.48 }
1030 dl 1.40 } finally {
1031     lock.unlock();
1032     }
1033     return true;
1034 jsr166 1.48 }
1035 dl 1.40
1036     public void put(Runnable e) {
1037     offer(e);
1038     }
1039    
1040     public boolean add(Runnable e) {
1041 jsr166 1.48 return offer(e);
1042     }
1043 dl 1.40
1044     public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1045     return offer(e);
1046     }
1047 jsr166 1.42
1048 jsr166 1.46 /**
1049     * Performs common bookkeeping for poll and take: Replaces
1050 jsr166 1.47 * first element with last and sifts it down. Call only when
1051     * holding lock.
1052 jsr166 1.46 * @param f the task to remove and return
1053     */
1054 jsr166 1.61 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1055 jsr166 1.46 int s = --size;
1056 jsr166 1.61 RunnableScheduledFuture<?> x = queue[s];
1057 jsr166 1.46 queue[s] = null;
1058     if (s != 0)
1059     siftDown(0, x);
1060     setIndex(f, -1);
1061     return f;
1062     }
1063    
1064 jsr166 1.61 public RunnableScheduledFuture<?> poll() {
1065 dl 1.40 final ReentrantLock lock = this.lock;
1066     lock.lock();
1067     try {
1068 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1069 jsr166 1.62 if (first == null || first.getDelay(NANOSECONDS) > 0)
1070 dl 1.40 return null;
1071 jsr166 1.42 else
1072 dl 1.40 return finishPoll(first);
1073     } finally {
1074     lock.unlock();
1075     }
1076     }
1077    
1078 jsr166 1.61 public RunnableScheduledFuture<?> take() throws InterruptedException {
1079 dl 1.40 final ReentrantLock lock = this.lock;
1080     lock.lockInterruptibly();
1081     try {
1082     for (;;) {
1083 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1084 jsr166 1.42 if (first == null)
1085 dl 1.40 available.await();
1086     else {
1087 jsr166 1.62 long delay = first.getDelay(NANOSECONDS);
1088 jsr166 1.48 if (delay <= 0)
1089     return finishPoll(first);
1090 jsr166 1.71 first = null; // don't retain ref while waiting
1091     if (leader != null)
1092 jsr166 1.48 available.await();
1093     else {
1094     Thread thisThread = Thread.currentThread();
1095     leader = thisThread;
1096     try {
1097     available.awaitNanos(delay);
1098     } finally {
1099     if (leader == thisThread)
1100     leader = null;
1101     }
1102     }
1103 dl 1.40 }
1104     }
1105     } finally {
1106 jsr166 1.48 if (leader == null && queue[0] != null)
1107     available.signal();
1108 dl 1.40 lock.unlock();
1109     }
1110     }
1111    
1112 jsr166 1.61 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1113 dl 1.40 throws InterruptedException {
1114     long nanos = unit.toNanos(timeout);
1115     final ReentrantLock lock = this.lock;
1116     lock.lockInterruptibly();
1117     try {
1118     for (;;) {
1119 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1120 dl 1.40 if (first == null) {
1121     if (nanos <= 0)
1122     return null;
1123     else
1124     nanos = available.awaitNanos(nanos);
1125     } else {
1126 jsr166 1.62 long delay = first.getDelay(NANOSECONDS);
1127 jsr166 1.48 if (delay <= 0)
1128 dl 1.40 return finishPoll(first);
1129 jsr166 1.48 if (nanos <= 0)
1130     return null;
1131 jsr166 1.71 first = null; // don't retain ref while waiting
1132 jsr166 1.48 if (nanos < delay || leader != null)
1133     nanos = available.awaitNanos(nanos);
1134     else {
1135     Thread thisThread = Thread.currentThread();
1136     leader = thisThread;
1137     try {
1138     long timeLeft = available.awaitNanos(delay);
1139     nanos -= delay - timeLeft;
1140     } finally {
1141     if (leader == thisThread)
1142     leader = null;
1143     }
1144     }
1145     }
1146     }
1147 dl 1.40 } finally {
1148 jsr166 1.48 if (leader == null && queue[0] != null)
1149     available.signal();
1150 dl 1.40 lock.unlock();
1151     }
1152     }
1153    
1154     public void clear() {
1155     final ReentrantLock lock = this.lock;
1156     lock.lock();
1157     try {
1158     for (int i = 0; i < size; i++) {
1159 jsr166 1.61 RunnableScheduledFuture<?> t = queue[i];
1160 dl 1.40 if (t != null) {
1161     queue[i] = null;
1162     setIndex(t, -1);
1163     }
1164     }
1165     size = 0;
1166     } finally {
1167     lock.unlock();
1168     }
1169 dl 1.13 }
1170 dl 1.40
1171     /**
1172 jsr166 1.66 * Returns first element only if it is expired.
1173 dl 1.40 * Used only by drainTo. Call only when holding lock.
1174     */
1175 jsr166 1.62 private RunnableScheduledFuture<?> peekExpired() {
1176     // assert lock.isHeldByCurrentThread();
1177 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1178 jsr166 1.62 return (first == null || first.getDelay(NANOSECONDS) > 0) ?
1179     null : first;
1180 dl 1.40 }
1181    
1182     public int drainTo(Collection<? super Runnable> c) {
1183     if (c == null)
1184     throw new NullPointerException();
1185     if (c == this)
1186     throw new IllegalArgumentException();
1187     final ReentrantLock lock = this.lock;
1188     lock.lock();
1189     try {
1190 jsr166 1.61 RunnableScheduledFuture<?> first;
1191 dl 1.40 int n = 0;
1192 jsr166 1.62 while ((first = peekExpired()) != null) {
1193     c.add(first); // In this order, in case add() throws.
1194     finishPoll(first);
1195 jsr166 1.48 ++n;
1196     }
1197 dl 1.40 return n;
1198     } finally {
1199     lock.unlock();
1200     }
1201 dl 1.13 }
1202    
1203 jsr166 1.21 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1204 dl 1.40 if (c == null)
1205     throw new NullPointerException();
1206     if (c == this)
1207     throw new IllegalArgumentException();
1208     if (maxElements <= 0)
1209     return 0;
1210     final ReentrantLock lock = this.lock;
1211     lock.lock();
1212     try {
1213 jsr166 1.61 RunnableScheduledFuture<?> first;
1214 dl 1.40 int n = 0;
1215 jsr166 1.62 while (n < maxElements && (first = peekExpired()) != null) {
1216     c.add(first); // In this order, in case add() throws.
1217     finishPoll(first);
1218 jsr166 1.48 ++n;
1219     }
1220 dl 1.40 return n;
1221     } finally {
1222     lock.unlock();
1223     }
1224     }
1225    
1226     public Object[] toArray() {
1227     final ReentrantLock lock = this.lock;
1228     lock.lock();
1229     try {
1230 jsr166 1.45 return Arrays.copyOf(queue, size, Object[].class);
1231 dl 1.40 } finally {
1232     lock.unlock();
1233     }
1234     }
1235    
1236 jsr166 1.48 @SuppressWarnings("unchecked")
1237 dl 1.40 public <T> T[] toArray(T[] a) {
1238     final ReentrantLock lock = this.lock;
1239     lock.lock();
1240     try {
1241     if (a.length < size)
1242     return (T[]) Arrays.copyOf(queue, size, a.getClass());
1243     System.arraycopy(queue, 0, a, 0, size);
1244     if (a.length > size)
1245     a[size] = null;
1246     return a;
1247     } finally {
1248     lock.unlock();
1249     }
1250 dl 1.13 }
1251    
1252 jsr166 1.21 public Iterator<Runnable> iterator() {
1253 jsr166 1.45 return new Itr(Arrays.copyOf(queue, size));
1254 dl 1.40 }
1255 jsr166 1.42
1256 dl 1.40 /**
1257     * Snapshot iterator that works off copy of underlying q array.
1258     */
1259     private class Itr implements Iterator<Runnable> {
1260 jsr166 1.74 final RunnableScheduledFuture<?>[] array;
1261 jsr166 1.48 int cursor = 0; // index of next element to return
1262     int lastRet = -1; // index of last element, or -1 if no such
1263 jsr166 1.42
1264 jsr166 1.74 Itr(RunnableScheduledFuture<?>[] array) {
1265 dl 1.40 this.array = array;
1266     }
1267 jsr166 1.42
1268 dl 1.40 public boolean hasNext() {
1269     return cursor < array.length;
1270     }
1271 jsr166 1.42
1272 dl 1.40 public Runnable next() {
1273     if (cursor >= array.length)
1274     throw new NoSuchElementException();
1275     lastRet = cursor;
1276 jsr166 1.45 return array[cursor++];
1277 dl 1.40 }
1278 jsr166 1.42
1279 dl 1.40 public void remove() {
1280     if (lastRet < 0)
1281     throw new IllegalStateException();
1282     DelayedWorkQueue.this.remove(array[lastRet]);
1283     lastRet = -1;
1284     }
1285 dl 1.13 }
1286     }
1287 dl 1.1 }