ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.108
Committed: Tue Mar 28 23:09:10 2017 UTC (7 years, 2 months ago) by jsr166
Branch: MAIN
Changes since 1.107: +32 -6 lines
Log Message:
clarify behavior of periodic tasks

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.11 * Expert Group and released to the public domain, as explained at
4 jsr166 1.58 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.1 */
6    
7     package java.util.concurrent;
8 jsr166 1.82
9 jsr166 1.83 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 jsr166 1.62 import static java.util.concurrent.TimeUnit.NANOSECONDS;
11 jsr166 1.83
12 jsr166 1.82 import java.util.AbstractQueue;
13     import java.util.Arrays;
14     import java.util.Collection;
15     import java.util.Iterator;
16     import java.util.List;
17     import java.util.NoSuchElementException;
18 jsr166 1.106 import java.util.Objects;
19 jsr166 1.83 import java.util.concurrent.atomic.AtomicLong;
20     import java.util.concurrent.locks.Condition;
21     import java.util.concurrent.locks.ReentrantLock;
22 dl 1.1
23     /**
24 dl 1.7 * A {@link ThreadPoolExecutor} that can additionally schedule
25 jsr166 1.75 * commands to run after a given delay, or to execute periodically.
26     * This class is preferable to {@link java.util.Timer} when multiple
27     * worker threads are needed, or when the additional flexibility or
28     * capabilities of {@link ThreadPoolExecutor} (which this class
29     * extends) are required.
30 dl 1.1 *
31 jsr166 1.46 * <p>Delayed tasks execute no sooner than they are enabled, but
32 dl 1.18 * without any real-time guarantees about when, after they are
33     * enabled, they will commence. Tasks scheduled for exactly the same
34     * execution time are enabled in first-in-first-out (FIFO) order of
35 jsr166 1.46 * submission.
36     *
37     * <p>When a submitted task is cancelled before it is run, execution
38 jsr166 1.75 * is suppressed. By default, such a cancelled task is not
39     * automatically removed from the work queue until its delay elapses.
40     * While this enables further inspection and monitoring, it may also
41     * cause unbounded retention of cancelled tasks. To avoid this, use
42     * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately
43     * removed from the work queue at time of cancellation.
44 dl 1.1 *
45 jsr166 1.75 * <p>Successive executions of a periodic task scheduled via
46 jsr166 1.84 * {@link #scheduleAtFixedRate scheduleAtFixedRate} or
47     * {@link #scheduleWithFixedDelay scheduleWithFixedDelay}
48     * do not overlap. While different executions may be performed by
49     * different threads, the effects of prior executions
50     * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
51 dl 1.51 * those of subsequent ones.
52     *
53 jsr166 1.108 * <p>Calling {@link Future#get} on a periodic task's future will
54     * never return normally. If an execution of a periodic task
55     * throws an exception, further executions are suppressed and
56     * {@code get()} will throw an {@link ExecutionException} holding the
57     * exception as its cause. Otherwise, {@code get()} will block and
58     * executions continue indefinitely until the task is cancelled, when
59     * it will throw {@link CancellationException}. Such cancellations
60     * occur when the future is cancelled, on {@link #shutdownNow}, or on
61     * {@link #shutdown} unless the {@linkplain
62     * #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on
63     * whether to continue after shutdown} is set true.
64     *
65 dl 1.1 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
66 dl 1.8 * of the inherited tuning methods are not useful for it. In
67 jsr166 1.53 * particular, because it acts as a fixed-sized pool using
68     * {@code corePoolSize} threads and an unbounded queue, adjustments
69     * to {@code maximumPoolSize} have no useful effect. Additionally, it
70     * is almost never a good idea to set {@code corePoolSize} to zero or
71     * use {@code allowCoreThreadTimeOut} because this may leave the pool
72     * without threads to handle tasks once they become eligible to run.
73 dl 1.1 *
74 jsr166 1.103 * <p>As with {@code ThreadPoolExecutor}, if not otherwise specified,
75     * this class uses {@link Executors#defaultThreadFactory} as the
76     * default thread factory, and {@link ThreadPoolExecutor.AbortPolicy}
77     * as the default rejected execution handler.
78     *
79 jsr166 1.39 * <p><b>Extension notes:</b> This class overrides the
80 jsr166 1.69 * {@link ThreadPoolExecutor#execute(Runnable) execute} and
81 jsr166 1.39 * {@link AbstractExecutorService#submit(Runnable) submit}
82     * methods to generate internal {@link ScheduledFuture} objects to
83     * control per-task delays and scheduling. To preserve
84     * functionality, any further overrides of these methods in
85 dl 1.32 * subclasses must invoke superclass versions, which effectively
86 jsr166 1.39 * disables additional task customization. However, this class
87 dl 1.32 * provides alternative protected extension method
88 jsr166 1.39 * {@code decorateTask} (one version each for {@code Runnable} and
89     * {@code Callable}) that can be used to customize the concrete task
90     * types used to execute commands entered via {@code execute},
91     * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
92     * and {@code scheduleWithFixedDelay}. By default, a
93     * {@code ScheduledThreadPoolExecutor} uses a task type extending
94 dl 1.32 * {@link FutureTask}. However, this may be modified or replaced using
95     * subclasses of the form:
96     *
97 jsr166 1.85 * <pre> {@code
98 dl 1.23 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
99     *
100 jsr166 1.39 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
101 dl 1.23 *
102 jsr166 1.39 * protected <V> RunnableScheduledFuture<V> decorateTask(
103     * Runnable r, RunnableScheduledFuture<V> task) {
104     * return new CustomTask<V>(r, task);
105 jsr166 1.29 * }
106 dl 1.23 *
107 jsr166 1.39 * protected <V> RunnableScheduledFuture<V> decorateTask(
108     * Callable<V> c, RunnableScheduledFuture<V> task) {
109     * return new CustomTask<V>(c, task);
110 jsr166 1.29 * }
111     * // ... add constructors, etc.
112 jsr166 1.39 * }}</pre>
113     *
114 dl 1.1 * @since 1.5
115     * @author Doug Lea
116     */
117 jsr166 1.21 public class ScheduledThreadPoolExecutor
118     extends ThreadPoolExecutor
119 tim 1.3 implements ScheduledExecutorService {
120 dl 1.1
121 dl 1.37 /*
122     * This class specializes ThreadPoolExecutor implementation by
123     *
124 jsr166 1.99 * 1. Using a custom task type ScheduledFutureTask, even for tasks
125     * that don't require scheduling because they are submitted
126     * using ExecutorService rather than ScheduledExecutorService
127     * methods, which are treated as tasks with a delay of zero.
128 dl 1.37 *
129 jsr166 1.46 * 2. Using a custom queue (DelayedWorkQueue), a variant of
130 dl 1.37 * unbounded DelayQueue. The lack of capacity constraint and
131     * the fact that corePoolSize and maximumPoolSize are
132     * effectively identical simplifies some execution mechanics
133 jsr166 1.46 * (see delayedExecute) compared to ThreadPoolExecutor.
134 dl 1.37 *
135     * 3. Supporting optional run-after-shutdown parameters, which
136     * leads to overrides of shutdown methods to remove and cancel
137     * tasks that should NOT be run after shutdown, as well as
138     * different recheck logic when task (re)submission overlaps
139     * with a shutdown.
140     *
141     * 4. Task decoration methods to allow interception and
142     * instrumentation, which are needed because subclasses cannot
143     * otherwise override submit methods to get this effect. These
144     * don't have any impact on pool control logic though.
145     */
146    
147 dl 1.1 /**
148     * False if should cancel/suppress periodic tasks on shutdown.
149     */
150     private volatile boolean continueExistingPeriodicTasksAfterShutdown;
151    
152     /**
153     * False if should cancel non-periodic tasks on shutdown.
154     */
155     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
156    
157     /**
158 jsr166 1.75 * True if ScheduledFutureTask.cancel should remove from queue.
159 dl 1.41 */
160 jsr166 1.90 volatile boolean removeOnCancel;
161 dl 1.41
162     /**
163 dl 1.1 * Sequence number to break scheduling ties, and in turn to
164     * guarantee FIFO order among tied entries.
165     */
166 jsr166 1.60 private static final AtomicLong sequencer = new AtomicLong();
167 dl 1.14
168 jsr166 1.21 private class ScheduledFutureTask<V>
169 peierls 1.22 extends FutureTask<V> implements RunnableScheduledFuture<V> {
170 jsr166 1.21
171 dl 1.1 /** Sequence number to break ties FIFO */
172     private final long sequenceNumber;
173 jsr166 1.44
174 jsr166 1.99 /** The nanoTime-based time when the task is enabled to execute. */
175 dl 1.88 private volatile long time;
176 jsr166 1.44
177 dl 1.16 /**
178 jsr166 1.99 * Period for repeating tasks, in nanoseconds.
179 jsr166 1.75 * A positive value indicates fixed-rate execution.
180     * A negative value indicates fixed-delay execution.
181     * A value of 0 indicates a non-repeating (one-shot) task.
182 dl 1.16 */
183 dl 1.1 private final long period;
184    
185 jsr166 1.48 /** The actual task to be re-enqueued by reExecutePeriodic */
186     RunnableScheduledFuture<V> outerTask = this;
187 jsr166 1.44
188 dl 1.1 /**
189 dl 1.40 * Index into delay queue, to support faster cancellation.
190     */
191     int heapIndex;
192    
193     /**
194 jsr166 1.30 * Creates a one-shot action with given nanoTime-based trigger time.
195 dl 1.1 */
196 jsr166 1.89 ScheduledFutureTask(Runnable r, V result, long triggerTime,
197     long sequenceNumber) {
198 dl 1.1 super(r, result);
199 jsr166 1.75 this.time = triggerTime;
200 dl 1.1 this.period = 0;
201 jsr166 1.89 this.sequenceNumber = sequenceNumber;
202 dl 1.1 }
203    
204     /**
205 jsr166 1.75 * Creates a periodic action with given nanoTime-based initial
206     * trigger time and period.
207 dl 1.1 */
208 jsr166 1.75 ScheduledFutureTask(Runnable r, V result, long triggerTime,
209 jsr166 1.89 long period, long sequenceNumber) {
210 dl 1.1 super(r, result);
211 jsr166 1.75 this.time = triggerTime;
212 dl 1.1 this.period = period;
213 jsr166 1.89 this.sequenceNumber = sequenceNumber;
214 dl 1.1 }
215    
216     /**
217 jsr166 1.65 * Creates a one-shot action with given nanoTime-based trigger time.
218 dl 1.1 */
219 jsr166 1.89 ScheduledFutureTask(Callable<V> callable, long triggerTime,
220     long sequenceNumber) {
221 dl 1.1 super(callable);
222 jsr166 1.75 this.time = triggerTime;
223 dl 1.1 this.period = 0;
224 jsr166 1.89 this.sequenceNumber = sequenceNumber;
225 dl 1.1 }
226    
227     public long getDelay(TimeUnit unit) {
228 jsr166 1.98 return unit.convert(time - System.nanoTime(), NANOSECONDS);
229 dl 1.1 }
230    
231 dl 1.20 public int compareTo(Delayed other) {
232 dl 1.59 if (other == this) // compare zero if same object
233 dl 1.1 return 0;
234 dl 1.34 if (other instanceof ScheduledFutureTask) {
235     ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
236     long diff = time - x.time;
237     if (diff < 0)
238     return -1;
239     else if (diff > 0)
240     return 1;
241     else if (sequenceNumber < x.sequenceNumber)
242     return -1;
243     else
244     return 1;
245     }
246 jsr166 1.64 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
247 jsr166 1.61 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
248 dl 1.1 }
249    
250     /**
251 jsr166 1.70 * Returns {@code true} if this is a periodic (not a one-shot) action.
252 jsr166 1.30 *
253 jsr166 1.70 * @return {@code true} if periodic
254 dl 1.1 */
255 dl 1.23 public boolean isPeriodic() {
256 dl 1.16 return period != 0;
257 dl 1.1 }
258    
259     /**
260 jsr166 1.39 * Sets the next time to run for a periodic task.
261 dl 1.13 */
262 dl 1.37 private void setNextRunTime() {
263     long p = period;
264     if (p > 0)
265     time += p;
266     else
267 jsr166 1.54 time = triggerTime(-p);
268 dl 1.13 }
269    
270 dl 1.40 public boolean cancel(boolean mayInterruptIfRunning) {
271 jsr166 1.100 // The racy read of heapIndex below is benign:
272     // if heapIndex < 0, then OOTA guarantees that we have surely
273     // been removed; else we recheck under lock in remove()
274 dl 1.41 boolean cancelled = super.cancel(mayInterruptIfRunning);
275     if (cancelled && removeOnCancel && heapIndex >= 0)
276 jsr166 1.42 remove(this);
277 dl 1.41 return cancelled;
278 dl 1.40 }
279    
280 dl 1.13 /**
281 dl 1.5 * Overrides FutureTask version so as to reset/requeue if periodic.
282 jsr166 1.21 */
283 dl 1.1 public void run() {
284 jsr166 1.107 if (!canRunInCurrentRunState(this))
285 dl 1.37 cancel(false);
286 jsr166 1.107 else if (!isPeriodic())
287 jsr166 1.91 super.run();
288     else if (super.runAndReset()) {
289 dl 1.37 setNextRunTime();
290 jsr166 1.44 reExecutePeriodic(outerTask);
291 dl 1.37 }
292 dl 1.1 }
293     }
294    
295     /**
296 jsr166 1.107 * Returns true if can run a task given current run state and
297     * run-after-shutdown parameters.
298 dl 1.37 */
299 jsr166 1.107 boolean canRunInCurrentRunState(RunnableScheduledFuture<?> task) {
300     if (!isShutdown())
301     return true;
302     if (isStopped())
303     return false;
304     return task.isPeriodic()
305     ? continueExistingPeriodicTasksAfterShutdown
306     : (executeExistingDelayedTasksAfterShutdown
307     || task.getDelay(NANOSECONDS) <= 0);
308 dl 1.37 }
309    
310     /**
311     * Main execution method for delayed or periodic tasks. If pool
312     * is shut down, rejects the task. Otherwise adds task to queue
313     * and starts a thread, if necessary, to run it. (We cannot
314     * prestart the thread to run the task because the task (probably)
315 jsr166 1.67 * shouldn't be run yet.) If the pool is shut down while the task
316 dl 1.37 * is being added, cancel and remove it if required by state and
317 jsr166 1.39 * run-after-shutdown parameters.
318     *
319 dl 1.37 * @param task the task
320     */
321     private void delayedExecute(RunnableScheduledFuture<?> task) {
322     if (isShutdown())
323     reject(task);
324     else {
325     super.getQueue().add(task);
326 jsr166 1.107 if (!canRunInCurrentRunState(task) && remove(task))
327 dl 1.37 task.cancel(false);
328 jsr166 1.48 else
329 dl 1.63 ensurePrestart();
330 dl 1.37 }
331     }
332 jsr166 1.21
333 dl 1.37 /**
334 jsr166 1.39 * Requeues a periodic task unless current run state precludes it.
335     * Same idea as delayedExecute except drops task rather than rejecting.
336     *
337 dl 1.37 * @param task the task
338     */
339     void reExecutePeriodic(RunnableScheduledFuture<?> task) {
340 jsr166 1.107 if (canRunInCurrentRunState(task)) {
341 dl 1.37 super.getQueue().add(task);
342 jsr166 1.107 if (canRunInCurrentRunState(task) || !remove(task)) {
343 dl 1.63 ensurePrestart();
344 jsr166 1.107 return;
345     }
346 dl 1.37 }
347 jsr166 1.107 task.cancel(false);
348 dl 1.13 }
349 dl 1.1
350 dl 1.13 /**
351 jsr166 1.21 * Cancels and clears the queue of all tasks that should not be run
352 jsr166 1.39 * due to shutdown policy. Invoked within super.shutdown.
353 dl 1.13 */
354 dl 1.37 @Override void onShutdown() {
355     BlockingQueue<Runnable> q = super.getQueue();
356     boolean keepDelayed =
357     getExecuteExistingDelayedTasksAfterShutdownPolicy();
358     boolean keepPeriodic =
359     getContinueExistingPeriodicTasksAfterShutdownPolicy();
360 jsr166 1.107 // Traverse snapshot to avoid iterator exceptions
361     // TODO: implement and use efficient removeIf
362     // super.getQueue().removeIf(...);
363     for (Object e : q.toArray()) {
364     if (e instanceof RunnableScheduledFuture) {
365     RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
366     if ((t.isPeriodic()
367     ? !keepPeriodic
368     : (!keepDelayed && t.getDelay(NANOSECONDS) > 0))
369     || t.isCancelled()) { // also remove if already cancelled
370     if (q.remove(t))
371     t.cancel(false);
372 dl 1.13 }
373     }
374 dl 1.1 }
375 jsr166 1.48 tryTerminate();
376 dl 1.1 }
377    
378 dl 1.23 /**
379 jsr166 1.30 * Modifies or replaces the task used to execute a runnable.
380 jsr166 1.28 * This method can be used to override the concrete
381 dl 1.23 * class used for managing internal tasks.
382 jsr166 1.30 * The default implementation simply returns the given task.
383 jsr166 1.28 *
384 dl 1.23 * @param runnable the submitted Runnable
385     * @param task the task created to execute the runnable
386 jsr166 1.72 * @param <V> the type of the task's result
387 dl 1.23 * @return a task that can execute the runnable
388     * @since 1.6
389     */
390 peierls 1.22 protected <V> RunnableScheduledFuture<V> decorateTask(
391 dl 1.23 Runnable runnable, RunnableScheduledFuture<V> task) {
392     return task;
393 peierls 1.22 }
394    
395 dl 1.23 /**
396 jsr166 1.30 * Modifies or replaces the task used to execute a callable.
397 jsr166 1.28 * This method can be used to override the concrete
398 dl 1.23 * class used for managing internal tasks.
399 jsr166 1.30 * The default implementation simply returns the given task.
400 jsr166 1.28 *
401 dl 1.23 * @param callable the submitted Callable
402     * @param task the task created to execute the callable
403 jsr166 1.72 * @param <V> the type of the task's result
404 dl 1.23 * @return a task that can execute the callable
405     * @since 1.6
406     */
407 peierls 1.22 protected <V> RunnableScheduledFuture<V> decorateTask(
408 dl 1.23 Callable<V> callable, RunnableScheduledFuture<V> task) {
409     return task;
410 dl 1.19 }
411    
412 dl 1.1 /**
413 jsr166 1.78 * The default keep-alive time for pool threads.
414     *
415     * Normally, this value is unused because all pool threads will be
416     * core threads, but if a user creates a pool with a corePoolSize
417     * of zero (against our advice), we keep a thread alive as long as
418     * there are queued tasks. If the keep alive time is zero (the
419     * historic value), we end up hot-spinning in getTask, wasting a
420     * CPU. But on the other hand, if we set the value too high, and
421     * users create a one-shot pool which they don't cleanly shutdown,
422     * the pool's non-daemon threads will prevent JVM termination. A
423     * small but non-zero value (relative to a JVM's lifetime) seems
424     * best.
425     */
426     private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
427    
428     /**
429 jsr166 1.39 * Creates a new {@code ScheduledThreadPoolExecutor} with the
430     * given core pool size.
431 jsr166 1.21 *
432 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
433     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
434     * @throws IllegalArgumentException if {@code corePoolSize < 0}
435 dl 1.1 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    // Maximum pool size is Integer.MAX_VALUE; the work queue is a
    // fresh DelayedWorkQueue; keep-alive is the class default.
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue());
}
441    
442     /**
443 jsr166 1.39 * Creates a new {@code ScheduledThreadPoolExecutor} with the
444     * given initial parameters.
445 jsr166 1.21 *
446 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
447     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
448 dl 1.1 * @param threadFactory the factory to use when the executor
449 jsr166 1.39 * creates a new thread
450     * @throws IllegalArgumentException if {@code corePoolSize < 0}
451     * @throws NullPointerException if {@code threadFactory} is null
452 dl 1.1 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    // Same fixed settings as the single-argument constructor, plus
    // the caller's thread factory.
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue(),
          threadFactory);
}
459    
460     /**
461 jsr166 1.79 * Creates a new {@code ScheduledThreadPoolExecutor} with the
462     * given initial parameters.
463 jsr166 1.21 *
464 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
465     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
466 dl 1.1 * @param handler the handler to use when execution is blocked
467 jsr166 1.39 * because the thread bounds and queue capacities are reached
468     * @throws IllegalArgumentException if {@code corePoolSize < 0}
469     * @throws NullPointerException if {@code handler} is null
470 dl 1.1 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    // Same fixed settings as the single-argument constructor, plus
    // the caller's rejected-execution handler.
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue(),
          handler);
}
477    
478     /**
479 jsr166 1.79 * Creates a new {@code ScheduledThreadPoolExecutor} with the
480     * given initial parameters.
481 jsr166 1.21 *
482 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
483     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
484 dl 1.1 * @param threadFactory the factory to use when the executor
485 jsr166 1.39 * creates a new thread
486 dl 1.1 * @param handler the handler to use when execution is blocked
487 jsr166 1.39 * because the thread bounds and queue capacities are reached
488     * @throws IllegalArgumentException if {@code corePoolSize < 0}
489     * @throws NullPointerException if {@code threadFactory} or
490     * {@code handler} is null
491 dl 1.1 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    // Same fixed settings as the single-argument constructor, plus
    // the caller's thread factory and rejected-execution handler.
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue(),
          threadFactory,
          handler);
}
499    
500 dl 1.37 /**
501 jsr166 1.73 * Returns the nanoTime-based trigger time of a delayed action.
502 jsr166 1.54 */
503     private long triggerTime(long delay, TimeUnit unit) {
504     return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
505     }
506    
507     /**
508 jsr166 1.73 * Returns the nanoTime-based trigger time of a delayed action.
509 dl 1.50 */
510 jsr166 1.54 long triggerTime(long delay) {
511 jsr166 1.98 return System.nanoTime() +
512 jsr166 1.54 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
513     }
514    
515     /**
516     * Constrains the values of all delays in the queue to be within
517     * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
518     * This may occur if a task is eligible to be dequeued, but has
519     * not yet been, while some other task is added with a delay of
520     * Long.MAX_VALUE.
521     */
522     private long overflowFree(long delay) {
523     Delayed head = (Delayed) super.getQueue().peek();
524     if (head != null) {
525 jsr166 1.62 long headDelay = head.getDelay(NANOSECONDS);
526 jsr166 1.54 if (headDelay < 0 && (delay - headDelay < 0))
527     delay = Long.MAX_VALUE + headDelay;
528     }
529     return delay;
530 dl 1.50 }
531    
532     /**
533 dl 1.37 * @throws RejectedExecutionException {@inheritDoc}
534     * @throws NullPointerException {@inheritDoc}
535     */
536 jsr166 1.21 public ScheduledFuture<?> schedule(Runnable command,
537     long delay,
538 dl 1.13 TimeUnit unit) {
539 dl 1.9 if (command == null || unit == null)
540 dl 1.1 throw new NullPointerException();
541 jsr166 1.76 RunnableScheduledFuture<Void> t = decorateTask(command,
542 jsr166 1.54 new ScheduledFutureTask<Void>(command, null,
543 jsr166 1.89 triggerTime(delay, unit),
544     sequencer.getAndIncrement()));
545 dl 1.1 delayedExecute(t);
546     return t;
547     }
548 jsr166 1.52
549 dl 1.37 /**
550     * @throws RejectedExecutionException {@inheritDoc}
551     * @throws NullPointerException {@inheritDoc}
552     */
553 jsr166 1.21 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
554     long delay,
555 dl 1.13 TimeUnit unit) {
556 dl 1.9 if (callable == null || unit == null)
557 dl 1.1 throw new NullPointerException();
558 peierls 1.22 RunnableScheduledFuture<V> t = decorateTask(callable,
559 jsr166 1.54 new ScheduledFutureTask<V>(callable,
560 jsr166 1.89 triggerTime(delay, unit),
561     sequencer.getAndIncrement()));
562 dl 1.1 delayedExecute(t);
563     return t;
564     }
565    
566 dl 1.37 /**
567     * @throws RejectedExecutionException {@inheritDoc}
568     * @throws NullPointerException {@inheritDoc}
569     * @throws IllegalArgumentException {@inheritDoc}
570     */
571 jsr166 1.21 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
572     long initialDelay,
573     long period,
574 dl 1.13 TimeUnit unit) {
575 dl 1.9 if (command == null || unit == null)
576 dl 1.1 throw new NullPointerException();
577 jsr166 1.96 if (period <= 0L)
578 dl 1.1 throw new IllegalArgumentException();
579 jsr166 1.48 ScheduledFutureTask<Void> sft =
580     new ScheduledFutureTask<Void>(command,
581     null,
582 jsr166 1.54 triggerTime(initialDelay, unit),
583 jsr166 1.89 unit.toNanos(period),
584     sequencer.getAndIncrement());
585 jsr166 1.44 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
586 jsr166 1.48 sft.outerTask = t;
587 dl 1.1 delayedExecute(t);
588     return t;
589     }
590 jsr166 1.21
591 dl 1.37 /**
592     * @throws RejectedExecutionException {@inheritDoc}
593     * @throws NullPointerException {@inheritDoc}
594     * @throws IllegalArgumentException {@inheritDoc}
595     */
596 jsr166 1.21 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
597     long initialDelay,
598     long delay,
599 dl 1.13 TimeUnit unit) {
600 dl 1.9 if (command == null || unit == null)
601 dl 1.1 throw new NullPointerException();
602 jsr166 1.96 if (delay <= 0L)
603 dl 1.1 throw new IllegalArgumentException();
604 jsr166 1.48 ScheduledFutureTask<Void> sft =
605     new ScheduledFutureTask<Void>(command,
606     null,
607 jsr166 1.54 triggerTime(initialDelay, unit),
608 jsr166 1.97 -unit.toNanos(delay),
609 jsr166 1.89 sequencer.getAndIncrement());
610 jsr166 1.44 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
611 jsr166 1.48 sft.outerTask = t;
612 dl 1.1 delayedExecute(t);
613     return t;
614     }
615 jsr166 1.21
616 dl 1.1 /**
617 jsr166 1.39 * Executes {@code command} with zero required delay.
618     * This has effect equivalent to
619     * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
620     * Note that inspections of the queue and of the list returned by
621     * {@code shutdownNow} will access the zero-delayed
622     * {@link ScheduledFuture}, not the {@code command} itself.
623     *
624     * <p>A consequence of the use of {@code ScheduledFuture} objects is
625     * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
626     * called with a null second {@code Throwable} argument, even if the
627     * {@code command} terminated abruptly. Instead, the {@code Throwable}
628     * thrown by such a task can be obtained via {@link Future#get}.
629 dl 1.1 *
630     * @throws RejectedExecutionException at discretion of
631 jsr166 1.39 * {@code RejectedExecutionHandler}, if the task
632     * cannot be accepted for execution because the
633     * executor has been shut down
634     * @throws NullPointerException {@inheritDoc}
635 dl 1.1 */
636     public void execute(Runnable command) {
637 jsr166 1.62 schedule(command, 0, NANOSECONDS);
638 dl 1.1 }
639    
640 dl 1.13 // Override AbstractExecutorService methods
641    
642 dl 1.37 /**
643     * @throws RejectedExecutionException {@inheritDoc}
644     * @throws NullPointerException {@inheritDoc}
645     */
646 dl 1.7 public Future<?> submit(Runnable task) {
647 jsr166 1.62 return schedule(task, 0, NANOSECONDS);
648 dl 1.7 }
649    
650 dl 1.37 /**
651     * @throws RejectedExecutionException {@inheritDoc}
652     * @throws NullPointerException {@inheritDoc}
653     */
654 dl 1.7 public <T> Future<T> submit(Runnable task, T result) {
655 jsr166 1.62 return schedule(Executors.callable(task, result), 0, NANOSECONDS);
656 dl 1.7 }
657    
658 dl 1.37 /**
659     * @throws RejectedExecutionException {@inheritDoc}
660     * @throws NullPointerException {@inheritDoc}
661     */
662 dl 1.7 public <T> Future<T> submit(Callable<T> task) {
663 jsr166 1.62 return schedule(task, 0, NANOSECONDS);
664 dl 1.7 }
665 dl 1.1
666     /**
667 dl 1.37 * Sets the policy on whether to continue executing existing
668 jsr166 1.39 * periodic tasks even when this executor has been {@code shutdown}.
669     * This value is by default {@code false}.
670 jsr166 1.30 *
671 jsr166 1.108 * <p>If the policy is set to {@code true}, periodic tasks will
672     * continue executing until one of the following happens:
673     *
674     * <ul>
675     * <li>{@link #shutdownNow} is called
676     * <li>the policy is set to {@code false} when already shutdown
677     * <li>the task is {@linkplain Future#cancel cancelled}
678     * <li>an execution of the task terminates with an exception
679     * </ul>
680     *
681 jsr166 1.68 * @param value if {@code true}, continue after shutdown, else don't
682 jsr166 1.25 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
683 dl 1.1 */
684     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
685     continueExistingPeriodicTasksAfterShutdown = value;
686 jsr166 1.39 if (!value && isShutdown())
687 dl 1.37 onShutdown();
688 dl 1.1 }
689    
690     /**
691 jsr166 1.21 * Gets the policy on whether to continue executing existing
692 jsr166 1.39 * periodic tasks even when this executor has been {@code shutdown}.
693     * This value is by default {@code false}.
694 jsr166 1.30 *
695 jsr166 1.108 * <p>If the policy is set to {@code true}, periodic tasks will
696     * continue executing until one of the following happens:
697     *
698     * <ul>
699     * <li>{@link #shutdownNow} is called
700     * <li>the policy is set to {@code false} when already shutdown
701     * <li>the task is {@linkplain Future#cancel cancelled}
702     * <li>an execution of the task terminates with an exception
703     * </ul>
704     *
705 jsr166 1.39 * @return {@code true} if will continue after shutdown
706 dl 1.16 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
707 dl 1.1 */
708     public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
709     return continueExistingPeriodicTasksAfterShutdown;
710     }
711    
712     /**
713 jsr166 1.21 * Sets the policy on whether to execute existing delayed
714 jsr166 1.39 * tasks even when this executor has been {@code shutdown}.
715     * In this case, these tasks will only terminate upon
716     * {@code shutdownNow}, or after setting the policy to
717     * {@code false} when already shutdown.
718     * This value is by default {@code true}.
719 jsr166 1.30 *
720 jsr166 1.68 * @param value if {@code true}, execute after shutdown, else don't
721 dl 1.16 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
722 dl 1.1 */
723     public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
724     executeExistingDelayedTasksAfterShutdown = value;
725 jsr166 1.39 if (!value && isShutdown())
726 dl 1.37 onShutdown();
727 dl 1.1 }
728    
729     /**
730 jsr166 1.21 * Gets the policy on whether to execute existing delayed
731 jsr166 1.39 * tasks even when this executor has been {@code shutdown}.
732     * In this case, these tasks will only terminate upon
733     * {@code shutdownNow}, or after setting the policy to
734     * {@code false} when already shutdown.
735     * This value is by default {@code true}.
736 jsr166 1.30 *
737 jsr166 1.39 * @return {@code true} if will execute after shutdown
738 dl 1.16 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
739 dl 1.1 */
740     public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
741     return executeExistingDelayedTasksAfterShutdown;
742     }
743    
744     /**
745 jsr166 1.46 * Sets the policy on whether cancelled tasks should be immediately
746     * removed from the work queue at time of cancellation. This value is
747     * by default {@code false}.
748 dl 1.41 *
749 jsr166 1.42 * @param value if {@code true}, remove on cancellation, else don't
750 dl 1.41 * @see #getRemoveOnCancelPolicy
751 jsr166 1.43 * @since 1.7
752 dl 1.41 */
753     public void setRemoveOnCancelPolicy(boolean value) {
754     removeOnCancel = value;
755     }
756    
757     /**
758 jsr166 1.46 * Gets the policy on whether cancelled tasks should be immediately
759     * removed from the work queue at time of cancellation. This value is
760     * by default {@code false}.
761 dl 1.41 *
762 jsr166 1.46 * @return {@code true} if cancelled tasks are immediately removed
763     * from the queue
764 dl 1.41 * @see #setRemoveOnCancelPolicy
765 jsr166 1.43 * @since 1.7
766 dl 1.41 */
767     public boolean getRemoveOnCancelPolicy() {
768     return removeOnCancel;
769     }
770    
771     /**
772 dl 1.1 * Initiates an orderly shutdown in which previously submitted
773 jsr166 1.49 * tasks are executed, but no new tasks will be accepted.
774     * Invocation has no additional effect if already shut down.
775     *
776     * <p>This method does not wait for previously submitted tasks to
777     * complete execution. Use {@link #awaitTermination awaitTermination}
778     * to do that.
779     *
780     * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
781     * has been set {@code false}, existing delayed tasks whose delays
782     * have not yet elapsed are cancelled. And unless the {@code
783     * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
784     * {@code true}, future executions of existing periodic tasks will
785     * be cancelled.
786 jsr166 1.39 *
787     * @throws SecurityException {@inheritDoc}
788 dl 1.1 */
789     public void shutdown() {
790     super.shutdown();
791     }
792    
793     /**
794     * Attempts to stop all actively executing tasks, halts the
795 jsr166 1.30 * processing of waiting tasks, and returns a list of the tasks
796 jsr166 1.92 * that were awaiting execution. These tasks are drained (removed)
797     * from the task queue upon return from this method.
798 jsr166 1.21 *
799 jsr166 1.49 * <p>This method does not wait for actively executing tasks to
800     * terminate. Use {@link #awaitTermination awaitTermination} to
801     * do that.
802     *
803 dl 1.1 * <p>There are no guarantees beyond best-effort attempts to stop
804 dl 1.18 * processing actively executing tasks. This implementation
805 jsr166 1.93 * interrupts tasks via {@link Thread#interrupt}; any task that
806 jsr166 1.31 * fails to respond to interrupts may never terminate.
807 dl 1.1 *
808 jsr166 1.39 * @return list of tasks that never commenced execution.
809 jsr166 1.77 * Each element of this list is a {@link ScheduledFuture}.
810     * For tasks submitted via one of the {@code schedule}
811     * methods, the element will be identical to the returned
812     * {@code ScheduledFuture}. For tasks submitted using
813 jsr166 1.84 * {@link #execute execute}, the element will be a
814     * zero-delay {@code ScheduledFuture}.
815 jsr166 1.31 * @throws SecurityException {@inheritDoc}
816 dl 1.1 */
817 tim 1.4 public List<Runnable> shutdownNow() {
818 dl 1.1 return super.shutdownNow();
819     }
820    
821     /**
822 jsr166 1.95 * Returns the task queue used by this executor. Access to the
823     * task queue is intended primarily for debugging and monitoring.
824     * This queue may be in active use. Retrieving the task queue
825     * does not prevent queued tasks from executing.
826     *
827     * <p>Each element of this queue is a {@link ScheduledFuture}.
828 jsr166 1.77 * For tasks submitted via one of the {@code schedule} methods, the
829     * element will be identical to the returned {@code ScheduledFuture}.
830 jsr166 1.84 * For tasks submitted using {@link #execute execute}, the element
831     * will be a zero-delay {@code ScheduledFuture}.
832 jsr166 1.77 *
833     * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
834     * tasks in the order in which they will execute.
835 dl 1.1 *
836     * @return the task queue
837     */
838     public BlockingQueue<Runnable> getQueue() {
839     return super.getQueue();
840     }
841    
842 dl 1.13 /**
843 dl 1.40 * Specialized delay queue. To mesh with TPE declarations, this
844     * class must be declared as a BlockingQueue<Runnable> even though
845 jsr166 1.42 * it can only hold RunnableScheduledFutures.
846 jsr166 1.21 */
847 dl 1.40 static class DelayedWorkQueue extends AbstractQueue<Runnable>
848 dl 1.13 implements BlockingQueue<Runnable> {
849 jsr166 1.21
850 dl 1.40 /*
851     * A DelayedWorkQueue is based on a heap-based data structure
852     * like those in DelayQueue and PriorityQueue, except that
853     * every ScheduledFutureTask also records its index into the
854     * heap array. This eliminates the need to find a task upon
855     * cancellation, greatly speeding up removal (down from O(n)
856     * to O(log n)), and reducing garbage retention that would
857     * otherwise occur by waiting for the element to rise to top
858     * before clearing. But because the queue may also hold
859     * RunnableScheduledFutures that are not ScheduledFutureTasks,
860     * we are not guaranteed to have such indices available, in
861     * which case we fall back to linear search. (We expect that
862     * most tasks will not be decorated, and that the faster cases
863     * will be much more common.)
864     *
865     * All heap operations must record index changes -- mainly
866     * within siftUp and siftDown. Upon removal, a task's
867     * heapIndex is set to -1. Note that ScheduledFutureTasks can
868     * appear at most once in the queue (this need not be true for
869     * other kinds of tasks or work queues), so are uniquely
870     * identified by heapIndex.
871     */
872    
873 jsr166 1.46 private static final int INITIAL_CAPACITY = 16;
874 jsr166 1.61 private RunnableScheduledFuture<?>[] queue =
875     new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
876 jsr166 1.46 private final ReentrantLock lock = new ReentrantLock();
877 jsr166 1.80 private int size;
878 dl 1.40
879 jsr166 1.48 /**
880     * Thread designated to wait for the task at the head of the
881     * queue. This variant of the Leader-Follower pattern
882     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
883     * minimize unnecessary timed waiting. When a thread becomes
884     * the leader, it waits only for the next delay to elapse, but
885     * other threads await indefinitely. The leader thread must
886     * signal some other thread before returning from take() or
887     * poll(...), unless some other thread becomes leader in the
888     * interim. Whenever the head of the queue is replaced with a
889     * task with an earlier expiration time, the leader field is
890     * invalidated by being reset to null, and some waiting
891     * thread, but not necessarily the current leader, is
892     * signalled. So waiting threads must be prepared to acquire
893     * and lose leadership while waiting.
894     */
895 jsr166 1.80 private Thread leader;
896 jsr166 1.48
897     /**
898     * Condition signalled when a newer task becomes available at the
899     * head of the queue or a new thread may need to become leader.
900     */
901     private final Condition available = lock.newCondition();
902 dl 1.40
903     /**
904 jsr166 1.66 * Sets f's heapIndex if it is a ScheduledFutureTask.
905 dl 1.40 */
906 jsr166 1.102 private static void setIndex(RunnableScheduledFuture<?> f, int idx) {
907 dl 1.40 if (f instanceof ScheduledFutureTask)
908     ((ScheduledFutureTask)f).heapIndex = idx;
909     }
910    
911     /**
912 jsr166 1.66 * Sifts element added at bottom up to its heap-ordered spot.
913 dl 1.40 * Call only when holding lock.
914     */
915 jsr166 1.61 private void siftUp(int k, RunnableScheduledFuture<?> key) {
916 dl 1.40 while (k > 0) {
917     int parent = (k - 1) >>> 1;
918 jsr166 1.61 RunnableScheduledFuture<?> e = queue[parent];
919 dl 1.40 if (key.compareTo(e) >= 0)
920     break;
921     queue[k] = e;
922     setIndex(e, k);
923     k = parent;
924     }
925     queue[k] = key;
926     setIndex(key, k);
927     }
928    
929     /**
930 jsr166 1.66 * Sifts element added at top down to its heap-ordered spot.
931 dl 1.40 * Call only when holding lock.
932     */
933 jsr166 1.61 private void siftDown(int k, RunnableScheduledFuture<?> key) {
934 jsr166 1.42 int half = size >>> 1;
935 dl 1.40 while (k < half) {
936 jsr166 1.42 int child = (k << 1) + 1;
937 jsr166 1.61 RunnableScheduledFuture<?> c = queue[child];
938 dl 1.40 int right = child + 1;
939     if (right < size && c.compareTo(queue[right]) > 0)
940     c = queue[child = right];
941     if (key.compareTo(c) <= 0)
942     break;
943     queue[k] = c;
944     setIndex(c, k);
945     k = child;
946     }
947     queue[k] = key;
948     setIndex(key, k);
949     }
950    
951     /**
952 jsr166 1.66 * Resizes the heap array. Call only when holding lock.
953 dl 1.40 */
954     private void grow() {
955     int oldCapacity = queue.length;
956     int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
957     if (newCapacity < 0) // overflow
958     newCapacity = Integer.MAX_VALUE;
959     queue = Arrays.copyOf(queue, newCapacity);
960     }
961    
962     /**
963 jsr166 1.66 * Finds index of given object, or -1 if absent.
964 dl 1.40 */
965     private int indexOf(Object x) {
966     if (x != null) {
967 jsr166 1.48 if (x instanceof ScheduledFutureTask) {
968     int i = ((ScheduledFutureTask) x).heapIndex;
969     // Sanity check; x could conceivably be a
970     // ScheduledFutureTask from some other pool.
971     if (i >= 0 && i < size && queue[i] == x)
972     return i;
973     } else {
974     for (int i = 0; i < size; i++)
975     if (x.equals(queue[i]))
976     return i;
977     }
978 dl 1.40 }
979     return -1;
980     }
981    
982 jsr166 1.48 public boolean contains(Object x) {
983     final ReentrantLock lock = this.lock;
984 jsr166 1.45 lock.lock();
985     try {
986 jsr166 1.48 return indexOf(x) != -1;
987 jsr166 1.45 } finally {
988     lock.unlock();
989     }
990 jsr166 1.48 }
991 jsr166 1.45
992 dl 1.40 public boolean remove(Object x) {
993     final ReentrantLock lock = this.lock;
994     lock.lock();
995     try {
996 jsr166 1.45 int i = indexOf(x);
997 jsr166 1.48 if (i < 0)
998     return false;
999 jsr166 1.45
1000 jsr166 1.48 setIndex(queue[i], -1);
1001     int s = --size;
1002 jsr166 1.61 RunnableScheduledFuture<?> replacement = queue[s];
1003 jsr166 1.48 queue[s] = null;
1004     if (s != i) {
1005     siftDown(i, replacement);
1006     if (queue[i] == replacement)
1007     siftUp(i, replacement);
1008     }
1009     return true;
1010 dl 1.40 } finally {
1011     lock.unlock();
1012     }
1013     }
1014    
1015     public int size() {
1016     final ReentrantLock lock = this.lock;
1017     lock.lock();
1018     try {
1019 jsr166 1.45 return size;
1020 dl 1.40 } finally {
1021     lock.unlock();
1022     }
1023     }
1024    
1025 jsr166 1.42 public boolean isEmpty() {
1026     return size() == 0;
1027 dl 1.40 }
1028    
1029     public int remainingCapacity() {
1030     return Integer.MAX_VALUE;
1031     }
1032    
1033 jsr166 1.61 public RunnableScheduledFuture<?> peek() {
1034 dl 1.40 final ReentrantLock lock = this.lock;
1035     lock.lock();
1036     try {
1037     return queue[0];
1038     } finally {
1039     lock.unlock();
1040     }
1041 dl 1.13 }
1042    
1043 dl 1.40 public boolean offer(Runnable x) {
1044     if (x == null)
1045     throw new NullPointerException();
1046 jsr166 1.61 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
1047 dl 1.40 final ReentrantLock lock = this.lock;
1048     lock.lock();
1049     try {
1050     int i = size;
1051     if (i >= queue.length)
1052     grow();
1053     size = i + 1;
1054     if (i == 0) {
1055     queue[0] = e;
1056     setIndex(e, 0);
1057 jsr166 1.45 } else {
1058 dl 1.40 siftUp(i, e);
1059     }
1060 jsr166 1.46 if (queue[0] == e) {
1061 jsr166 1.48 leader = null;
1062 jsr166 1.46 available.signal();
1063 jsr166 1.48 }
1064 dl 1.40 } finally {
1065     lock.unlock();
1066     }
1067     return true;
1068 jsr166 1.48 }
1069 dl 1.40
1070     public void put(Runnable e) {
1071     offer(e);
1072     }
1073    
1074     public boolean add(Runnable e) {
1075 jsr166 1.48 return offer(e);
1076     }
1077 dl 1.40
1078     public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1079     return offer(e);
1080     }
1081 jsr166 1.42
1082 jsr166 1.46 /**
1083     * Performs common bookkeeping for poll and take: Replaces
1084 jsr166 1.47 * first element with last and sifts it down. Call only when
1085     * holding lock.
1086 jsr166 1.46 * @param f the task to remove and return
1087     */
1088 jsr166 1.61 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1089 jsr166 1.46 int s = --size;
1090 jsr166 1.61 RunnableScheduledFuture<?> x = queue[s];
1091 jsr166 1.46 queue[s] = null;
1092     if (s != 0)
1093     siftDown(0, x);
1094     setIndex(f, -1);
1095     return f;
1096     }
1097    
1098 jsr166 1.61 public RunnableScheduledFuture<?> poll() {
1099 dl 1.40 final ReentrantLock lock = this.lock;
1100     lock.lock();
1101     try {
1102 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1103 jsr166 1.87 return (first == null || first.getDelay(NANOSECONDS) > 0)
1104     ? null
1105     : finishPoll(first);
1106 dl 1.40 } finally {
1107     lock.unlock();
1108     }
1109     }
1110    
1111 jsr166 1.61 public RunnableScheduledFuture<?> take() throws InterruptedException {
1112 dl 1.40 final ReentrantLock lock = this.lock;
1113     lock.lockInterruptibly();
1114     try {
1115     for (;;) {
1116 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1117 jsr166 1.42 if (first == null)
1118 dl 1.40 available.await();
1119     else {
1120 jsr166 1.62 long delay = first.getDelay(NANOSECONDS);
1121 jsr166 1.96 if (delay <= 0L)
1122 jsr166 1.48 return finishPoll(first);
1123 jsr166 1.71 first = null; // don't retain ref while waiting
1124     if (leader != null)
1125 jsr166 1.48 available.await();
1126     else {
1127     Thread thisThread = Thread.currentThread();
1128     leader = thisThread;
1129     try {
1130     available.awaitNanos(delay);
1131     } finally {
1132     if (leader == thisThread)
1133     leader = null;
1134     }
1135     }
1136 dl 1.40 }
1137     }
1138     } finally {
1139 jsr166 1.48 if (leader == null && queue[0] != null)
1140     available.signal();
1141 dl 1.40 lock.unlock();
1142     }
1143     }
1144    
1145 jsr166 1.61 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1146 dl 1.40 throws InterruptedException {
1147     long nanos = unit.toNanos(timeout);
1148     final ReentrantLock lock = this.lock;
1149     lock.lockInterruptibly();
1150     try {
1151     for (;;) {
1152 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1153 dl 1.40 if (first == null) {
1154 jsr166 1.96 if (nanos <= 0L)
1155 dl 1.40 return null;
1156     else
1157     nanos = available.awaitNanos(nanos);
1158     } else {
1159 jsr166 1.62 long delay = first.getDelay(NANOSECONDS);
1160 jsr166 1.96 if (delay <= 0L)
1161 dl 1.40 return finishPoll(first);
1162 jsr166 1.96 if (nanos <= 0L)
1163 jsr166 1.48 return null;
1164 jsr166 1.71 first = null; // don't retain ref while waiting
1165 jsr166 1.48 if (nanos < delay || leader != null)
1166     nanos = available.awaitNanos(nanos);
1167     else {
1168     Thread thisThread = Thread.currentThread();
1169     leader = thisThread;
1170     try {
1171     long timeLeft = available.awaitNanos(delay);
1172     nanos -= delay - timeLeft;
1173     } finally {
1174     if (leader == thisThread)
1175     leader = null;
1176     }
1177     }
1178     }
1179     }
1180 dl 1.40 } finally {
1181 jsr166 1.48 if (leader == null && queue[0] != null)
1182     available.signal();
1183 dl 1.40 lock.unlock();
1184     }
1185     }
1186    
1187     public void clear() {
1188     final ReentrantLock lock = this.lock;
1189     lock.lock();
1190     try {
1191     for (int i = 0; i < size; i++) {
1192 jsr166 1.61 RunnableScheduledFuture<?> t = queue[i];
1193 dl 1.40 if (t != null) {
1194     queue[i] = null;
1195     setIndex(t, -1);
1196     }
1197     }
1198     size = 0;
1199     } finally {
1200     lock.unlock();
1201     }
1202 dl 1.13 }
1203 dl 1.40
1204     public int drainTo(Collection<? super Runnable> c) {
1205 jsr166 1.104 return drainTo(c, Integer.MAX_VALUE);
1206 dl 1.13 }
1207    
1208 jsr166 1.21 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1209 jsr166 1.106 Objects.requireNonNull(c);
1210 dl 1.40 if (c == this)
1211     throw new IllegalArgumentException();
1212     if (maxElements <= 0)
1213     return 0;
1214     final ReentrantLock lock = this.lock;
1215     lock.lock();
1216     try {
1217     int n = 0;
1218 jsr166 1.106 for (RunnableScheduledFuture<?> first;
1219     n < maxElements
1220     && (first = queue[0]) != null
1221     && first.getDelay(NANOSECONDS) <= 0;) {
1222 jsr166 1.62 c.add(first); // In this order, in case add() throws.
1223     finishPoll(first);
1224 jsr166 1.48 ++n;
1225     }
1226 dl 1.40 return n;
1227     } finally {
1228     lock.unlock();
1229     }
1230     }
1231    
1232     public Object[] toArray() {
1233     final ReentrantLock lock = this.lock;
1234     lock.lock();
1235     try {
1236 jsr166 1.45 return Arrays.copyOf(queue, size, Object[].class);
1237 dl 1.40 } finally {
1238     lock.unlock();
1239     }
1240     }
1241    
1242 jsr166 1.48 @SuppressWarnings("unchecked")
1243 dl 1.40 public <T> T[] toArray(T[] a) {
1244     final ReentrantLock lock = this.lock;
1245     lock.lock();
1246     try {
1247     if (a.length < size)
1248     return (T[]) Arrays.copyOf(queue, size, a.getClass());
1249     System.arraycopy(queue, 0, a, 0, size);
1250     if (a.length > size)
1251     a[size] = null;
1252     return a;
1253     } finally {
1254     lock.unlock();
1255     }
1256 dl 1.13 }
1257    
1258 jsr166 1.21 public Iterator<Runnable> iterator() {
1259 jsr166 1.105 final ReentrantLock lock = this.lock;
1260     lock.lock();
1261     try {
1262     return new Itr(Arrays.copyOf(queue, size));
1263     } finally {
1264     lock.unlock();
1265     }
1266 dl 1.40 }
1267 jsr166 1.42
1268 dl 1.40 /**
1269     * Snapshot iterator that works off copy of underlying q array.
1270     */
1271     private class Itr implements Iterator<Runnable> {
1272 jsr166 1.74 final RunnableScheduledFuture<?>[] array;
1273 jsr166 1.86 int cursor; // index of next element to return; initially 0
1274     int lastRet = -1; // index of last element returned; -1 if no such
1275 jsr166 1.42
1276 jsr166 1.74 Itr(RunnableScheduledFuture<?>[] array) {
1277 dl 1.40 this.array = array;
1278     }
1279 jsr166 1.42
1280 dl 1.40 public boolean hasNext() {
1281     return cursor < array.length;
1282     }
1283 jsr166 1.42
1284 dl 1.40 public Runnable next() {
1285     if (cursor >= array.length)
1286     throw new NoSuchElementException();
1287 jsr166 1.101 return array[lastRet = cursor++];
1288 dl 1.40 }
1289 jsr166 1.42
1290 dl 1.40 public void remove() {
1291     if (lastRet < 0)
1292     throw new IllegalStateException();
1293     DelayedWorkQueue.this.remove(array[lastRet]);
1294     lastRet = -1;
1295     }
1296 dl 1.13 }
1297     }
1298 dl 1.1 }