ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.82
Committed: Wed Dec 3 21:55:44 2014 UTC (9 years, 6 months ago) by jsr166
Branch: MAIN
Changes since 1.81: +7 -1 lines
Log Message:
never use wildcard imports

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.11 * Expert Group and released to the public domain, as explained at
4 jsr166 1.58 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.1 */
6    
7     package java.util.concurrent;
8 jsr166 1.82
9 jsr166 1.62 import static java.util.concurrent.TimeUnit.NANOSECONDS;
10 jsr166 1.78 import static java.util.concurrent.TimeUnit.MILLISECONDS;
11 jsr166 1.62 import java.util.concurrent.atomic.AtomicLong;
12     import java.util.concurrent.locks.Condition;
13     import java.util.concurrent.locks.ReentrantLock;
14 jsr166 1.82 import java.util.AbstractQueue;
15     import java.util.Arrays;
16     import java.util.Collection;
17     import java.util.Iterator;
18     import java.util.List;
19     import java.util.NoSuchElementException;
20 dl 1.1
21     /**
22 dl 1.7 * A {@link ThreadPoolExecutor} that can additionally schedule
23 jsr166 1.75 * commands to run after a given delay, or to execute periodically.
24     * This class is preferable to {@link java.util.Timer} when multiple
25     * worker threads are needed, or when the additional flexibility or
26     * capabilities of {@link ThreadPoolExecutor} (which this class
27     * extends) are required.
28 dl 1.1 *
29 jsr166 1.46 * <p>Delayed tasks execute no sooner than they are enabled, but
30 dl 1.18 * without any real-time guarantees about when, after they are
31     * enabled, they will commence. Tasks scheduled for exactly the same
32     * execution time are enabled in first-in-first-out (FIFO) order of
33 jsr166 1.46 * submission.
34     *
35     * <p>When a submitted task is cancelled before it is run, execution
36 jsr166 1.75 * is suppressed. By default, such a cancelled task is not
37     * automatically removed from the work queue until its delay elapses.
38     * While this enables further inspection and monitoring, it may also
39     * cause unbounded retention of cancelled tasks. To avoid this, use
40     * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately
41     * removed from the work queue at time of cancellation.
42 dl 1.1 *
43 jsr166 1.75 * <p>Successive executions of a periodic task scheduled via
44     * {@link #scheduleAtFixedRate} or
45     * {@link #scheduleWithFixedDelay} do not overlap. While different
46 dl 1.51 * executions may be performed by different threads, the effects of
47     * prior executions <a
48     * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
49     * those of subsequent ones.
50     *
51 dl 1.1 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
52 dl 1.8 * of the inherited tuning methods are not useful for it. In
53 jsr166 1.53 * particular, because it acts as a fixed-sized pool using
54     * {@code corePoolSize} threads and an unbounded queue, adjustments
55     * to {@code maximumPoolSize} have no useful effect. Additionally, it
56     * is almost never a good idea to set {@code corePoolSize} to zero or
57     * use {@code allowCoreThreadTimeOut} because this may leave the pool
58     * without threads to handle tasks once they become eligible to run.
59 dl 1.1 *
60 jsr166 1.39 * <p><b>Extension notes:</b> This class overrides the
61 jsr166 1.69 * {@link ThreadPoolExecutor#execute(Runnable) execute} and
62 jsr166 1.39 * {@link AbstractExecutorService#submit(Runnable) submit}
63     * methods to generate internal {@link ScheduledFuture} objects to
64     * control per-task delays and scheduling. To preserve
65     * functionality, any further overrides of these methods in
66 dl 1.32 * subclasses must invoke superclass versions, which effectively
67 jsr166 1.39 * disables additional task customization. However, this class
68 dl 1.32 * provides alternative protected extension method
69 jsr166 1.39 * {@code decorateTask} (one version each for {@code Runnable} and
70     * {@code Callable}) that can be used to customize the concrete task
71     * types used to execute commands entered via {@code execute},
72     * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
73     * and {@code scheduleWithFixedDelay}. By default, a
74     * {@code ScheduledThreadPoolExecutor} uses a task type extending
75 dl 1.32 * {@link FutureTask}. However, this may be modified or replaced using
76     * subclasses of the form:
77     *
78 jsr166 1.39 * <pre> {@code
79 dl 1.23 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
80     *
81 jsr166 1.39 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
82 dl 1.23 *
83 jsr166 1.39 * protected <V> RunnableScheduledFuture<V> decorateTask(
84     * Runnable r, RunnableScheduledFuture<V> task) {
85     * return new CustomTask<V>(r, task);
86 jsr166 1.29 * }
87 dl 1.23 *
88 jsr166 1.39 * protected <V> RunnableScheduledFuture<V> decorateTask(
89     * Callable<V> c, RunnableScheduledFuture<V> task) {
90     * return new CustomTask<V>(c, task);
91 jsr166 1.29 * }
92     * // ... add constructors, etc.
93 jsr166 1.39 * }}</pre>
94     *
95 dl 1.1 * @since 1.5
96     * @author Doug Lea
97     */
98 jsr166 1.21 public class ScheduledThreadPoolExecutor
99     extends ThreadPoolExecutor
100 tim 1.3 implements ScheduledExecutorService {
101 dl 1.1
102 dl 1.37 /*
103     * This class specializes ThreadPoolExecutor implementation by
104     *
105     * 1. Using a custom task type, ScheduledFutureTask for
106     * tasks, even those that don't require scheduling (i.e.,
107     * those submitted using ExecutorService execute, not
108     * ScheduledExecutorService methods) which are treated as
109     * delayed tasks with a delay of zero.
110     *
111 jsr166 1.46 * 2. Using a custom queue (DelayedWorkQueue), a variant of
112 dl 1.37 * unbounded DelayQueue. The lack of capacity constraint and
113     * the fact that corePoolSize and maximumPoolSize are
114     * effectively identical simplifies some execution mechanics
115 jsr166 1.46 * (see delayedExecute) compared to ThreadPoolExecutor.
116 dl 1.37 *
117     * 3. Supporting optional run-after-shutdown parameters, which
118     * leads to overrides of shutdown methods to remove and cancel
119     * tasks that should NOT be run after shutdown, as well as
120     * different recheck logic when task (re)submission overlaps
121     * with a shutdown.
122     *
123     * 4. Task decoration methods to allow interception and
124     * instrumentation, which are needed because subclasses cannot
125     * otherwise override submit methods to get this effect. These
126     * don't have any impact on pool control logic though.
127     */
128    
129 dl 1.1 /**
130     * False if should cancel/suppress periodic tasks on shutdown.
131     */
132     private volatile boolean continueExistingPeriodicTasksAfterShutdown;
133    
134     /**
135     * False if should cancel non-periodic tasks on shutdown.
136     */
137     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
138    
139     /**
140 jsr166 1.75 * True if ScheduledFutureTask.cancel should remove from queue.
141 dl 1.41 */
142     private volatile boolean removeOnCancel = false;
143    
144     /**
145 dl 1.1 * Sequence number to break scheduling ties, and in turn to
146     * guarantee FIFO order among tied entries.
147     */
148 jsr166 1.60 private static final AtomicLong sequencer = new AtomicLong();
149 dl 1.14
150     /**
151 jsr166 1.39 * Returns current nanosecond time.
152 dl 1.14 */
153 jsr166 1.81 static final long now() {
154 jsr166 1.54 return System.nanoTime();
155 dl 1.14 }
156    
157 jsr166 1.21 private class ScheduledFutureTask<V>
158 peierls 1.22 extends FutureTask<V> implements RunnableScheduledFuture<V> {
159 jsr166 1.21
160 dl 1.1 /** Sequence number to break ties FIFO */
161     private final long sequenceNumber;
162 jsr166 1.44
163 dl 1.1 /** The time the task is enabled to execute in nanoTime units */
164     private long time;
165 jsr166 1.44
166 dl 1.16 /**
167 jsr166 1.75 * Period in nanoseconds for repeating tasks.
168     * A positive value indicates fixed-rate execution.
169     * A negative value indicates fixed-delay execution.
170     * A value of 0 indicates a non-repeating (one-shot) task.
171 dl 1.16 */
172 dl 1.1 private final long period;
173    
174 jsr166 1.48 /** The actual task to be re-enqueued by reExecutePeriodic */
175     RunnableScheduledFuture<V> outerTask = this;
176 jsr166 1.44
177 dl 1.1 /**
178 dl 1.40 * Index into delay queue, to support faster cancellation.
179     */
180     int heapIndex;
181    
182     /**
183 jsr166 1.30 * Creates a one-shot action with given nanoTime-based trigger time.
184 dl 1.1 */
185 jsr166 1.75 ScheduledFutureTask(Runnable r, V result, long triggerTime) {
186 dl 1.1 super(r, result);
187 jsr166 1.75 this.time = triggerTime;
188 dl 1.1 this.period = 0;
189     this.sequenceNumber = sequencer.getAndIncrement();
190     }
191    
192     /**
193 jsr166 1.75 * Creates a periodic action with given nanoTime-based initial
194     * trigger time and period.
195 dl 1.1 */
196 jsr166 1.75 ScheduledFutureTask(Runnable r, V result, long triggerTime,
197     long period) {
198 dl 1.1 super(r, result);
199 jsr166 1.75 this.time = triggerTime;
200 dl 1.1 this.period = period;
201     this.sequenceNumber = sequencer.getAndIncrement();
202     }
203    
204     /**
205 jsr166 1.65 * Creates a one-shot action with given nanoTime-based trigger time.
206 dl 1.1 */
207 jsr166 1.75 ScheduledFutureTask(Callable<V> callable, long triggerTime) {
208 dl 1.1 super(callable);
209 jsr166 1.75 this.time = triggerTime;
210 dl 1.1 this.period = 0;
211     this.sequenceNumber = sequencer.getAndIncrement();
212     }
213    
214     public long getDelay(TimeUnit unit) {
215 jsr166 1.62 return unit.convert(time - now(), NANOSECONDS);
216 dl 1.1 }
217    
218 dl 1.20 public int compareTo(Delayed other) {
219 dl 1.59 if (other == this) // compare zero if same object
220 dl 1.1 return 0;
221 dl 1.34 if (other instanceof ScheduledFutureTask) {
222     ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
223     long diff = time - x.time;
224     if (diff < 0)
225     return -1;
226     else if (diff > 0)
227     return 1;
228     else if (sequenceNumber < x.sequenceNumber)
229     return -1;
230     else
231     return 1;
232     }
233 jsr166 1.64 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
234 jsr166 1.61 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
235 dl 1.1 }
236    
237     /**
238 jsr166 1.70 * Returns {@code true} if this is a periodic (not a one-shot) action.
239 jsr166 1.30 *
240 jsr166 1.70 * @return {@code true} if periodic
241 dl 1.1 */
242 dl 1.23 public boolean isPeriodic() {
243 dl 1.16 return period != 0;
244 dl 1.1 }
245    
246     /**
247 jsr166 1.39 * Sets the next time to run for a periodic task.
248 dl 1.13 */
249 dl 1.37 private void setNextRunTime() {
250     long p = period;
251     if (p > 0)
252     time += p;
253     else
254 jsr166 1.54 time = triggerTime(-p);
255 dl 1.13 }
256    
257 dl 1.40 public boolean cancel(boolean mayInterruptIfRunning) {
258 dl 1.41 boolean cancelled = super.cancel(mayInterruptIfRunning);
259     if (cancelled && removeOnCancel && heapIndex >= 0)
260 jsr166 1.42 remove(this);
261 dl 1.41 return cancelled;
262 dl 1.40 }
263    
264 dl 1.13 /**
265 dl 1.5 * Overrides FutureTask version so as to reset/requeue if periodic.
266 jsr166 1.21 */
267 dl 1.1 public void run() {
268 dl 1.37 boolean periodic = isPeriodic();
269     if (!canRunInCurrentRunState(periodic))
270     cancel(false);
271     else if (!periodic)
272 dl 1.5 ScheduledFutureTask.super.run();
273 dl 1.37 else if (ScheduledFutureTask.super.runAndReset()) {
274     setNextRunTime();
275 jsr166 1.44 reExecutePeriodic(outerTask);
276 dl 1.37 }
277 dl 1.1 }
278     }
279    
280     /**
281 dl 1.37 * Returns true if can run a task given current run state
282 jsr166 1.39 * and run-after-shutdown parameters.
283     *
284 dl 1.37 * @param periodic true if this task periodic, false if delayed
285     */
286     boolean canRunInCurrentRunState(boolean periodic) {
287 jsr166 1.38 return isRunningOrShutdown(periodic ?
288 dl 1.37 continueExistingPeriodicTasksAfterShutdown :
289     executeExistingDelayedTasksAfterShutdown);
290     }
291    
292     /**
293     * Main execution method for delayed or periodic tasks. If pool
294     * is shut down, rejects the task. Otherwise adds task to queue
295     * and starts a thread, if necessary, to run it. (We cannot
296     * prestart the thread to run the task because the task (probably)
297 jsr166 1.67 * shouldn't be run yet.) If the pool is shut down while the task
298 dl 1.37 * is being added, cancel and remove it if required by state and
299 jsr166 1.39 * run-after-shutdown parameters.
300     *
301 dl 1.37 * @param task the task
302     */
303     private void delayedExecute(RunnableScheduledFuture<?> task) {
304     if (isShutdown())
305     reject(task);
306     else {
307     super.getQueue().add(task);
308     if (isShutdown() &&
309     !canRunInCurrentRunState(task.isPeriodic()) &&
310     remove(task))
311     task.cancel(false);
312 jsr166 1.48 else
313 dl 1.63 ensurePrestart();
314 dl 1.37 }
315     }
316 jsr166 1.21
317 dl 1.37 /**
318 jsr166 1.39 * Requeues a periodic task unless current run state precludes it.
319     * Same idea as delayedExecute except drops task rather than rejecting.
320     *
321 dl 1.37 * @param task the task
322     */
323     void reExecutePeriodic(RunnableScheduledFuture<?> task) {
324     if (canRunInCurrentRunState(true)) {
325     super.getQueue().add(task);
326     if (!canRunInCurrentRunState(true) && remove(task))
327     task.cancel(false);
328 jsr166 1.48 else
329 dl 1.63 ensurePrestart();
330 dl 1.37 }
331 dl 1.13 }
332 dl 1.1
333 dl 1.13 /**
334 jsr166 1.21 * Cancels and clears the queue of all tasks that should not be run
335 jsr166 1.39 * due to shutdown policy. Invoked within super.shutdown.
336 dl 1.13 */
337 dl 1.37 @Override void onShutdown() {
338     BlockingQueue<Runnable> q = super.getQueue();
339     boolean keepDelayed =
340     getExecuteExistingDelayedTasksAfterShutdownPolicy();
341     boolean keepPeriodic =
342     getContinueExistingPeriodicTasksAfterShutdownPolicy();
343 jsr166 1.57 if (!keepDelayed && !keepPeriodic) {
344     for (Object e : q.toArray())
345     if (e instanceof RunnableScheduledFuture<?>)
346     ((RunnableScheduledFuture<?>) e).cancel(false);
347 dl 1.37 q.clear();
348 jsr166 1.57 }
349 dl 1.37 else {
350     // Traverse snapshot to avoid iterator exceptions
351 jsr166 1.39 for (Object e : q.toArray()) {
352 dl 1.23 if (e instanceof RunnableScheduledFuture) {
353 dl 1.37 RunnableScheduledFuture<?> t =
354     (RunnableScheduledFuture<?>)e;
355 dl 1.41 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
356     t.isCancelled()) { // also remove if already cancelled
357     if (q.remove(t))
358     t.cancel(false);
359     }
360 dl 1.13 }
361     }
362 dl 1.1 }
363 jsr166 1.48 tryTerminate();
364 dl 1.1 }
365    
366 dl 1.23 /**
367 jsr166 1.30 * Modifies or replaces the task used to execute a runnable.
368 jsr166 1.28 * This method can be used to override the concrete
369 dl 1.23 * class used for managing internal tasks.
370 jsr166 1.30 * The default implementation simply returns the given task.
371 jsr166 1.28 *
372 dl 1.23 * @param runnable the submitted Runnable
373     * @param task the task created to execute the runnable
374 jsr166 1.72 * @param <V> the type of the task's result
375 dl 1.23 * @return a task that can execute the runnable
376     * @since 1.6
377     */
378 peierls 1.22 protected <V> RunnableScheduledFuture<V> decorateTask(
379 dl 1.23 Runnable runnable, RunnableScheduledFuture<V> task) {
380     return task;
381 peierls 1.22 }
382    
383 dl 1.23 /**
384 jsr166 1.30 * Modifies or replaces the task used to execute a callable.
385 jsr166 1.28 * This method can be used to override the concrete
386 dl 1.23 * class used for managing internal tasks.
387 jsr166 1.30 * The default implementation simply returns the given task.
388 jsr166 1.28 *
389 dl 1.23 * @param callable the submitted Callable
390     * @param task the task created to execute the callable
391 jsr166 1.72 * @param <V> the type of the task's result
392 dl 1.23 * @return a task that can execute the callable
393     * @since 1.6
394     */
395 peierls 1.22 protected <V> RunnableScheduledFuture<V> decorateTask(
396 dl 1.23 Callable<V> callable, RunnableScheduledFuture<V> task) {
397     return task;
398 dl 1.19 }
399    
400 dl 1.1 /**
401 jsr166 1.78 * The default keep-alive time for pool threads.
402     *
403     * Normally, this value is unused because all pool threads will be
404     * core threads, but if a user creates a pool with a corePoolSize
405     * of zero (against our advice), we keep a thread alive as long as
406     * there are queued tasks. If the keep alive time is zero (the
407     * historic value), we end up hot-spinning in getTask, wasting a
408     * CPU. But on the other hand, if we set the value too high, and
409     * users create a one-shot pool which they don't cleanly shutdown,
410     * the pool's non-daemon threads will prevent JVM termination. A
411     * small but non-zero value (relative to a JVM's lifetime) seems
412     * best.
413     */
414     private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
415    
416     /**
417 jsr166 1.39 * Creates a new {@code ScheduledThreadPoolExecutor} with the
418     * given core pool size.
419 jsr166 1.21 *
420 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
421     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
422     * @throws IllegalArgumentException if {@code corePoolSize < 0}
423 dl 1.1 */
424     public ScheduledThreadPoolExecutor(int corePoolSize) {
425 jsr166 1.78 super(corePoolSize, Integer.MAX_VALUE,
426     DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
427 dl 1.1 new DelayedWorkQueue());
428     }
429    
430     /**
431 jsr166 1.39 * Creates a new {@code ScheduledThreadPoolExecutor} with the
432     * given initial parameters.
433 jsr166 1.21 *
434 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
435     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
436 dl 1.1 * @param threadFactory the factory to use when the executor
437 jsr166 1.39 * creates a new thread
438     * @throws IllegalArgumentException if {@code corePoolSize < 0}
439     * @throws NullPointerException if {@code threadFactory} is null
440 dl 1.1 */
441     public ScheduledThreadPoolExecutor(int corePoolSize,
442 jsr166 1.56 ThreadFactory threadFactory) {
443 jsr166 1.78 super(corePoolSize, Integer.MAX_VALUE,
444     DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
445 dl 1.1 new DelayedWorkQueue(), threadFactory);
446     }
447    
448     /**
449 jsr166 1.79 * Creates a new {@code ScheduledThreadPoolExecutor} with the
450     * given initial parameters.
451 jsr166 1.21 *
452 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
453     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
454 dl 1.1 * @param handler the handler to use when execution is blocked
455 jsr166 1.39 * because the thread bounds and queue capacities are reached
456     * @throws IllegalArgumentException if {@code corePoolSize < 0}
457     * @throws NullPointerException if {@code handler} is null
458 dl 1.1 */
459     public ScheduledThreadPoolExecutor(int corePoolSize,
460 jsr166 1.56 RejectedExecutionHandler handler) {
461 jsr166 1.78 super(corePoolSize, Integer.MAX_VALUE,
462     DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
463 dl 1.1 new DelayedWorkQueue(), handler);
464     }
465    
466     /**
467 jsr166 1.79 * Creates a new {@code ScheduledThreadPoolExecutor} with the
468     * given initial parameters.
469 jsr166 1.21 *
470 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
471     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
472 dl 1.1 * @param threadFactory the factory to use when the executor
473 jsr166 1.39 * creates a new thread
474 dl 1.1 * @param handler the handler to use when execution is blocked
475 jsr166 1.39 * because the thread bounds and queue capacities are reached
476     * @throws IllegalArgumentException if {@code corePoolSize < 0}
477     * @throws NullPointerException if {@code threadFactory} or
478     * {@code handler} is null
479 dl 1.1 */
480     public ScheduledThreadPoolExecutor(int corePoolSize,
481 jsr166 1.56 ThreadFactory threadFactory,
482     RejectedExecutionHandler handler) {
483 jsr166 1.78 super(corePoolSize, Integer.MAX_VALUE,
484     DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
485 dl 1.1 new DelayedWorkQueue(), threadFactory, handler);
486     }
487    
488 dl 1.37 /**
489 jsr166 1.73 * Returns the nanoTime-based trigger time of a delayed action.
490 jsr166 1.54 */
491     private long triggerTime(long delay, TimeUnit unit) {
492     return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
493     }
494    
495     /**
496 jsr166 1.73 * Returns the nanoTime-based trigger time of a delayed action.
497 dl 1.50 */
498 jsr166 1.54 long triggerTime(long delay) {
499     return now() +
500     ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
501     }
502    
503     /**
504     * Constrains the values of all delays in the queue to be within
505     * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
506     * This may occur if a task is eligible to be dequeued, but has
507     * not yet been, while some other task is added with a delay of
508     * Long.MAX_VALUE.
509     */
510     private long overflowFree(long delay) {
511     Delayed head = (Delayed) super.getQueue().peek();
512     if (head != null) {
513 jsr166 1.62 long headDelay = head.getDelay(NANOSECONDS);
514 jsr166 1.54 if (headDelay < 0 && (delay - headDelay < 0))
515     delay = Long.MAX_VALUE + headDelay;
516     }
517     return delay;
518 dl 1.50 }
519    
520     /**
521 dl 1.37 * @throws RejectedExecutionException {@inheritDoc}
522     * @throws NullPointerException {@inheritDoc}
523     */
524 jsr166 1.21 public ScheduledFuture<?> schedule(Runnable command,
525     long delay,
526 dl 1.13 TimeUnit unit) {
527 dl 1.9 if (command == null || unit == null)
528 dl 1.1 throw new NullPointerException();
529 jsr166 1.76 RunnableScheduledFuture<Void> t = decorateTask(command,
530 jsr166 1.54 new ScheduledFutureTask<Void>(command, null,
531     triggerTime(delay, unit)));
532 dl 1.1 delayedExecute(t);
533     return t;
534     }
535 jsr166 1.52
536 dl 1.37 /**
537     * @throws RejectedExecutionException {@inheritDoc}
538     * @throws NullPointerException {@inheritDoc}
539     */
540 jsr166 1.21 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
541     long delay,
542 dl 1.13 TimeUnit unit) {
543 dl 1.9 if (callable == null || unit == null)
544 dl 1.1 throw new NullPointerException();
545 peierls 1.22 RunnableScheduledFuture<V> t = decorateTask(callable,
546 jsr166 1.54 new ScheduledFutureTask<V>(callable,
547     triggerTime(delay, unit)));
548 dl 1.1 delayedExecute(t);
549     return t;
550     }
551    
552 dl 1.37 /**
553     * @throws RejectedExecutionException {@inheritDoc}
554     * @throws NullPointerException {@inheritDoc}
555     * @throws IllegalArgumentException {@inheritDoc}
556     */
557 jsr166 1.21 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
558     long initialDelay,
559     long period,
560 dl 1.13 TimeUnit unit) {
561 dl 1.9 if (command == null || unit == null)
562 dl 1.1 throw new NullPointerException();
563     if (period <= 0)
564     throw new IllegalArgumentException();
565 jsr166 1.48 ScheduledFutureTask<Void> sft =
566     new ScheduledFutureTask<Void>(command,
567     null,
568 jsr166 1.54 triggerTime(initialDelay, unit),
569 jsr166 1.48 unit.toNanos(period));
570 jsr166 1.44 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
571 jsr166 1.48 sft.outerTask = t;
572 dl 1.1 delayedExecute(t);
573     return t;
574     }
575 jsr166 1.21
576 dl 1.37 /**
577     * @throws RejectedExecutionException {@inheritDoc}
578     * @throws NullPointerException {@inheritDoc}
579     * @throws IllegalArgumentException {@inheritDoc}
580     */
581 jsr166 1.21 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
582     long initialDelay,
583     long delay,
584 dl 1.13 TimeUnit unit) {
585 dl 1.9 if (command == null || unit == null)
586 dl 1.1 throw new NullPointerException();
587     if (delay <= 0)
588     throw new IllegalArgumentException();
589 jsr166 1.48 ScheduledFutureTask<Void> sft =
590     new ScheduledFutureTask<Void>(command,
591     null,
592 jsr166 1.54 triggerTime(initialDelay, unit),
593 jsr166 1.48 unit.toNanos(-delay));
594 jsr166 1.44 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
595 jsr166 1.48 sft.outerTask = t;
596 dl 1.1 delayedExecute(t);
597     return t;
598     }
599 jsr166 1.21
600 dl 1.1 /**
601 jsr166 1.39 * Executes {@code command} with zero required delay.
602     * This has effect equivalent to
603     * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
604     * Note that inspections of the queue and of the list returned by
605     * {@code shutdownNow} will access the zero-delayed
606     * {@link ScheduledFuture}, not the {@code command} itself.
607     *
608     * <p>A consequence of the use of {@code ScheduledFuture} objects is
609     * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
610     * called with a null second {@code Throwable} argument, even if the
611     * {@code command} terminated abruptly. Instead, the {@code Throwable}
612     * thrown by such a task can be obtained via {@link Future#get}.
613 dl 1.1 *
614     * @throws RejectedExecutionException at discretion of
615 jsr166 1.39 * {@code RejectedExecutionHandler}, if the task
616     * cannot be accepted for execution because the
617     * executor has been shut down
618     * @throws NullPointerException {@inheritDoc}
619 dl 1.1 */
620     public void execute(Runnable command) {
621 jsr166 1.62 schedule(command, 0, NANOSECONDS);
622 dl 1.1 }
623    
624 dl 1.13 // Override AbstractExecutorService methods
625    
626 dl 1.37 /**
627     * @throws RejectedExecutionException {@inheritDoc}
628     * @throws NullPointerException {@inheritDoc}
629     */
630 dl 1.7 public Future<?> submit(Runnable task) {
631 jsr166 1.62 return schedule(task, 0, NANOSECONDS);
632 dl 1.7 }
633    
634 dl 1.37 /**
635     * @throws RejectedExecutionException {@inheritDoc}
636     * @throws NullPointerException {@inheritDoc}
637     */
638 dl 1.7 public <T> Future<T> submit(Runnable task, T result) {
639 jsr166 1.62 return schedule(Executors.callable(task, result), 0, NANOSECONDS);
640 dl 1.7 }
641    
642 dl 1.37 /**
643     * @throws RejectedExecutionException {@inheritDoc}
644     * @throws NullPointerException {@inheritDoc}
645     */
646 dl 1.7 public <T> Future<T> submit(Callable<T> task) {
647 jsr166 1.62 return schedule(task, 0, NANOSECONDS);
648 dl 1.7 }
649 dl 1.1
650     /**
651 dl 1.37 * Sets the policy on whether to continue executing existing
652 jsr166 1.39 * periodic tasks even when this executor has been {@code shutdown}.
653     * In this case, these tasks will only terminate upon
654     * {@code shutdownNow} or after setting the policy to
655     * {@code false} when already shutdown.
656     * This value is by default {@code false}.
657 jsr166 1.30 *
658 jsr166 1.68 * @param value if {@code true}, continue after shutdown, else don't
659 jsr166 1.25 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
660 dl 1.1 */
661     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
662     continueExistingPeriodicTasksAfterShutdown = value;
663 jsr166 1.39 if (!value && isShutdown())
664 dl 1.37 onShutdown();
665 dl 1.1 }
666    
667     /**
668 jsr166 1.21 * Gets the policy on whether to continue executing existing
669 jsr166 1.39 * periodic tasks even when this executor has been {@code shutdown}.
670     * In this case, these tasks will only terminate upon
671     * {@code shutdownNow} or after setting the policy to
672     * {@code false} when already shutdown.
673     * This value is by default {@code false}.
674 jsr166 1.30 *
675 jsr166 1.39 * @return {@code true} if will continue after shutdown
676 dl 1.16 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
677 dl 1.1 */
678     public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
679     return continueExistingPeriodicTasksAfterShutdown;
680     }
681    
682     /**
683 jsr166 1.21 * Sets the policy on whether to execute existing delayed
684 jsr166 1.39 * tasks even when this executor has been {@code shutdown}.
685     * In this case, these tasks will only terminate upon
686     * {@code shutdownNow}, or after setting the policy to
687     * {@code false} when already shutdown.
688     * This value is by default {@code true}.
689 jsr166 1.30 *
690 jsr166 1.68 * @param value if {@code true}, execute after shutdown, else don't
691 dl 1.16 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
692 dl 1.1 */
693     public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
694     executeExistingDelayedTasksAfterShutdown = value;
695 jsr166 1.39 if (!value && isShutdown())
696 dl 1.37 onShutdown();
697 dl 1.1 }
698    
699     /**
700 jsr166 1.21 * Gets the policy on whether to execute existing delayed
701 jsr166 1.39 * tasks even when this executor has been {@code shutdown}.
702     * In this case, these tasks will only terminate upon
703     * {@code shutdownNow}, or after setting the policy to
704     * {@code false} when already shutdown.
705     * This value is by default {@code true}.
706 jsr166 1.30 *
707 jsr166 1.39 * @return {@code true} if will execute after shutdown
708 dl 1.16 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
709 dl 1.1 */
710     public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
711     return executeExistingDelayedTasksAfterShutdown;
712     }
713    
714     /**
715 jsr166 1.46 * Sets the policy on whether cancelled tasks should be immediately
716     * removed from the work queue at time of cancellation. This value is
717     * by default {@code false}.
718 dl 1.41 *
719 jsr166 1.42 * @param value if {@code true}, remove on cancellation, else don't
720 dl 1.41 * @see #getRemoveOnCancelPolicy
721 jsr166 1.43 * @since 1.7
722 dl 1.41 */
723     public void setRemoveOnCancelPolicy(boolean value) {
724     removeOnCancel = value;
725     }
726    
727     /**
728 jsr166 1.46 * Gets the policy on whether cancelled tasks should be immediately
729     * removed from the work queue at time of cancellation. This value is
730     * by default {@code false}.
731 dl 1.41 *
732 jsr166 1.46 * @return {@code true} if cancelled tasks are immediately removed
733     * from the queue
734 dl 1.41 * @see #setRemoveOnCancelPolicy
735 jsr166 1.43 * @since 1.7
736 dl 1.41 */
737     public boolean getRemoveOnCancelPolicy() {
738     return removeOnCancel;
739     }
740    
741     /**
742 dl 1.1 * Initiates an orderly shutdown in which previously submitted
743 jsr166 1.49 * tasks are executed, but no new tasks will be accepted.
744     * Invocation has no additional effect if already shut down.
745     *
746     * <p>This method does not wait for previously submitted tasks to
747     * complete execution. Use {@link #awaitTermination awaitTermination}
748     * to do that.
749     *
750     * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
751     * has been set {@code false}, existing delayed tasks whose delays
752     * have not yet elapsed are cancelled. And unless the {@code
753     * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
754     * {@code true}, future executions of existing periodic tasks will
755     * be cancelled.
756 jsr166 1.39 *
757     * @throws SecurityException {@inheritDoc}
758 dl 1.1 */
759     public void shutdown() {
760     super.shutdown();
761     }
762    
763     /**
764     * Attempts to stop all actively executing tasks, halts the
765 jsr166 1.30 * processing of waiting tasks, and returns a list of the tasks
766     * that were awaiting execution.
767 jsr166 1.21 *
768 jsr166 1.49 * <p>This method does not wait for actively executing tasks to
769     * terminate. Use {@link #awaitTermination awaitTermination} to
770     * do that.
771     *
772 dl 1.1 * <p>There are no guarantees beyond best-effort attempts to stop
773 dl 1.18 * processing actively executing tasks. This implementation
774 jsr166 1.31 * cancels tasks via {@link Thread#interrupt}, so any task that
775     * fails to respond to interrupts may never terminate.
776 dl 1.1 *
777 jsr166 1.39 * @return list of tasks that never commenced execution.
778 jsr166 1.77 * Each element of this list is a {@link ScheduledFuture}.
779     * For tasks submitted via one of the {@code schedule}
780     * methods, the element will be identical to the returned
781     * {@code ScheduledFuture}. For tasks submitted using
782     * {@link #execute}, the element will be a zero-delay {@code
783     * ScheduledFuture}.
784 jsr166 1.31 * @throws SecurityException {@inheritDoc}
785 dl 1.1 */
786 tim 1.4 public List<Runnable> shutdownNow() {
787 dl 1.1 return super.shutdownNow();
788     }
789    
790     /**
791 jsr166 1.77 * Returns the task queue used by this executor.
792     * Each element of this list is a {@link ScheduledFuture}.
793     * For tasks submitted via one of the {@code schedule} methods, the
794     * element will be identical to the returned {@code ScheduledFuture}.
795     * For tasks submitted using {@link #execute}, the element will be a
796     * zero-delay {@code ScheduledFuture}.
797     *
798     * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
799     * tasks in the order in which they will execute.
800 dl 1.1 *
801     * @return the task queue
802     */
803     public BlockingQueue<Runnable> getQueue() {
804     return super.getQueue();
805     }
806    
807 dl 1.13 /**
808 dl 1.40 * Specialized delay queue. To mesh with TPE declarations, this
809     * class must be declared as a BlockingQueue<Runnable> even though
810 jsr166 1.42 * it can only hold RunnableScheduledFutures.
811 jsr166 1.21 */
812 dl 1.40 static class DelayedWorkQueue extends AbstractQueue<Runnable>
813 dl 1.13 implements BlockingQueue<Runnable> {
814 jsr166 1.21
815 dl 1.40 /*
816     * A DelayedWorkQueue is based on a heap-based data structure
817     * like those in DelayQueue and PriorityQueue, except that
818     * every ScheduledFutureTask also records its index into the
819     * heap array. This eliminates the need to find a task upon
820     * cancellation, greatly speeding up removal (down from O(n)
821     * to O(log n)), and reducing garbage retention that would
822     * otherwise occur by waiting for the element to rise to top
823     * before clearing. But because the queue may also hold
824     * RunnableScheduledFutures that are not ScheduledFutureTasks,
825     * we are not guaranteed to have such indices available, in
826     * which case we fall back to linear search. (We expect that
827     * most tasks will not be decorated, and that the faster cases
828     * will be much more common.)
829     *
830     * All heap operations must record index changes -- mainly
831     * within siftUp and siftDown. Upon removal, a task's
832     * heapIndex is set to -1. Note that ScheduledFutureTasks can
833     * appear at most once in the queue (this need not be true for
834     * other kinds of tasks or work queues), so are uniquely
835     * identified by heapIndex.
836     */
837    
838 jsr166 1.46 private static final int INITIAL_CAPACITY = 16;
839 jsr166 1.61 private RunnableScheduledFuture<?>[] queue =
840     new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
841 jsr166 1.46 private final ReentrantLock lock = new ReentrantLock();
842 jsr166 1.80 private int size;
843 dl 1.40
844 jsr166 1.48 /**
845     * Thread designated to wait for the task at the head of the
846     * queue. This variant of the Leader-Follower pattern
847     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
848     * minimize unnecessary timed waiting. When a thread becomes
849     * the leader, it waits only for the next delay to elapse, but
850     * other threads await indefinitely. The leader thread must
851     * signal some other thread before returning from take() or
852     * poll(...), unless some other thread becomes leader in the
853     * interim. Whenever the head of the queue is replaced with a
854     * task with an earlier expiration time, the leader field is
855     * invalidated by being reset to null, and some waiting
856     * thread, but not necessarily the current leader, is
857     * signalled. So waiting threads must be prepared to acquire
858     * and lose leadership while waiting.
859     */
860 jsr166 1.80 private Thread leader;
861 jsr166 1.48
862     /**
863     * Condition signalled when a newer task becomes available at the
864     * head of the queue or a new thread may need to become leader.
865     */
866     private final Condition available = lock.newCondition();
867 dl 1.40
868     /**
869 jsr166 1.66 * Sets f's heapIndex if it is a ScheduledFutureTask.
870 dl 1.40 */
871 jsr166 1.61 private void setIndex(RunnableScheduledFuture<?> f, int idx) {
872 dl 1.40 if (f instanceof ScheduledFutureTask)
873     ((ScheduledFutureTask)f).heapIndex = idx;
874     }
875    
876     /**
877 jsr166 1.66 * Sifts element added at bottom up to its heap-ordered spot.
878 dl 1.40 * Call only when holding lock.
879     */
880 jsr166 1.61 private void siftUp(int k, RunnableScheduledFuture<?> key) {
881 dl 1.40 while (k > 0) {
882     int parent = (k - 1) >>> 1;
883 jsr166 1.61 RunnableScheduledFuture<?> e = queue[parent];
884 dl 1.40 if (key.compareTo(e) >= 0)
885     break;
886     queue[k] = e;
887     setIndex(e, k);
888     k = parent;
889     }
890     queue[k] = key;
891     setIndex(key, k);
892     }
893    
894     /**
895 jsr166 1.66 * Sifts element added at top down to its heap-ordered spot.
896 dl 1.40 * Call only when holding lock.
897     */
898 jsr166 1.61 private void siftDown(int k, RunnableScheduledFuture<?> key) {
899 jsr166 1.42 int half = size >>> 1;
900 dl 1.40 while (k < half) {
901 jsr166 1.42 int child = (k << 1) + 1;
902 jsr166 1.61 RunnableScheduledFuture<?> c = queue[child];
903 dl 1.40 int right = child + 1;
904     if (right < size && c.compareTo(queue[right]) > 0)
905     c = queue[child = right];
906     if (key.compareTo(c) <= 0)
907     break;
908     queue[k] = c;
909     setIndex(c, k);
910     k = child;
911     }
912     queue[k] = key;
913     setIndex(key, k);
914     }
915    
916     /**
917 jsr166 1.66 * Resizes the heap array. Call only when holding lock.
918 dl 1.40 */
919     private void grow() {
920     int oldCapacity = queue.length;
921     int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
922     if (newCapacity < 0) // overflow
923     newCapacity = Integer.MAX_VALUE;
924     queue = Arrays.copyOf(queue, newCapacity);
925     }
926    
927     /**
928 jsr166 1.66 * Finds index of given object, or -1 if absent.
929 dl 1.40 */
930     private int indexOf(Object x) {
931     if (x != null) {
932 jsr166 1.48 if (x instanceof ScheduledFutureTask) {
933     int i = ((ScheduledFutureTask) x).heapIndex;
934     // Sanity check; x could conceivably be a
935     // ScheduledFutureTask from some other pool.
936     if (i >= 0 && i < size && queue[i] == x)
937     return i;
938     } else {
939     for (int i = 0; i < size; i++)
940     if (x.equals(queue[i]))
941     return i;
942     }
943 dl 1.40 }
944     return -1;
945     }
946    
947 jsr166 1.48 public boolean contains(Object x) {
948     final ReentrantLock lock = this.lock;
949 jsr166 1.45 lock.lock();
950     try {
951 jsr166 1.48 return indexOf(x) != -1;
952 jsr166 1.45 } finally {
953     lock.unlock();
954     }
955 jsr166 1.48 }
956 jsr166 1.45
957 dl 1.40 public boolean remove(Object x) {
958     final ReentrantLock lock = this.lock;
959     lock.lock();
960     try {
961 jsr166 1.45 int i = indexOf(x);
962 jsr166 1.48 if (i < 0)
963     return false;
964 jsr166 1.45
965 jsr166 1.48 setIndex(queue[i], -1);
966     int s = --size;
967 jsr166 1.61 RunnableScheduledFuture<?> replacement = queue[s];
968 jsr166 1.48 queue[s] = null;
969     if (s != i) {
970     siftDown(i, replacement);
971     if (queue[i] == replacement)
972     siftUp(i, replacement);
973     }
974     return true;
975 dl 1.40 } finally {
976     lock.unlock();
977     }
978     }
979    
980     public int size() {
981     final ReentrantLock lock = this.lock;
982     lock.lock();
983     try {
984 jsr166 1.45 return size;
985 dl 1.40 } finally {
986     lock.unlock();
987     }
988     }
989    
990 jsr166 1.42 public boolean isEmpty() {
991     return size() == 0;
992 dl 1.40 }
993    
994     public int remainingCapacity() {
995     return Integer.MAX_VALUE;
996     }
997    
998 jsr166 1.61 public RunnableScheduledFuture<?> peek() {
999 dl 1.40 final ReentrantLock lock = this.lock;
1000     lock.lock();
1001     try {
1002     return queue[0];
1003     } finally {
1004     lock.unlock();
1005     }
1006 dl 1.13 }
1007    
1008 dl 1.40 public boolean offer(Runnable x) {
1009     if (x == null)
1010     throw new NullPointerException();
1011 jsr166 1.61 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
1012 dl 1.40 final ReentrantLock lock = this.lock;
1013     lock.lock();
1014     try {
1015     int i = size;
1016     if (i >= queue.length)
1017     grow();
1018     size = i + 1;
1019     if (i == 0) {
1020     queue[0] = e;
1021     setIndex(e, 0);
1022 jsr166 1.45 } else {
1023 dl 1.40 siftUp(i, e);
1024     }
1025 jsr166 1.46 if (queue[0] == e) {
1026 jsr166 1.48 leader = null;
1027 jsr166 1.46 available.signal();
1028 jsr166 1.48 }
1029 dl 1.40 } finally {
1030     lock.unlock();
1031     }
1032     return true;
1033 jsr166 1.48 }
1034 dl 1.40
1035     public void put(Runnable e) {
1036     offer(e);
1037     }
1038    
1039     public boolean add(Runnable e) {
1040 jsr166 1.48 return offer(e);
1041     }
1042 dl 1.40
1043     public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1044     return offer(e);
1045     }
1046 jsr166 1.42
1047 jsr166 1.46 /**
1048     * Performs common bookkeeping for poll and take: Replaces
1049 jsr166 1.47 * first element with last and sifts it down. Call only when
1050     * holding lock.
1051 jsr166 1.46 * @param f the task to remove and return
1052     */
1053 jsr166 1.61 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1054 jsr166 1.46 int s = --size;
1055 jsr166 1.61 RunnableScheduledFuture<?> x = queue[s];
1056 jsr166 1.46 queue[s] = null;
1057     if (s != 0)
1058     siftDown(0, x);
1059     setIndex(f, -1);
1060     return f;
1061     }
1062    
1063 jsr166 1.61 public RunnableScheduledFuture<?> poll() {
1064 dl 1.40 final ReentrantLock lock = this.lock;
1065     lock.lock();
1066     try {
1067 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1068 jsr166 1.62 if (first == null || first.getDelay(NANOSECONDS) > 0)
1069 dl 1.40 return null;
1070 jsr166 1.42 else
1071 dl 1.40 return finishPoll(first);
1072     } finally {
1073     lock.unlock();
1074     }
1075     }
1076    
1077 jsr166 1.61 public RunnableScheduledFuture<?> take() throws InterruptedException {
1078 dl 1.40 final ReentrantLock lock = this.lock;
1079     lock.lockInterruptibly();
1080     try {
1081     for (;;) {
1082 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1083 jsr166 1.42 if (first == null)
1084 dl 1.40 available.await();
1085     else {
1086 jsr166 1.62 long delay = first.getDelay(NANOSECONDS);
1087 jsr166 1.48 if (delay <= 0)
1088     return finishPoll(first);
1089 jsr166 1.71 first = null; // don't retain ref while waiting
1090     if (leader != null)
1091 jsr166 1.48 available.await();
1092     else {
1093     Thread thisThread = Thread.currentThread();
1094     leader = thisThread;
1095     try {
1096     available.awaitNanos(delay);
1097     } finally {
1098     if (leader == thisThread)
1099     leader = null;
1100     }
1101     }
1102 dl 1.40 }
1103     }
1104     } finally {
1105 jsr166 1.48 if (leader == null && queue[0] != null)
1106     available.signal();
1107 dl 1.40 lock.unlock();
1108     }
1109     }
1110    
1111 jsr166 1.61 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1112 dl 1.40 throws InterruptedException {
1113     long nanos = unit.toNanos(timeout);
1114     final ReentrantLock lock = this.lock;
1115     lock.lockInterruptibly();
1116     try {
1117     for (;;) {
1118 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1119 dl 1.40 if (first == null) {
1120     if (nanos <= 0)
1121     return null;
1122     else
1123     nanos = available.awaitNanos(nanos);
1124     } else {
1125 jsr166 1.62 long delay = first.getDelay(NANOSECONDS);
1126 jsr166 1.48 if (delay <= 0)
1127 dl 1.40 return finishPoll(first);
1128 jsr166 1.48 if (nanos <= 0)
1129     return null;
1130 jsr166 1.71 first = null; // don't retain ref while waiting
1131 jsr166 1.48 if (nanos < delay || leader != null)
1132     nanos = available.awaitNanos(nanos);
1133     else {
1134     Thread thisThread = Thread.currentThread();
1135     leader = thisThread;
1136     try {
1137     long timeLeft = available.awaitNanos(delay);
1138     nanos -= delay - timeLeft;
1139     } finally {
1140     if (leader == thisThread)
1141     leader = null;
1142     }
1143     }
1144     }
1145     }
1146 dl 1.40 } finally {
1147 jsr166 1.48 if (leader == null && queue[0] != null)
1148     available.signal();
1149 dl 1.40 lock.unlock();
1150     }
1151     }
1152    
1153     public void clear() {
1154     final ReentrantLock lock = this.lock;
1155     lock.lock();
1156     try {
1157     for (int i = 0; i < size; i++) {
1158 jsr166 1.61 RunnableScheduledFuture<?> t = queue[i];
1159 dl 1.40 if (t != null) {
1160     queue[i] = null;
1161     setIndex(t, -1);
1162     }
1163     }
1164     size = 0;
1165     } finally {
1166     lock.unlock();
1167     }
1168 dl 1.13 }
1169 dl 1.40
1170     /**
1171 jsr166 1.66 * Returns first element only if it is expired.
1172 dl 1.40 * Used only by drainTo. Call only when holding lock.
1173     */
1174 jsr166 1.62 private RunnableScheduledFuture<?> peekExpired() {
1175     // assert lock.isHeldByCurrentThread();
1176 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1177 jsr166 1.62 return (first == null || first.getDelay(NANOSECONDS) > 0) ?
1178     null : first;
1179 dl 1.40 }
1180    
1181     public int drainTo(Collection<? super Runnable> c) {
1182     if (c == null)
1183     throw new NullPointerException();
1184     if (c == this)
1185     throw new IllegalArgumentException();
1186     final ReentrantLock lock = this.lock;
1187     lock.lock();
1188     try {
1189 jsr166 1.61 RunnableScheduledFuture<?> first;
1190 dl 1.40 int n = 0;
1191 jsr166 1.62 while ((first = peekExpired()) != null) {
1192     c.add(first); // In this order, in case add() throws.
1193     finishPoll(first);
1194 jsr166 1.48 ++n;
1195     }
1196 dl 1.40 return n;
1197     } finally {
1198     lock.unlock();
1199     }
1200 dl 1.13 }
1201    
1202 jsr166 1.21 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1203 dl 1.40 if (c == null)
1204     throw new NullPointerException();
1205     if (c == this)
1206     throw new IllegalArgumentException();
1207     if (maxElements <= 0)
1208     return 0;
1209     final ReentrantLock lock = this.lock;
1210     lock.lock();
1211     try {
1212 jsr166 1.61 RunnableScheduledFuture<?> first;
1213 dl 1.40 int n = 0;
1214 jsr166 1.62 while (n < maxElements && (first = peekExpired()) != null) {
1215     c.add(first); // In this order, in case add() throws.
1216     finishPoll(first);
1217 jsr166 1.48 ++n;
1218     }
1219 dl 1.40 return n;
1220     } finally {
1221     lock.unlock();
1222     }
1223     }
1224    
1225     public Object[] toArray() {
1226     final ReentrantLock lock = this.lock;
1227     lock.lock();
1228     try {
1229 jsr166 1.45 return Arrays.copyOf(queue, size, Object[].class);
1230 dl 1.40 } finally {
1231     lock.unlock();
1232     }
1233     }
1234    
1235 jsr166 1.48 @SuppressWarnings("unchecked")
1236 dl 1.40 public <T> T[] toArray(T[] a) {
1237     final ReentrantLock lock = this.lock;
1238     lock.lock();
1239     try {
1240     if (a.length < size)
1241     return (T[]) Arrays.copyOf(queue, size, a.getClass());
1242     System.arraycopy(queue, 0, a, 0, size);
1243     if (a.length > size)
1244     a[size] = null;
1245     return a;
1246     } finally {
1247     lock.unlock();
1248     }
1249 dl 1.13 }
1250    
1251 jsr166 1.21 public Iterator<Runnable> iterator() {
1252 jsr166 1.45 return new Itr(Arrays.copyOf(queue, size));
1253 dl 1.40 }
1254 jsr166 1.42
1255 dl 1.40 /**
1256     * Snapshot iterator that works off copy of underlying q array.
1257     */
1258     private class Itr implements Iterator<Runnable> {
1259 jsr166 1.74 final RunnableScheduledFuture<?>[] array;
1260 jsr166 1.48 int cursor = 0; // index of next element to return
1261     int lastRet = -1; // index of last element, or -1 if no such
1262 jsr166 1.42
1263 jsr166 1.74 Itr(RunnableScheduledFuture<?>[] array) {
1264 dl 1.40 this.array = array;
1265     }
1266 jsr166 1.42
1267 dl 1.40 public boolean hasNext() {
1268     return cursor < array.length;
1269     }
1270 jsr166 1.42
1271 dl 1.40 public Runnable next() {
1272     if (cursor >= array.length)
1273     throw new NoSuchElementException();
1274     lastRet = cursor;
1275 jsr166 1.45 return array[cursor++];
1276 dl 1.40 }
1277 jsr166 1.42
1278 dl 1.40 public void remove() {
1279     if (lastRet < 0)
1280     throw new IllegalStateException();
1281     DelayedWorkQueue.this.remove(array[lastRet]);
1282     lastRet = -1;
1283     }
1284 dl 1.13 }
1285     }
1286 dl 1.1 }