ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.80
Committed: Fri Apr 11 21:15:44 2014 UTC (10 years, 2 months ago) by jsr166
Branch: MAIN
Changes since 1.79: +2 -2 lines
Log Message:
remove redundant initializations to null or 0

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.11 * Expert Group and released to the public domain, as explained at
4 jsr166 1.58 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.1 */
6    
7     package java.util.concurrent;
8 jsr166 1.62 import static java.util.concurrent.TimeUnit.NANOSECONDS;
9 jsr166 1.78 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 jsr166 1.62 import java.util.concurrent.atomic.AtomicLong;
11     import java.util.concurrent.locks.Condition;
12     import java.util.concurrent.locks.ReentrantLock;
13 dl 1.1 import java.util.*;
14    
15     /**
16 dl 1.7 * A {@link ThreadPoolExecutor} that can additionally schedule
17 jsr166 1.75 * commands to run after a given delay, or to execute periodically.
18     * This class is preferable to {@link java.util.Timer} when multiple
19     * worker threads are needed, or when the additional flexibility or
20     * capabilities of {@link ThreadPoolExecutor} (which this class
21     * extends) are required.
22 dl 1.1 *
23 jsr166 1.46 * <p>Delayed tasks execute no sooner than they are enabled, but
24 dl 1.18 * without any real-time guarantees about when, after they are
25     * enabled, they will commence. Tasks scheduled for exactly the same
26     * execution time are enabled in first-in-first-out (FIFO) order of
27 jsr166 1.46 * submission.
28     *
29     * <p>When a submitted task is cancelled before it is run, execution
30 jsr166 1.75 * is suppressed. By default, such a cancelled task is not
31     * automatically removed from the work queue until its delay elapses.
32     * While this enables further inspection and monitoring, it may also
33     * cause unbounded retention of cancelled tasks. To avoid this, use
34     * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately
35     * removed from the work queue at time of cancellation.
36 dl 1.1 *
37 jsr166 1.75 * <p>Successive executions of a periodic task scheduled via
38     * {@link #scheduleAtFixedRate} or
39     * {@link #scheduleWithFixedDelay} do not overlap. While different
40 dl 1.51 * executions may be performed by different threads, the effects of
41     * prior executions <a
42     * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
43     * those of subsequent ones.
44     *
45 dl 1.1 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
46 dl 1.8 * of the inherited tuning methods are not useful for it. In
47 jsr166 1.53 * particular, because it acts as a fixed-sized pool using
48     * {@code corePoolSize} threads and an unbounded queue, adjustments
49     * to {@code maximumPoolSize} have no useful effect. Additionally, it
50     * is almost never a good idea to set {@code corePoolSize} to zero or
51     * use {@code allowCoreThreadTimeOut} because this may leave the pool
52     * without threads to handle tasks once they become eligible to run.
53 dl 1.1 *
54 jsr166 1.39 * <p><b>Extension notes:</b> This class overrides the
55 jsr166 1.69 * {@link ThreadPoolExecutor#execute(Runnable) execute} and
56 jsr166 1.39 * {@link AbstractExecutorService#submit(Runnable) submit}
57     * methods to generate internal {@link ScheduledFuture} objects to
58     * control per-task delays and scheduling. To preserve
59     * functionality, any further overrides of these methods in
60 dl 1.32 * subclasses must invoke superclass versions, which effectively
61 jsr166 1.39 * disables additional task customization. However, this class
62 dl 1.32 * provides alternative protected extension method
63 jsr166 1.39 * {@code decorateTask} (one version each for {@code Runnable} and
64     * {@code Callable}) that can be used to customize the concrete task
65     * types used to execute commands entered via {@code execute},
66     * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
67     * and {@code scheduleWithFixedDelay}. By default, a
68     * {@code ScheduledThreadPoolExecutor} uses a task type extending
69 dl 1.32 * {@link FutureTask}. However, this may be modified or replaced using
70     * subclasses of the form:
71     *
72 jsr166 1.39 * <pre> {@code
73 dl 1.23 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
74     *
75 jsr166 1.39 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
76 dl 1.23 *
77 jsr166 1.39 * protected <V> RunnableScheduledFuture<V> decorateTask(
78     * Runnable r, RunnableScheduledFuture<V> task) {
79     * return new CustomTask<V>(r, task);
80 jsr166 1.29 * }
81 dl 1.23 *
82 jsr166 1.39 * protected <V> RunnableScheduledFuture<V> decorateTask(
83     * Callable<V> c, RunnableScheduledFuture<V> task) {
84     * return new CustomTask<V>(c, task);
85 jsr166 1.29 * }
86     * // ... add constructors, etc.
87 jsr166 1.39 * }}</pre>
88     *
89 dl 1.1 * @since 1.5
90     * @author Doug Lea
91     */
92 jsr166 1.21 public class ScheduledThreadPoolExecutor
93     extends ThreadPoolExecutor
94 tim 1.3 implements ScheduledExecutorService {
95 dl 1.1
96 dl 1.37 /*
97     * This class specializes ThreadPoolExecutor implementation by
98     *
99     * 1. Using a custom task type, ScheduledFutureTask for
100     * tasks, even those that don't require scheduling (i.e.,
101     * those submitted using ExecutorService execute, not
102     * ScheduledExecutorService methods) which are treated as
103     * delayed tasks with a delay of zero.
104     *
105 jsr166 1.46 * 2. Using a custom queue (DelayedWorkQueue), a variant of
106 dl 1.37 * unbounded DelayQueue. The lack of capacity constraint and
107     * the fact that corePoolSize and maximumPoolSize are
108     * effectively identical simplifies some execution mechanics
109 jsr166 1.46 * (see delayedExecute) compared to ThreadPoolExecutor.
110 dl 1.37 *
111     * 3. Supporting optional run-after-shutdown parameters, which
112     * leads to overrides of shutdown methods to remove and cancel
113     * tasks that should NOT be run after shutdown, as well as
114     * different recheck logic when task (re)submission overlaps
115     * with a shutdown.
116     *
117     * 4. Task decoration methods to allow interception and
118     * instrumentation, which are needed because subclasses cannot
119     * otherwise override submit methods to get this effect. These
120     * don't have any impact on pool control logic though.
121     */
122    
123 dl 1.1 /**
124     * False if should cancel/suppress periodic tasks on shutdown.
125     */
126     private volatile boolean continueExistingPeriodicTasksAfterShutdown;
127    
128     /**
129     * False if should cancel non-periodic tasks on shutdown.
130     */
131     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
132    
133     /**
134 jsr166 1.75 * True if ScheduledFutureTask.cancel should remove from queue.
135 dl 1.41 */
136     private volatile boolean removeOnCancel = false;
137    
138     /**
139 dl 1.1 * Sequence number to break scheduling ties, and in turn to
140     * guarantee FIFO order among tied entries.
141     */
142 jsr166 1.60 private static final AtomicLong sequencer = new AtomicLong();
143 dl 1.14
144     /**
145 jsr166 1.39 * Returns current nanosecond time.
146 dl 1.14 */
147 jsr166 1.54 final long now() {
148     return System.nanoTime();
149 dl 1.14 }
150    
151 jsr166 1.21 private class ScheduledFutureTask<V>
152 peierls 1.22 extends FutureTask<V> implements RunnableScheduledFuture<V> {
153 jsr166 1.21
154 dl 1.1 /** Sequence number to break ties FIFO */
155     private final long sequenceNumber;
156 jsr166 1.44
157 dl 1.1 /** The time the task is enabled to execute in nanoTime units */
158     private long time;
159 jsr166 1.44
160 dl 1.16 /**
161 jsr166 1.75 * Period in nanoseconds for repeating tasks.
162     * A positive value indicates fixed-rate execution.
163     * A negative value indicates fixed-delay execution.
164     * A value of 0 indicates a non-repeating (one-shot) task.
165 dl 1.16 */
166 dl 1.1 private final long period;
167    
168 jsr166 1.48 /** The actual task to be re-enqueued by reExecutePeriodic */
169     RunnableScheduledFuture<V> outerTask = this;
170 jsr166 1.44
171 dl 1.1 /**
172 dl 1.40 * Index into delay queue, to support faster cancellation.
173     */
174     int heapIndex;
175    
176     /**
177 jsr166 1.30 * Creates a one-shot action with given nanoTime-based trigger time.
178 dl 1.1 */
179 jsr166 1.75 ScheduledFutureTask(Runnable r, V result, long triggerTime) {
180 dl 1.1 super(r, result);
181 jsr166 1.75 this.time = triggerTime;
182 dl 1.1 this.period = 0;
183     this.sequenceNumber = sequencer.getAndIncrement();
184     }
185    
186     /**
187 jsr166 1.75 * Creates a periodic action with given nanoTime-based initial
188     * trigger time and period.
189 dl 1.1 */
190 jsr166 1.75 ScheduledFutureTask(Runnable r, V result, long triggerTime,
191     long period) {
192 dl 1.1 super(r, result);
193 jsr166 1.75 this.time = triggerTime;
194 dl 1.1 this.period = period;
195     this.sequenceNumber = sequencer.getAndIncrement();
196     }
197    
198     /**
199 jsr166 1.65 * Creates a one-shot action with given nanoTime-based trigger time.
200 dl 1.1 */
201 jsr166 1.75 ScheduledFutureTask(Callable<V> callable, long triggerTime) {
202 dl 1.1 super(callable);
203 jsr166 1.75 this.time = triggerTime;
204 dl 1.1 this.period = 0;
205     this.sequenceNumber = sequencer.getAndIncrement();
206     }
207    
208     public long getDelay(TimeUnit unit) {
209 jsr166 1.62 return unit.convert(time - now(), NANOSECONDS);
210 dl 1.1 }
211    
212 dl 1.20 public int compareTo(Delayed other) {
213 dl 1.59 if (other == this) // compare zero if same object
214 dl 1.1 return 0;
215 dl 1.34 if (other instanceof ScheduledFutureTask) {
216     ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
217     long diff = time - x.time;
218     if (diff < 0)
219     return -1;
220     else if (diff > 0)
221     return 1;
222     else if (sequenceNumber < x.sequenceNumber)
223     return -1;
224     else
225     return 1;
226     }
227 jsr166 1.64 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
228 jsr166 1.61 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
229 dl 1.1 }
230    
231     /**
232 jsr166 1.70 * Returns {@code true} if this is a periodic (not a one-shot) action.
233 jsr166 1.30 *
234 jsr166 1.70 * @return {@code true} if periodic
235 dl 1.1 */
236 dl 1.23 public boolean isPeriodic() {
237 dl 1.16 return period != 0;
238 dl 1.1 }
239    
240     /**
241 jsr166 1.39 * Sets the next time to run for a periodic task.
242 dl 1.13 */
243 dl 1.37 private void setNextRunTime() {
244     long p = period;
245     if (p > 0)
246     time += p;
247     else
248 jsr166 1.54 time = triggerTime(-p);
249 dl 1.13 }
250    
251 dl 1.40 public boolean cancel(boolean mayInterruptIfRunning) {
252 dl 1.41 boolean cancelled = super.cancel(mayInterruptIfRunning);
253     if (cancelled && removeOnCancel && heapIndex >= 0)
254 jsr166 1.42 remove(this);
255 dl 1.41 return cancelled;
256 dl 1.40 }
257    
258 dl 1.13 /**
259 dl 1.5 * Overrides FutureTask version so as to reset/requeue if periodic.
260 jsr166 1.21 */
261 dl 1.1 public void run() {
262 dl 1.37 boolean periodic = isPeriodic();
263     if (!canRunInCurrentRunState(periodic))
264     cancel(false);
265     else if (!periodic)
266 dl 1.5 ScheduledFutureTask.super.run();
267 dl 1.37 else if (ScheduledFutureTask.super.runAndReset()) {
268     setNextRunTime();
269 jsr166 1.44 reExecutePeriodic(outerTask);
270 dl 1.37 }
271 dl 1.1 }
272     }
273    
274     /**
275 dl 1.37 * Returns true if can run a task given current run state
276 jsr166 1.39 * and run-after-shutdown parameters.
277     *
278 dl 1.37 * @param periodic true if this task periodic, false if delayed
279     */
280     boolean canRunInCurrentRunState(boolean periodic) {
281 jsr166 1.38 return isRunningOrShutdown(periodic ?
282 dl 1.37 continueExistingPeriodicTasksAfterShutdown :
283     executeExistingDelayedTasksAfterShutdown);
284     }
285    
286     /**
287     * Main execution method for delayed or periodic tasks. If pool
288     * is shut down, rejects the task. Otherwise adds task to queue
289     * and starts a thread, if necessary, to run it. (We cannot
290     * prestart the thread to run the task because the task (probably)
291 jsr166 1.67 * shouldn't be run yet.) If the pool is shut down while the task
292 dl 1.37 * is being added, cancel and remove it if required by state and
293 jsr166 1.39 * run-after-shutdown parameters.
294     *
295 dl 1.37 * @param task the task
296     */
297     private void delayedExecute(RunnableScheduledFuture<?> task) {
298     if (isShutdown())
299     reject(task);
300     else {
301     super.getQueue().add(task);
302     if (isShutdown() &&
303     !canRunInCurrentRunState(task.isPeriodic()) &&
304     remove(task))
305     task.cancel(false);
306 jsr166 1.48 else
307 dl 1.63 ensurePrestart();
308 dl 1.37 }
309     }
310 jsr166 1.21
311 dl 1.37 /**
312 jsr166 1.39 * Requeues a periodic task unless current run state precludes it.
313     * Same idea as delayedExecute except drops task rather than rejecting.
314     *
315 dl 1.37 * @param task the task
316     */
317     void reExecutePeriodic(RunnableScheduledFuture<?> task) {
318     if (canRunInCurrentRunState(true)) {
319     super.getQueue().add(task);
320     if (!canRunInCurrentRunState(true) && remove(task))
321     task.cancel(false);
322 jsr166 1.48 else
323 dl 1.63 ensurePrestart();
324 dl 1.37 }
325 dl 1.13 }
326 dl 1.1
327 dl 1.13 /**
328 jsr166 1.21 * Cancels and clears the queue of all tasks that should not be run
329 jsr166 1.39 * due to shutdown policy. Invoked within super.shutdown.
330 dl 1.13 */
331 dl 1.37 @Override void onShutdown() {
332     BlockingQueue<Runnable> q = super.getQueue();
333     boolean keepDelayed =
334     getExecuteExistingDelayedTasksAfterShutdownPolicy();
335     boolean keepPeriodic =
336     getContinueExistingPeriodicTasksAfterShutdownPolicy();
337 jsr166 1.57 if (!keepDelayed && !keepPeriodic) {
338     for (Object e : q.toArray())
339     if (e instanceof RunnableScheduledFuture<?>)
340     ((RunnableScheduledFuture<?>) e).cancel(false);
341 dl 1.37 q.clear();
342 jsr166 1.57 }
343 dl 1.37 else {
344     // Traverse snapshot to avoid iterator exceptions
345 jsr166 1.39 for (Object e : q.toArray()) {
346 dl 1.23 if (e instanceof RunnableScheduledFuture) {
347 dl 1.37 RunnableScheduledFuture<?> t =
348     (RunnableScheduledFuture<?>)e;
349 dl 1.41 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
350     t.isCancelled()) { // also remove if already cancelled
351     if (q.remove(t))
352     t.cancel(false);
353     }
354 dl 1.13 }
355     }
356 dl 1.1 }
357 jsr166 1.48 tryTerminate();
358 dl 1.1 }
359    
360 dl 1.23 /**
361 jsr166 1.30 * Modifies or replaces the task used to execute a runnable.
362 jsr166 1.28 * This method can be used to override the concrete
363 dl 1.23 * class used for managing internal tasks.
364 jsr166 1.30 * The default implementation simply returns the given task.
365 jsr166 1.28 *
366 dl 1.23 * @param runnable the submitted Runnable
367     * @param task the task created to execute the runnable
368 jsr166 1.72 * @param <V> the type of the task's result
369 dl 1.23 * @return a task that can execute the runnable
370     * @since 1.6
371     */
372 peierls 1.22 protected <V> RunnableScheduledFuture<V> decorateTask(
373 dl 1.23 Runnable runnable, RunnableScheduledFuture<V> task) {
374     return task;
375 peierls 1.22 }
376    
377 dl 1.23 /**
378 jsr166 1.30 * Modifies or replaces the task used to execute a callable.
379 jsr166 1.28 * This method can be used to override the concrete
380 dl 1.23 * class used for managing internal tasks.
381 jsr166 1.30 * The default implementation simply returns the given task.
382 jsr166 1.28 *
383 dl 1.23 * @param callable the submitted Callable
384     * @param task the task created to execute the callable
385 jsr166 1.72 * @param <V> the type of the task's result
386 dl 1.23 * @return a task that can execute the callable
387     * @since 1.6
388     */
389 peierls 1.22 protected <V> RunnableScheduledFuture<V> decorateTask(
390 dl 1.23 Callable<V> callable, RunnableScheduledFuture<V> task) {
391     return task;
392 dl 1.19 }
393    
394 dl 1.1 /**
395 jsr166 1.78 * The default keep-alive time for pool threads.
396     *
397     * Normally, this value is unused because all pool threads will be
398     * core threads, but if a user creates a pool with a corePoolSize
399     * of zero (against our advice), we keep a thread alive as long as
400     * there are queued tasks. If the keep alive time is zero (the
401     * historic value), we end up hot-spinning in getTask, wasting a
402     * CPU. But on the other hand, if we set the value too high, and
403     * users create a one-shot pool which they don't cleanly shutdown,
404     * the pool's non-daemon threads will prevent JVM termination. A
405     * small but non-zero value (relative to a JVM's lifetime) seems
406     * best.
407     */
408     private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
409    
410     /**
411 jsr166 1.39 * Creates a new {@code ScheduledThreadPoolExecutor} with the
412     * given core pool size.
413 jsr166 1.21 *
414 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
415     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
416     * @throws IllegalArgumentException if {@code corePoolSize < 0}
417 dl 1.1 */
418     public ScheduledThreadPoolExecutor(int corePoolSize) {
419 jsr166 1.78 super(corePoolSize, Integer.MAX_VALUE,
420     DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
421 dl 1.1 new DelayedWorkQueue());
422     }
423    
424     /**
425 jsr166 1.39 * Creates a new {@code ScheduledThreadPoolExecutor} with the
426     * given initial parameters.
427 jsr166 1.21 *
428 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
429     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
430 dl 1.1 * @param threadFactory the factory to use when the executor
431 jsr166 1.39 * creates a new thread
432     * @throws IllegalArgumentException if {@code corePoolSize < 0}
433     * @throws NullPointerException if {@code threadFactory} is null
434 dl 1.1 */
435     public ScheduledThreadPoolExecutor(int corePoolSize,
436 jsr166 1.56 ThreadFactory threadFactory) {
437 jsr166 1.78 super(corePoolSize, Integer.MAX_VALUE,
438     DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
439 dl 1.1 new DelayedWorkQueue(), threadFactory);
440     }
441    
442     /**
443 jsr166 1.79 * Creates a new {@code ScheduledThreadPoolExecutor} with the
444     * given initial parameters.
445 jsr166 1.21 *
446 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
447     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
448 dl 1.1 * @param handler the handler to use when execution is blocked
449 jsr166 1.39 * because the thread bounds and queue capacities are reached
450     * @throws IllegalArgumentException if {@code corePoolSize < 0}
451     * @throws NullPointerException if {@code handler} is null
452 dl 1.1 */
453     public ScheduledThreadPoolExecutor(int corePoolSize,
454 jsr166 1.56 RejectedExecutionHandler handler) {
455 jsr166 1.78 super(corePoolSize, Integer.MAX_VALUE,
456     DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
457 dl 1.1 new DelayedWorkQueue(), handler);
458     }
459    
460     /**
461 jsr166 1.79 * Creates a new {@code ScheduledThreadPoolExecutor} with the
462     * given initial parameters.
463 jsr166 1.21 *
464 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
465     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
466 dl 1.1 * @param threadFactory the factory to use when the executor
467 jsr166 1.39 * creates a new thread
468 dl 1.1 * @param handler the handler to use when execution is blocked
469 jsr166 1.39 * because the thread bounds and queue capacities are reached
470     * @throws IllegalArgumentException if {@code corePoolSize < 0}
471     * @throws NullPointerException if {@code threadFactory} or
472     * {@code handler} is null
473 dl 1.1 */
474     public ScheduledThreadPoolExecutor(int corePoolSize,
475 jsr166 1.56 ThreadFactory threadFactory,
476     RejectedExecutionHandler handler) {
477 jsr166 1.78 super(corePoolSize, Integer.MAX_VALUE,
478     DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
479 dl 1.1 new DelayedWorkQueue(), threadFactory, handler);
480     }
481    
482 dl 1.37 /**
483 jsr166 1.73 * Returns the nanoTime-based trigger time of a delayed action.
484 jsr166 1.54 */
485     private long triggerTime(long delay, TimeUnit unit) {
486     return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
487     }
488    
489     /**
490 jsr166 1.73 * Returns the nanoTime-based trigger time of a delayed action.
491 dl 1.50 */
492 jsr166 1.54 long triggerTime(long delay) {
493     return now() +
494     ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
495     }
496    
497     /**
498     * Constrains the values of all delays in the queue to be within
499     * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
500     * This may occur if a task is eligible to be dequeued, but has
501     * not yet been, while some other task is added with a delay of
502     * Long.MAX_VALUE.
503     */
504     private long overflowFree(long delay) {
505     Delayed head = (Delayed) super.getQueue().peek();
506     if (head != null) {
507 jsr166 1.62 long headDelay = head.getDelay(NANOSECONDS);
508 jsr166 1.54 if (headDelay < 0 && (delay - headDelay < 0))
509     delay = Long.MAX_VALUE + headDelay;
510     }
511     return delay;
512 dl 1.50 }
513    
514     /**
515 dl 1.37 * @throws RejectedExecutionException {@inheritDoc}
516     * @throws NullPointerException {@inheritDoc}
517     */
518 jsr166 1.21 public ScheduledFuture<?> schedule(Runnable command,
519     long delay,
520 dl 1.13 TimeUnit unit) {
521 dl 1.9 if (command == null || unit == null)
522 dl 1.1 throw new NullPointerException();
523 jsr166 1.76 RunnableScheduledFuture<Void> t = decorateTask(command,
524 jsr166 1.54 new ScheduledFutureTask<Void>(command, null,
525     triggerTime(delay, unit)));
526 dl 1.1 delayedExecute(t);
527     return t;
528     }
529 jsr166 1.52
530 dl 1.37 /**
531     * @throws RejectedExecutionException {@inheritDoc}
532     * @throws NullPointerException {@inheritDoc}
533     */
534 jsr166 1.21 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
535     long delay,
536 dl 1.13 TimeUnit unit) {
537 dl 1.9 if (callable == null || unit == null)
538 dl 1.1 throw new NullPointerException();
539 peierls 1.22 RunnableScheduledFuture<V> t = decorateTask(callable,
540 jsr166 1.54 new ScheduledFutureTask<V>(callable,
541     triggerTime(delay, unit)));
542 dl 1.1 delayedExecute(t);
543     return t;
544     }
545    
546 dl 1.37 /**
547     * @throws RejectedExecutionException {@inheritDoc}
548     * @throws NullPointerException {@inheritDoc}
549     * @throws IllegalArgumentException {@inheritDoc}
550     */
551 jsr166 1.21 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
552     long initialDelay,
553     long period,
554 dl 1.13 TimeUnit unit) {
555 dl 1.9 if (command == null || unit == null)
556 dl 1.1 throw new NullPointerException();
557     if (period <= 0)
558     throw new IllegalArgumentException();
559 jsr166 1.48 ScheduledFutureTask<Void> sft =
560     new ScheduledFutureTask<Void>(command,
561     null,
562 jsr166 1.54 triggerTime(initialDelay, unit),
563 jsr166 1.48 unit.toNanos(period));
564 jsr166 1.44 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
565 jsr166 1.48 sft.outerTask = t;
566 dl 1.1 delayedExecute(t);
567     return t;
568     }
569 jsr166 1.21
570 dl 1.37 /**
571     * @throws RejectedExecutionException {@inheritDoc}
572     * @throws NullPointerException {@inheritDoc}
573     * @throws IllegalArgumentException {@inheritDoc}
574     */
575 jsr166 1.21 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
576     long initialDelay,
577     long delay,
578 dl 1.13 TimeUnit unit) {
579 dl 1.9 if (command == null || unit == null)
580 dl 1.1 throw new NullPointerException();
581     if (delay <= 0)
582     throw new IllegalArgumentException();
583 jsr166 1.48 ScheduledFutureTask<Void> sft =
584     new ScheduledFutureTask<Void>(command,
585     null,
586 jsr166 1.54 triggerTime(initialDelay, unit),
587 jsr166 1.48 unit.toNanos(-delay));
588 jsr166 1.44 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
589 jsr166 1.48 sft.outerTask = t;
590 dl 1.1 delayedExecute(t);
591     return t;
592     }
593 jsr166 1.21
594 dl 1.1 /**
595 jsr166 1.39 * Executes {@code command} with zero required delay.
596     * This has effect equivalent to
597     * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
598     * Note that inspections of the queue and of the list returned by
599     * {@code shutdownNow} will access the zero-delayed
600     * {@link ScheduledFuture}, not the {@code command} itself.
601     *
602     * <p>A consequence of the use of {@code ScheduledFuture} objects is
603     * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
604     * called with a null second {@code Throwable} argument, even if the
605     * {@code command} terminated abruptly. Instead, the {@code Throwable}
606     * thrown by such a task can be obtained via {@link Future#get}.
607 dl 1.1 *
608     * @throws RejectedExecutionException at discretion of
609 jsr166 1.39 * {@code RejectedExecutionHandler}, if the task
610     * cannot be accepted for execution because the
611     * executor has been shut down
612     * @throws NullPointerException {@inheritDoc}
613 dl 1.1 */
614     public void execute(Runnable command) {
615 jsr166 1.62 schedule(command, 0, NANOSECONDS);
616 dl 1.1 }
617    
618 dl 1.13 // Override AbstractExecutorService methods
619    
620 dl 1.37 /**
621     * @throws RejectedExecutionException {@inheritDoc}
622     * @throws NullPointerException {@inheritDoc}
623     */
624 dl 1.7 public Future<?> submit(Runnable task) {
625 jsr166 1.62 return schedule(task, 0, NANOSECONDS);
626 dl 1.7 }
627    
628 dl 1.37 /**
629     * @throws RejectedExecutionException {@inheritDoc}
630     * @throws NullPointerException {@inheritDoc}
631     */
632 dl 1.7 public <T> Future<T> submit(Runnable task, T result) {
633 jsr166 1.62 return schedule(Executors.callable(task, result), 0, NANOSECONDS);
634 dl 1.7 }
635    
636 dl 1.37 /**
637     * @throws RejectedExecutionException {@inheritDoc}
638     * @throws NullPointerException {@inheritDoc}
639     */
640 dl 1.7 public <T> Future<T> submit(Callable<T> task) {
641 jsr166 1.62 return schedule(task, 0, NANOSECONDS);
642 dl 1.7 }
643 dl 1.1
644     /**
645 dl 1.37 * Sets the policy on whether to continue executing existing
646 jsr166 1.39 * periodic tasks even when this executor has been {@code shutdown}.
647     * In this case, these tasks will only terminate upon
648     * {@code shutdownNow} or after setting the policy to
649     * {@code false} when already shutdown.
650     * This value is by default {@code false}.
651 jsr166 1.30 *
652 jsr166 1.68 * @param value if {@code true}, continue after shutdown, else don't
653 jsr166 1.25 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
654 dl 1.1 */
655     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
656     continueExistingPeriodicTasksAfterShutdown = value;
657 jsr166 1.39 if (!value && isShutdown())
658 dl 1.37 onShutdown();
659 dl 1.1 }
660    
661     /**
662 jsr166 1.21 * Gets the policy on whether to continue executing existing
663 jsr166 1.39 * periodic tasks even when this executor has been {@code shutdown}.
664     * In this case, these tasks will only terminate upon
665     * {@code shutdownNow} or after setting the policy to
666     * {@code false} when already shutdown.
667     * This value is by default {@code false}.
668 jsr166 1.30 *
669 jsr166 1.39 * @return {@code true} if will continue after shutdown
670 dl 1.16 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
671 dl 1.1 */
672     public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
673     return continueExistingPeriodicTasksAfterShutdown;
674     }
675    
676     /**
677 jsr166 1.21 * Sets the policy on whether to execute existing delayed
678 jsr166 1.39 * tasks even when this executor has been {@code shutdown}.
679     * In this case, these tasks will only terminate upon
680     * {@code shutdownNow}, or after setting the policy to
681     * {@code false} when already shutdown.
682     * This value is by default {@code true}.
683 jsr166 1.30 *
684 jsr166 1.68 * @param value if {@code true}, execute after shutdown, else don't
685 dl 1.16 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
686 dl 1.1 */
687     public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
688     executeExistingDelayedTasksAfterShutdown = value;
689 jsr166 1.39 if (!value && isShutdown())
690 dl 1.37 onShutdown();
691 dl 1.1 }
692    
693     /**
694 jsr166 1.21 * Gets the policy on whether to execute existing delayed
695 jsr166 1.39 * tasks even when this executor has been {@code shutdown}.
696     * In this case, these tasks will only terminate upon
697     * {@code shutdownNow}, or after setting the policy to
698     * {@code false} when already shutdown.
699     * This value is by default {@code true}.
700 jsr166 1.30 *
701 jsr166 1.39 * @return {@code true} if will execute after shutdown
702 dl 1.16 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
703 dl 1.1 */
704     public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
705     return executeExistingDelayedTasksAfterShutdown;
706     }
707    
708     /**
709 jsr166 1.46 * Sets the policy on whether cancelled tasks should be immediately
710     * removed from the work queue at time of cancellation. This value is
711     * by default {@code false}.
712 dl 1.41 *
713 jsr166 1.42 * @param value if {@code true}, remove on cancellation, else don't
714 dl 1.41 * @see #getRemoveOnCancelPolicy
715 jsr166 1.43 * @since 1.7
716 dl 1.41 */
717     public void setRemoveOnCancelPolicy(boolean value) {
718     removeOnCancel = value;
719     }
720    
721     /**
722 jsr166 1.46 * Gets the policy on whether cancelled tasks should be immediately
723     * removed from the work queue at time of cancellation. This value is
724     * by default {@code false}.
725 dl 1.41 *
726 jsr166 1.46 * @return {@code true} if cancelled tasks are immediately removed
727     * from the queue
728 dl 1.41 * @see #setRemoveOnCancelPolicy
729 jsr166 1.43 * @since 1.7
730 dl 1.41 */
731     public boolean getRemoveOnCancelPolicy() {
732     return removeOnCancel;
733     }
734    
735     /**
736 dl 1.1 * Initiates an orderly shutdown in which previously submitted
737 jsr166 1.49 * tasks are executed, but no new tasks will be accepted.
738     * Invocation has no additional effect if already shut down.
739     *
740     * <p>This method does not wait for previously submitted tasks to
741     * complete execution. Use {@link #awaitTermination awaitTermination}
742     * to do that.
743     *
744     * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
745     * has been set {@code false}, existing delayed tasks whose delays
746     * have not yet elapsed are cancelled. And unless the {@code
747     * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
748     * {@code true}, future executions of existing periodic tasks will
749     * be cancelled.
750 jsr166 1.39 *
751     * @throws SecurityException {@inheritDoc}
752 dl 1.1 */
753     public void shutdown() {
754     super.shutdown();
755     }
756    
757     /**
758     * Attempts to stop all actively executing tasks, halts the
759 jsr166 1.30 * processing of waiting tasks, and returns a list of the tasks
760     * that were awaiting execution.
761 jsr166 1.21 *
762 jsr166 1.49 * <p>This method does not wait for actively executing tasks to
763     * terminate. Use {@link #awaitTermination awaitTermination} to
764     * do that.
765     *
766 dl 1.1 * <p>There are no guarantees beyond best-effort attempts to stop
767 dl 1.18 * processing actively executing tasks. This implementation
768 jsr166 1.31 * cancels tasks via {@link Thread#interrupt}, so any task that
769     * fails to respond to interrupts may never terminate.
770 dl 1.1 *
771 jsr166 1.39 * @return list of tasks that never commenced execution.
772 jsr166 1.77 * Each element of this list is a {@link ScheduledFuture}.
773     * For tasks submitted via one of the {@code schedule}
774     * methods, the element will be identical to the returned
775     * {@code ScheduledFuture}. For tasks submitted using
776     * {@link #execute}, the element will be a zero-delay {@code
777     * ScheduledFuture}.
778 jsr166 1.31 * @throws SecurityException {@inheritDoc}
779 dl 1.1 */
780 tim 1.4 public List<Runnable> shutdownNow() {
781 dl 1.1 return super.shutdownNow();
782     }
783    
784     /**
785 jsr166 1.77 * Returns the task queue used by this executor.
786     * Each element of this list is a {@link ScheduledFuture}.
787     * For tasks submitted via one of the {@code schedule} methods, the
788     * element will be identical to the returned {@code ScheduledFuture}.
789     * For tasks submitted using {@link #execute}, the element will be a
790     * zero-delay {@code ScheduledFuture}.
791     *
792     * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
793     * tasks in the order in which they will execute.
794 dl 1.1 *
795     * @return the task queue
796     */
797     public BlockingQueue<Runnable> getQueue() {
798     return super.getQueue();
799     }
800    
801 dl 1.13 /**
802 dl 1.40 * Specialized delay queue. To mesh with TPE declarations, this
803     * class must be declared as a BlockingQueue<Runnable> even though
804 jsr166 1.42 * it can only hold RunnableScheduledFutures.
805 jsr166 1.21 */
806 dl 1.40 static class DelayedWorkQueue extends AbstractQueue<Runnable>
807 dl 1.13 implements BlockingQueue<Runnable> {
808 jsr166 1.21
809 dl 1.40 /*
810     * A DelayedWorkQueue is based on a heap-based data structure
811     * like those in DelayQueue and PriorityQueue, except that
812     * every ScheduledFutureTask also records its index into the
813     * heap array. This eliminates the need to find a task upon
814     * cancellation, greatly speeding up removal (down from O(n)
815     * to O(log n)), and reducing garbage retention that would
816     * otherwise occur by waiting for the element to rise to top
817     * before clearing. But because the queue may also hold
818     * RunnableScheduledFutures that are not ScheduledFutureTasks,
819     * we are not guaranteed to have such indices available, in
820     * which case we fall back to linear search. (We expect that
821     * most tasks will not be decorated, and that the faster cases
822     * will be much more common.)
823     *
824     * All heap operations must record index changes -- mainly
825     * within siftUp and siftDown. Upon removal, a task's
826     * heapIndex is set to -1. Note that ScheduledFutureTasks can
827     * appear at most once in the queue (this need not be true for
828     * other kinds of tasks or work queues), so are uniquely
829     * identified by heapIndex.
830     */
831    
832 jsr166 1.46 private static final int INITIAL_CAPACITY = 16;
833 jsr166 1.61 private RunnableScheduledFuture<?>[] queue =
834     new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
835 jsr166 1.46 private final ReentrantLock lock = new ReentrantLock();
836 jsr166 1.80 private int size;
837 dl 1.40
838 jsr166 1.48 /**
839     * Thread designated to wait for the task at the head of the
840     * queue. This variant of the Leader-Follower pattern
841     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
842     * minimize unnecessary timed waiting. When a thread becomes
843     * the leader, it waits only for the next delay to elapse, but
844     * other threads await indefinitely. The leader thread must
845     * signal some other thread before returning from take() or
846     * poll(...), unless some other thread becomes leader in the
847     * interim. Whenever the head of the queue is replaced with a
848     * task with an earlier expiration time, the leader field is
849     * invalidated by being reset to null, and some waiting
850     * thread, but not necessarily the current leader, is
851     * signalled. So waiting threads must be prepared to acquire
852     * and lose leadership while waiting.
853     */
854 jsr166 1.80 private Thread leader;
855 jsr166 1.48
856     /**
857     * Condition signalled when a newer task becomes available at the
858     * head of the queue or a new thread may need to become leader.
859     */
860     private final Condition available = lock.newCondition();
861 dl 1.40
862     /**
863 jsr166 1.66 * Sets f's heapIndex if it is a ScheduledFutureTask.
864 dl 1.40 */
865 jsr166 1.61 private void setIndex(RunnableScheduledFuture<?> f, int idx) {
866 dl 1.40 if (f instanceof ScheduledFutureTask)
867     ((ScheduledFutureTask)f).heapIndex = idx;
868     }
869    
870     /**
871 jsr166 1.66 * Sifts element added at bottom up to its heap-ordered spot.
872 dl 1.40 * Call only when holding lock.
873     */
874 jsr166 1.61 private void siftUp(int k, RunnableScheduledFuture<?> key) {
875 dl 1.40 while (k > 0) {
876     int parent = (k - 1) >>> 1;
877 jsr166 1.61 RunnableScheduledFuture<?> e = queue[parent];
878 dl 1.40 if (key.compareTo(e) >= 0)
879     break;
880     queue[k] = e;
881     setIndex(e, k);
882     k = parent;
883     }
884     queue[k] = key;
885     setIndex(key, k);
886     }
887    
888     /**
889 jsr166 1.66 * Sifts element added at top down to its heap-ordered spot.
890 dl 1.40 * Call only when holding lock.
891     */
892 jsr166 1.61 private void siftDown(int k, RunnableScheduledFuture<?> key) {
893 jsr166 1.42 int half = size >>> 1;
894 dl 1.40 while (k < half) {
895 jsr166 1.42 int child = (k << 1) + 1;
896 jsr166 1.61 RunnableScheduledFuture<?> c = queue[child];
897 dl 1.40 int right = child + 1;
898     if (right < size && c.compareTo(queue[right]) > 0)
899     c = queue[child = right];
900     if (key.compareTo(c) <= 0)
901     break;
902     queue[k] = c;
903     setIndex(c, k);
904     k = child;
905     }
906     queue[k] = key;
907     setIndex(key, k);
908     }
909    
910     /**
911 jsr166 1.66 * Resizes the heap array. Call only when holding lock.
912 dl 1.40 */
913     private void grow() {
914     int oldCapacity = queue.length;
915     int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
916     if (newCapacity < 0) // overflow
917     newCapacity = Integer.MAX_VALUE;
918     queue = Arrays.copyOf(queue, newCapacity);
919     }
920    
921     /**
922 jsr166 1.66 * Finds index of given object, or -1 if absent.
923 dl 1.40 */
924     private int indexOf(Object x) {
925     if (x != null) {
926 jsr166 1.48 if (x instanceof ScheduledFutureTask) {
927     int i = ((ScheduledFutureTask) x).heapIndex;
928     // Sanity check; x could conceivably be a
929     // ScheduledFutureTask from some other pool.
930     if (i >= 0 && i < size && queue[i] == x)
931     return i;
932     } else {
933     for (int i = 0; i < size; i++)
934     if (x.equals(queue[i]))
935     return i;
936     }
937 dl 1.40 }
938     return -1;
939     }
940    
941 jsr166 1.48 public boolean contains(Object x) {
942     final ReentrantLock lock = this.lock;
943 jsr166 1.45 lock.lock();
944     try {
945 jsr166 1.48 return indexOf(x) != -1;
946 jsr166 1.45 } finally {
947     lock.unlock();
948     }
949 jsr166 1.48 }
950 jsr166 1.45
951 dl 1.40 public boolean remove(Object x) {
952     final ReentrantLock lock = this.lock;
953     lock.lock();
954     try {
955 jsr166 1.45 int i = indexOf(x);
956 jsr166 1.48 if (i < 0)
957     return false;
958 jsr166 1.45
959 jsr166 1.48 setIndex(queue[i], -1);
960     int s = --size;
961 jsr166 1.61 RunnableScheduledFuture<?> replacement = queue[s];
962 jsr166 1.48 queue[s] = null;
963     if (s != i) {
964     siftDown(i, replacement);
965     if (queue[i] == replacement)
966     siftUp(i, replacement);
967     }
968     return true;
969 dl 1.40 } finally {
970     lock.unlock();
971     }
972     }
973    
974     public int size() {
975     final ReentrantLock lock = this.lock;
976     lock.lock();
977     try {
978 jsr166 1.45 return size;
979 dl 1.40 } finally {
980     lock.unlock();
981     }
982     }
983    
984 jsr166 1.42 public boolean isEmpty() {
985     return size() == 0;
986 dl 1.40 }
987    
988     public int remainingCapacity() {
989     return Integer.MAX_VALUE;
990     }
991    
992 jsr166 1.61 public RunnableScheduledFuture<?> peek() {
993 dl 1.40 final ReentrantLock lock = this.lock;
994     lock.lock();
995     try {
996     return queue[0];
997     } finally {
998     lock.unlock();
999     }
1000 dl 1.13 }
1001    
1002 dl 1.40 public boolean offer(Runnable x) {
1003     if (x == null)
1004     throw new NullPointerException();
1005 jsr166 1.61 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
1006 dl 1.40 final ReentrantLock lock = this.lock;
1007     lock.lock();
1008     try {
1009     int i = size;
1010     if (i >= queue.length)
1011     grow();
1012     size = i + 1;
1013     if (i == 0) {
1014     queue[0] = e;
1015     setIndex(e, 0);
1016 jsr166 1.45 } else {
1017 dl 1.40 siftUp(i, e);
1018     }
1019 jsr166 1.46 if (queue[0] == e) {
1020 jsr166 1.48 leader = null;
1021 jsr166 1.46 available.signal();
1022 jsr166 1.48 }
1023 dl 1.40 } finally {
1024     lock.unlock();
1025     }
1026     return true;
1027 jsr166 1.48 }
1028 dl 1.40
1029     public void put(Runnable e) {
1030     offer(e);
1031     }
1032    
1033     public boolean add(Runnable e) {
1034 jsr166 1.48 return offer(e);
1035     }
1036 dl 1.40
1037     public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1038     return offer(e);
1039     }
1040 jsr166 1.42
1041 jsr166 1.46 /**
1042     * Performs common bookkeeping for poll and take: Replaces
1043 jsr166 1.47 * first element with last and sifts it down. Call only when
1044     * holding lock.
1045 jsr166 1.46 * @param f the task to remove and return
1046     */
1047 jsr166 1.61 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1048 jsr166 1.46 int s = --size;
1049 jsr166 1.61 RunnableScheduledFuture<?> x = queue[s];
1050 jsr166 1.46 queue[s] = null;
1051     if (s != 0)
1052     siftDown(0, x);
1053     setIndex(f, -1);
1054     return f;
1055     }
1056    
1057 jsr166 1.61 public RunnableScheduledFuture<?> poll() {
1058 dl 1.40 final ReentrantLock lock = this.lock;
1059     lock.lock();
1060     try {
1061 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1062 jsr166 1.62 if (first == null || first.getDelay(NANOSECONDS) > 0)
1063 dl 1.40 return null;
1064 jsr166 1.42 else
1065 dl 1.40 return finishPoll(first);
1066     } finally {
1067     lock.unlock();
1068     }
1069     }
1070    
1071 jsr166 1.61 public RunnableScheduledFuture<?> take() throws InterruptedException {
1072 dl 1.40 final ReentrantLock lock = this.lock;
1073     lock.lockInterruptibly();
1074     try {
1075     for (;;) {
1076 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1077 jsr166 1.42 if (first == null)
1078 dl 1.40 available.await();
1079     else {
1080 jsr166 1.62 long delay = first.getDelay(NANOSECONDS);
1081 jsr166 1.48 if (delay <= 0)
1082     return finishPoll(first);
1083 jsr166 1.71 first = null; // don't retain ref while waiting
1084     if (leader != null)
1085 jsr166 1.48 available.await();
1086     else {
1087     Thread thisThread = Thread.currentThread();
1088     leader = thisThread;
1089     try {
1090     available.awaitNanos(delay);
1091     } finally {
1092     if (leader == thisThread)
1093     leader = null;
1094     }
1095     }
1096 dl 1.40 }
1097     }
1098     } finally {
1099 jsr166 1.48 if (leader == null && queue[0] != null)
1100     available.signal();
1101 dl 1.40 lock.unlock();
1102     }
1103     }
1104    
1105 jsr166 1.61 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1106 dl 1.40 throws InterruptedException {
1107     long nanos = unit.toNanos(timeout);
1108     final ReentrantLock lock = this.lock;
1109     lock.lockInterruptibly();
1110     try {
1111     for (;;) {
1112 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1113 dl 1.40 if (first == null) {
1114     if (nanos <= 0)
1115     return null;
1116     else
1117     nanos = available.awaitNanos(nanos);
1118     } else {
1119 jsr166 1.62 long delay = first.getDelay(NANOSECONDS);
1120 jsr166 1.48 if (delay <= 0)
1121 dl 1.40 return finishPoll(first);
1122 jsr166 1.48 if (nanos <= 0)
1123     return null;
1124 jsr166 1.71 first = null; // don't retain ref while waiting
1125 jsr166 1.48 if (nanos < delay || leader != null)
1126     nanos = available.awaitNanos(nanos);
1127     else {
1128     Thread thisThread = Thread.currentThread();
1129     leader = thisThread;
1130     try {
1131     long timeLeft = available.awaitNanos(delay);
1132     nanos -= delay - timeLeft;
1133     } finally {
1134     if (leader == thisThread)
1135     leader = null;
1136     }
1137     }
1138     }
1139     }
1140 dl 1.40 } finally {
1141 jsr166 1.48 if (leader == null && queue[0] != null)
1142     available.signal();
1143 dl 1.40 lock.unlock();
1144     }
1145     }
1146    
1147     public void clear() {
1148     final ReentrantLock lock = this.lock;
1149     lock.lock();
1150     try {
1151     for (int i = 0; i < size; i++) {
1152 jsr166 1.61 RunnableScheduledFuture<?> t = queue[i];
1153 dl 1.40 if (t != null) {
1154     queue[i] = null;
1155     setIndex(t, -1);
1156     }
1157     }
1158     size = 0;
1159     } finally {
1160     lock.unlock();
1161     }
1162 dl 1.13 }
1163 dl 1.40
1164     /**
1165 jsr166 1.66 * Returns first element only if it is expired.
1166 dl 1.40 * Used only by drainTo. Call only when holding lock.
1167     */
1168 jsr166 1.62 private RunnableScheduledFuture<?> peekExpired() {
1169     // assert lock.isHeldByCurrentThread();
1170 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1171 jsr166 1.62 return (first == null || first.getDelay(NANOSECONDS) > 0) ?
1172     null : first;
1173 dl 1.40 }
1174    
1175     public int drainTo(Collection<? super Runnable> c) {
1176     if (c == null)
1177     throw new NullPointerException();
1178     if (c == this)
1179     throw new IllegalArgumentException();
1180     final ReentrantLock lock = this.lock;
1181     lock.lock();
1182     try {
1183 jsr166 1.61 RunnableScheduledFuture<?> first;
1184 dl 1.40 int n = 0;
1185 jsr166 1.62 while ((first = peekExpired()) != null) {
1186     c.add(first); // In this order, in case add() throws.
1187     finishPoll(first);
1188 jsr166 1.48 ++n;
1189     }
1190 dl 1.40 return n;
1191     } finally {
1192     lock.unlock();
1193     }
1194 dl 1.13 }
1195    
1196 jsr166 1.21 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1197 dl 1.40 if (c == null)
1198     throw new NullPointerException();
1199     if (c == this)
1200     throw new IllegalArgumentException();
1201     if (maxElements <= 0)
1202     return 0;
1203     final ReentrantLock lock = this.lock;
1204     lock.lock();
1205     try {
1206 jsr166 1.61 RunnableScheduledFuture<?> first;
1207 dl 1.40 int n = 0;
1208 jsr166 1.62 while (n < maxElements && (first = peekExpired()) != null) {
1209     c.add(first); // In this order, in case add() throws.
1210     finishPoll(first);
1211 jsr166 1.48 ++n;
1212     }
1213 dl 1.40 return n;
1214     } finally {
1215     lock.unlock();
1216     }
1217     }
1218    
1219     public Object[] toArray() {
1220     final ReentrantLock lock = this.lock;
1221     lock.lock();
1222     try {
1223 jsr166 1.45 return Arrays.copyOf(queue, size, Object[].class);
1224 dl 1.40 } finally {
1225     lock.unlock();
1226     }
1227     }
1228    
1229 jsr166 1.48 @SuppressWarnings("unchecked")
1230 dl 1.40 public <T> T[] toArray(T[] a) {
1231     final ReentrantLock lock = this.lock;
1232     lock.lock();
1233     try {
1234     if (a.length < size)
1235     return (T[]) Arrays.copyOf(queue, size, a.getClass());
1236     System.arraycopy(queue, 0, a, 0, size);
1237     if (a.length > size)
1238     a[size] = null;
1239     return a;
1240     } finally {
1241     lock.unlock();
1242     }
1243 dl 1.13 }
1244    
1245 jsr166 1.21 public Iterator<Runnable> iterator() {
1246 jsr166 1.45 return new Itr(Arrays.copyOf(queue, size));
1247 dl 1.40 }
1248 jsr166 1.42
1249 dl 1.40 /**
1250     * Snapshot iterator that works off copy of underlying q array.
1251     */
1252     private class Itr implements Iterator<Runnable> {
1253 jsr166 1.74 final RunnableScheduledFuture<?>[] array;
1254 jsr166 1.48 int cursor = 0; // index of next element to return
1255     int lastRet = -1; // index of last element, or -1 if no such
1256 jsr166 1.42
1257 jsr166 1.74 Itr(RunnableScheduledFuture<?>[] array) {
1258 dl 1.40 this.array = array;
1259     }
1260 jsr166 1.42
1261 dl 1.40 public boolean hasNext() {
1262     return cursor < array.length;
1263     }
1264 jsr166 1.42
1265 dl 1.40 public Runnable next() {
1266     if (cursor >= array.length)
1267     throw new NoSuchElementException();
1268     lastRet = cursor;
1269 jsr166 1.45 return array[cursor++];
1270 dl 1.40 }
1271 jsr166 1.42
1272 dl 1.40 public void remove() {
1273     if (lastRet < 0)
1274     throw new IllegalStateException();
1275     DelayedWorkQueue.this.remove(array[lastRet]);
1276     lastRet = -1;
1277     }
1278 dl 1.13 }
1279     }
1280 dl 1.1 }