ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.109
Committed: Wed Mar 29 15:03:09 2017 UTC (7 years, 2 months ago) by jsr166
Branch: MAIN
Changes since 1.108: +16 -12 lines
Log Message:
another round of clarifications to periodic task spec

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.11 * Expert Group and released to the public domain, as explained at
4 jsr166 1.58 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.1 */
6    
7     package java.util.concurrent;
8 jsr166 1.82
9 jsr166 1.83 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 jsr166 1.62 import static java.util.concurrent.TimeUnit.NANOSECONDS;
11 jsr166 1.83
12 jsr166 1.82 import java.util.AbstractQueue;
13     import java.util.Arrays;
14     import java.util.Collection;
15     import java.util.Iterator;
16     import java.util.List;
17     import java.util.NoSuchElementException;
18 jsr166 1.106 import java.util.Objects;
19 jsr166 1.83 import java.util.concurrent.atomic.AtomicLong;
20     import java.util.concurrent.locks.Condition;
21     import java.util.concurrent.locks.ReentrantLock;
22 dl 1.1
23     /**
24 dl 1.7 * A {@link ThreadPoolExecutor} that can additionally schedule
25 jsr166 1.75 * commands to run after a given delay, or to execute periodically.
26     * This class is preferable to {@link java.util.Timer} when multiple
27     * worker threads are needed, or when the additional flexibility or
28     * capabilities of {@link ThreadPoolExecutor} (which this class
29     * extends) are required.
30 dl 1.1 *
31 jsr166 1.46 * <p>Delayed tasks execute no sooner than they are enabled, but
32 dl 1.18 * without any real-time guarantees about when, after they are
33     * enabled, they will commence. Tasks scheduled for exactly the same
34     * execution time are enabled in first-in-first-out (FIFO) order of
35 jsr166 1.46 * submission.
36     *
37     * <p>When a submitted task is cancelled before it is run, execution
38 jsr166 1.75 * is suppressed. By default, such a cancelled task is not
39     * automatically removed from the work queue until its delay elapses.
40     * While this enables further inspection and monitoring, it may also
41     * cause unbounded retention of cancelled tasks. To avoid this, use
42     * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately
43     * removed from the work queue at time of cancellation.
44 dl 1.1 *
45 jsr166 1.75 * <p>Successive executions of a periodic task scheduled via
46 jsr166 1.84 * {@link #scheduleAtFixedRate scheduleAtFixedRate} or
47     * {@link #scheduleWithFixedDelay scheduleWithFixedDelay}
48     * do not overlap. While different executions may be performed by
49     * different threads, the effects of prior executions
50     * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
51 dl 1.51 * those of subsequent ones.
52     *
53 jsr166 1.109 * <p>Executions of a periodic task continue until one of the
54     * following happens, when further executions are suppressed and the
55     * task's future is completed:
56     * <ul>
57     * <li>{@link #shutdown} is called and the {@linkplain
58 jsr166 1.108 * #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on
59 jsr166 1.109 * whether to continue after shutdown} is not set true)
60     * <li>{@link #shutdownNow} is called
61     * <li>the task's future is {@linkplain Future#cancel cancelled}
62     * <li>an execution of the task terminates abruptly with an exception;
63     * in this case calling {@link Future#get()} will throw an {@link
64     * ExecutionException} holding the exception as its cause. In all
65     * other cases {@code get()} will throw {@link CancellationException};
66     * a periodic task's future never completes normally.
67     * </ul>
68 jsr166 1.108 *
69 dl 1.1 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
70 dl 1.8 * of the inherited tuning methods are not useful for it. In
71 jsr166 1.53 * particular, because it acts as a fixed-sized pool using
72     * {@code corePoolSize} threads and an unbounded queue, adjustments
73     * to {@code maximumPoolSize} have no useful effect. Additionally, it
74     * is almost never a good idea to set {@code corePoolSize} to zero or
75     * use {@code allowCoreThreadTimeOut} because this may leave the pool
76     * without threads to handle tasks once they become eligible to run.
77 dl 1.1 *
78 jsr166 1.103 * <p>As with {@code ThreadPoolExecutor}, if not otherwise specified,
79     * this class uses {@link Executors#defaultThreadFactory} as the
80     * default thread factory, and {@link ThreadPoolExecutor.AbortPolicy}
81     * as the default rejected execution handler.
82     *
83 jsr166 1.39 * <p><b>Extension notes:</b> This class overrides the
84 jsr166 1.69 * {@link ThreadPoolExecutor#execute(Runnable) execute} and
85 jsr166 1.39 * {@link AbstractExecutorService#submit(Runnable) submit}
86     * methods to generate internal {@link ScheduledFuture} objects to
87     * control per-task delays and scheduling. To preserve
88     * functionality, any further overrides of these methods in
89 dl 1.32 * subclasses must invoke superclass versions, which effectively
90 jsr166 1.39 * disables additional task customization. However, this class
91 dl 1.32 * provides alternative protected extension method
92 jsr166 1.39 * {@code decorateTask} (one version each for {@code Runnable} and
93     * {@code Callable}) that can be used to customize the concrete task
94     * types used to execute commands entered via {@code execute},
95     * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
96     * and {@code scheduleWithFixedDelay}. By default, a
97     * {@code ScheduledThreadPoolExecutor} uses a task type extending
98 dl 1.32 * {@link FutureTask}. However, this may be modified or replaced using
99     * subclasses of the form:
100     *
101 jsr166 1.85 * <pre> {@code
102 dl 1.23 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
103     *
104 jsr166 1.39 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
105 dl 1.23 *
106 jsr166 1.39 * protected <V> RunnableScheduledFuture<V> decorateTask(
107     * Runnable r, RunnableScheduledFuture<V> task) {
108     * return new CustomTask<V>(r, task);
109 jsr166 1.29 * }
110 dl 1.23 *
111 jsr166 1.39 * protected <V> RunnableScheduledFuture<V> decorateTask(
112     * Callable<V> c, RunnableScheduledFuture<V> task) {
113     * return new CustomTask<V>(c, task);
114 jsr166 1.29 * }
115     * // ... add constructors, etc.
116 jsr166 1.39 * }}</pre>
117     *
118 dl 1.1 * @since 1.5
119     * @author Doug Lea
120     */
121 jsr166 1.21 public class ScheduledThreadPoolExecutor
122     extends ThreadPoolExecutor
123 tim 1.3 implements ScheduledExecutorService {
124 dl 1.1
125 dl 1.37 /*
126     * This class specializes ThreadPoolExecutor implementation by
127     *
128 jsr166 1.99 * 1. Using a custom task type ScheduledFutureTask, even for tasks
129     * that don't require scheduling because they are submitted
130     * using ExecutorService rather than ScheduledExecutorService
131     * methods, which are treated as tasks with a delay of zero.
132 dl 1.37 *
133 jsr166 1.46 * 2. Using a custom queue (DelayedWorkQueue), a variant of
134 dl 1.37 * unbounded DelayQueue. The lack of capacity constraint and
135     * the fact that corePoolSize and maximumPoolSize are
136     * effectively identical simplifies some execution mechanics
137 jsr166 1.46 * (see delayedExecute) compared to ThreadPoolExecutor.
138 dl 1.37 *
139     * 3. Supporting optional run-after-shutdown parameters, which
140     * leads to overrides of shutdown methods to remove and cancel
141     * tasks that should NOT be run after shutdown, as well as
142     * different recheck logic when task (re)submission overlaps
143     * with a shutdown.
144     *
145     * 4. Task decoration methods to allow interception and
146     * instrumentation, which are needed because subclasses cannot
147     * otherwise override submit methods to get this effect. These
148     * don't have any impact on pool control logic though.
149     */
150    
151 dl 1.1 /**
152     * False if should cancel/suppress periodic tasks on shutdown.
153     */
154     private volatile boolean continueExistingPeriodicTasksAfterShutdown;
155    
156     /**
157     * False if should cancel non-periodic tasks on shutdown.
158     */
159     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
160    
161     /**
162 jsr166 1.75 * True if ScheduledFutureTask.cancel should remove from queue.
163 dl 1.41 */
164 jsr166 1.90 volatile boolean removeOnCancel;
165 dl 1.41
166     /**
167 dl 1.1 * Sequence number to break scheduling ties, and in turn to
168     * guarantee FIFO order among tied entries.
169     */
170 jsr166 1.60 private static final AtomicLong sequencer = new AtomicLong();
171 dl 1.14
172 jsr166 1.21 private class ScheduledFutureTask<V>
173 peierls 1.22 extends FutureTask<V> implements RunnableScheduledFuture<V> {
174 jsr166 1.21
175 dl 1.1 /** Sequence number to break ties FIFO */
176     private final long sequenceNumber;
177 jsr166 1.44
178 jsr166 1.99 /** The nanoTime-based time when the task is enabled to execute. */
179 dl 1.88 private volatile long time;
180 jsr166 1.44
181 dl 1.16 /**
182 jsr166 1.99 * Period for repeating tasks, in nanoseconds.
183 jsr166 1.75 * A positive value indicates fixed-rate execution.
184     * A negative value indicates fixed-delay execution.
185     * A value of 0 indicates a non-repeating (one-shot) task.
186 dl 1.16 */
187 dl 1.1 private final long period;
188    
189 jsr166 1.48 /** The actual task to be re-enqueued by reExecutePeriodic */
190     RunnableScheduledFuture<V> outerTask = this;
191 jsr166 1.44
192 dl 1.1 /**
193 dl 1.40 * Index into delay queue, to support faster cancellation.
194     */
195     int heapIndex;
196    
197     /**
198 jsr166 1.30 * Creates a one-shot action with given nanoTime-based trigger time.
199 dl 1.1 */
200 jsr166 1.89 ScheduledFutureTask(Runnable r, V result, long triggerTime,
201     long sequenceNumber) {
202 dl 1.1 super(r, result);
203 jsr166 1.75 this.time = triggerTime;
204 dl 1.1 this.period = 0;
205 jsr166 1.89 this.sequenceNumber = sequenceNumber;
206 dl 1.1 }
207    
208     /**
209 jsr166 1.75 * Creates a periodic action with given nanoTime-based initial
210     * trigger time and period.
211 dl 1.1 */
212 jsr166 1.75 ScheduledFutureTask(Runnable r, V result, long triggerTime,
213 jsr166 1.89 long period, long sequenceNumber) {
214 dl 1.1 super(r, result);
215 jsr166 1.75 this.time = triggerTime;
216 dl 1.1 this.period = period;
217 jsr166 1.89 this.sequenceNumber = sequenceNumber;
218 dl 1.1 }
219    
220     /**
221 jsr166 1.65 * Creates a one-shot action with given nanoTime-based trigger time.
222 dl 1.1 */
223 jsr166 1.89 ScheduledFutureTask(Callable<V> callable, long triggerTime,
224     long sequenceNumber) {
225 dl 1.1 super(callable);
226 jsr166 1.75 this.time = triggerTime;
227 dl 1.1 this.period = 0;
228 jsr166 1.89 this.sequenceNumber = sequenceNumber;
229 dl 1.1 }
230    
231     public long getDelay(TimeUnit unit) {
232 jsr166 1.98 return unit.convert(time - System.nanoTime(), NANOSECONDS);
233 dl 1.1 }
234    
235 dl 1.20 public int compareTo(Delayed other) {
236 dl 1.59 if (other == this) // compare zero if same object
237 dl 1.1 return 0;
238 dl 1.34 if (other instanceof ScheduledFutureTask) {
239     ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
240     long diff = time - x.time;
241     if (diff < 0)
242     return -1;
243     else if (diff > 0)
244     return 1;
245     else if (sequenceNumber < x.sequenceNumber)
246     return -1;
247     else
248     return 1;
249     }
250 jsr166 1.64 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
251 jsr166 1.61 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
252 dl 1.1 }
253    
254     /**
255 jsr166 1.70 * Returns {@code true} if this is a periodic (not a one-shot) action.
256 jsr166 1.30 *
257 jsr166 1.70 * @return {@code true} if periodic
258 dl 1.1 */
259 dl 1.23 public boolean isPeriodic() {
260 dl 1.16 return period != 0;
261 dl 1.1 }
262    
263     /**
264 jsr166 1.39 * Sets the next time to run for a periodic task.
265 dl 1.13 */
266 dl 1.37 private void setNextRunTime() {
267     long p = period;
268     if (p > 0)
269     time += p;
270     else
271 jsr166 1.54 time = triggerTime(-p);
272 dl 1.13 }
273    
274 dl 1.40 public boolean cancel(boolean mayInterruptIfRunning) {
275 jsr166 1.100 // The racy read of heapIndex below is benign:
276     // if heapIndex < 0, then OOTA guarantees that we have surely
277     // been removed; else we recheck under lock in remove()
278 dl 1.41 boolean cancelled = super.cancel(mayInterruptIfRunning);
279     if (cancelled && removeOnCancel && heapIndex >= 0)
280 jsr166 1.42 remove(this);
281 dl 1.41 return cancelled;
282 dl 1.40 }
283    
284 dl 1.13 /**
285 dl 1.5 * Overrides FutureTask version so as to reset/requeue if periodic.
286 jsr166 1.21 */
287 dl 1.1 public void run() {
288 jsr166 1.107 if (!canRunInCurrentRunState(this))
289 dl 1.37 cancel(false);
290 jsr166 1.107 else if (!isPeriodic())
291 jsr166 1.91 super.run();
292     else if (super.runAndReset()) {
293 dl 1.37 setNextRunTime();
294 jsr166 1.44 reExecutePeriodic(outerTask);
295 dl 1.37 }
296 dl 1.1 }
297     }
298    
299     /**
300 jsr166 1.107 * Returns true if can run a task given current run state and
301     * run-after-shutdown parameters.
302 dl 1.37 */
303 jsr166 1.107 boolean canRunInCurrentRunState(RunnableScheduledFuture<?> task) {
304     if (!isShutdown())
305     return true;
306     if (isStopped())
307     return false;
308     return task.isPeriodic()
309     ? continueExistingPeriodicTasksAfterShutdown
310     : (executeExistingDelayedTasksAfterShutdown
311     || task.getDelay(NANOSECONDS) <= 0);
312 dl 1.37 }
313    
314     /**
315     * Main execution method for delayed or periodic tasks. If pool
316     * is shut down, rejects the task. Otherwise adds task to queue
317     * and starts a thread, if necessary, to run it. (We cannot
318     * prestart the thread to run the task because the task (probably)
319 jsr166 1.67 * shouldn't be run yet.) If the pool is shut down while the task
320 dl 1.37 * is being added, cancel and remove it if required by state and
321 jsr166 1.39 * run-after-shutdown parameters.
322     *
323 dl 1.37 * @param task the task
324     */
325     private void delayedExecute(RunnableScheduledFuture<?> task) {
326     if (isShutdown())
327     reject(task);
328     else {
329     super.getQueue().add(task);
330 jsr166 1.107 if (!canRunInCurrentRunState(task) && remove(task))
331 dl 1.37 task.cancel(false);
332 jsr166 1.48 else
333 dl 1.63 ensurePrestart();
334 dl 1.37 }
335     }
336 jsr166 1.21
337 dl 1.37 /**
338 jsr166 1.39 * Requeues a periodic task unless current run state precludes it.
339     * Same idea as delayedExecute except drops task rather than rejecting.
340     *
341 dl 1.37 * @param task the task
342     */
343     void reExecutePeriodic(RunnableScheduledFuture<?> task) {
344 jsr166 1.107 if (canRunInCurrentRunState(task)) {
345 dl 1.37 super.getQueue().add(task);
346 jsr166 1.107 if (canRunInCurrentRunState(task) || !remove(task)) {
347 dl 1.63 ensurePrestart();
348 jsr166 1.107 return;
349     }
350 dl 1.37 }
351 jsr166 1.107 task.cancel(false);
352 dl 1.13 }
353 dl 1.1
354 dl 1.13 /**
355 jsr166 1.21 * Cancels and clears the queue of all tasks that should not be run
356 jsr166 1.39 * due to shutdown policy. Invoked within super.shutdown.
357 dl 1.13 */
358 dl 1.37 @Override void onShutdown() {
359     BlockingQueue<Runnable> q = super.getQueue();
360     boolean keepDelayed =
361     getExecuteExistingDelayedTasksAfterShutdownPolicy();
362     boolean keepPeriodic =
363     getContinueExistingPeriodicTasksAfterShutdownPolicy();
364 jsr166 1.107 // Traverse snapshot to avoid iterator exceptions
365     // TODO: implement and use efficient removeIf
366     // super.getQueue().removeIf(...);
367     for (Object e : q.toArray()) {
368     if (e instanceof RunnableScheduledFuture) {
369     RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
370     if ((t.isPeriodic()
371     ? !keepPeriodic
372     : (!keepDelayed && t.getDelay(NANOSECONDS) > 0))
373     || t.isCancelled()) { // also remove if already cancelled
374     if (q.remove(t))
375     t.cancel(false);
376 dl 1.13 }
377     }
378 dl 1.1 }
379 jsr166 1.48 tryTerminate();
380 dl 1.1 }
381    
382 dl 1.23 /**
383 jsr166 1.30 * Modifies or replaces the task used to execute a runnable.
384 jsr166 1.28 * This method can be used to override the concrete
385 dl 1.23 * class used for managing internal tasks.
386 jsr166 1.30 * The default implementation simply returns the given task.
387 jsr166 1.28 *
388 dl 1.23 * @param runnable the submitted Runnable
389     * @param task the task created to execute the runnable
390 jsr166 1.72 * @param <V> the type of the task's result
391 dl 1.23 * @return a task that can execute the runnable
392     * @since 1.6
393     */
394 peierls 1.22 protected <V> RunnableScheduledFuture<V> decorateTask(
395 dl 1.23 Runnable runnable, RunnableScheduledFuture<V> task) {
396     return task;
397 peierls 1.22 }
398    
399 dl 1.23 /**
400 jsr166 1.30 * Modifies or replaces the task used to execute a callable.
401 jsr166 1.28 * This method can be used to override the concrete
402 dl 1.23 * class used for managing internal tasks.
403 jsr166 1.30 * The default implementation simply returns the given task.
404 jsr166 1.28 *
405 dl 1.23 * @param callable the submitted Callable
406     * @param task the task created to execute the callable
407 jsr166 1.72 * @param <V> the type of the task's result
408 dl 1.23 * @return a task that can execute the callable
409     * @since 1.6
410     */
411 peierls 1.22 protected <V> RunnableScheduledFuture<V> decorateTask(
412 dl 1.23 Callable<V> callable, RunnableScheduledFuture<V> task) {
413     return task;
414 dl 1.19 }
415    
416 dl 1.1 /**
417 jsr166 1.78 * The default keep-alive time for pool threads.
418     *
419     * Normally, this value is unused because all pool threads will be
420     * core threads, but if a user creates a pool with a corePoolSize
421     * of zero (against our advice), we keep a thread alive as long as
422     * there are queued tasks. If the keep alive time is zero (the
423     * historic value), we end up hot-spinning in getTask, wasting a
424     * CPU. But on the other hand, if we set the value too high, and
425     * users create a one-shot pool which they don't cleanly shutdown,
426     * the pool's non-daemon threads will prevent JVM termination. A
427     * small but non-zero value (relative to a JVM's lifetime) seems
428     * best.
429     */
430     private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
431    
432     /**
433 jsr166 1.39 * Creates a new {@code ScheduledThreadPoolExecutor} with the
434     * given core pool size.
435 jsr166 1.21 *
436 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
437     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
438     * @throws IllegalArgumentException if {@code corePoolSize < 0}
439 dl 1.1 */
440     public ScheduledThreadPoolExecutor(int corePoolSize) {
441 jsr166 1.78 super(corePoolSize, Integer.MAX_VALUE,
442     DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
443 dl 1.1 new DelayedWorkQueue());
444     }
445    
446     /**
447 jsr166 1.39 * Creates a new {@code ScheduledThreadPoolExecutor} with the
448     * given initial parameters.
449 jsr166 1.21 *
450 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
451     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
452 dl 1.1 * @param threadFactory the factory to use when the executor
453 jsr166 1.39 * creates a new thread
454     * @throws IllegalArgumentException if {@code corePoolSize < 0}
455     * @throws NullPointerException if {@code threadFactory} is null
456 dl 1.1 */
457     public ScheduledThreadPoolExecutor(int corePoolSize,
458 jsr166 1.56 ThreadFactory threadFactory) {
459 jsr166 1.78 super(corePoolSize, Integer.MAX_VALUE,
460     DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
461 dl 1.1 new DelayedWorkQueue(), threadFactory);
462     }
463    
464     /**
465 jsr166 1.79 * Creates a new {@code ScheduledThreadPoolExecutor} with the
466     * given initial parameters.
467 jsr166 1.21 *
468 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
469     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
470 dl 1.1 * @param handler the handler to use when execution is blocked
471 jsr166 1.39 * because the thread bounds and queue capacities are reached
472     * @throws IllegalArgumentException if {@code corePoolSize < 0}
473     * @throws NullPointerException if {@code handler} is null
474 dl 1.1 */
475     public ScheduledThreadPoolExecutor(int corePoolSize,
476 jsr166 1.56 RejectedExecutionHandler handler) {
477 jsr166 1.78 super(corePoolSize, Integer.MAX_VALUE,
478     DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
479 dl 1.1 new DelayedWorkQueue(), handler);
480     }
481    
482     /**
483 jsr166 1.79 * Creates a new {@code ScheduledThreadPoolExecutor} with the
484     * given initial parameters.
485 jsr166 1.21 *
486 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
487     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
488 dl 1.1 * @param threadFactory the factory to use when the executor
489 jsr166 1.39 * creates a new thread
490 dl 1.1 * @param handler the handler to use when execution is blocked
491 jsr166 1.39 * because the thread bounds and queue capacities are reached
492     * @throws IllegalArgumentException if {@code corePoolSize < 0}
493     * @throws NullPointerException if {@code threadFactory} or
494     * {@code handler} is null
495 dl 1.1 */
496     public ScheduledThreadPoolExecutor(int corePoolSize,
497 jsr166 1.56 ThreadFactory threadFactory,
498     RejectedExecutionHandler handler) {
499 jsr166 1.78 super(corePoolSize, Integer.MAX_VALUE,
500     DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
501 dl 1.1 new DelayedWorkQueue(), threadFactory, handler);
502     }
503    
504 dl 1.37 /**
505 jsr166 1.73 * Returns the nanoTime-based trigger time of a delayed action.
506 jsr166 1.54 */
507     private long triggerTime(long delay, TimeUnit unit) {
508     return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
509     }
510    
511     /**
512 jsr166 1.73 * Returns the nanoTime-based trigger time of a delayed action.
513 dl 1.50 */
514 jsr166 1.54 long triggerTime(long delay) {
515 jsr166 1.98 return System.nanoTime() +
516 jsr166 1.54 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
517     }
518    
519     /**
520     * Constrains the values of all delays in the queue to be within
521     * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
522     * This may occur if a task is eligible to be dequeued, but has
523     * not yet been, while some other task is added with a delay of
524     * Long.MAX_VALUE.
525     */
526     private long overflowFree(long delay) {
527     Delayed head = (Delayed) super.getQueue().peek();
528     if (head != null) {
529 jsr166 1.62 long headDelay = head.getDelay(NANOSECONDS);
530 jsr166 1.54 if (headDelay < 0 && (delay - headDelay < 0))
531     delay = Long.MAX_VALUE + headDelay;
532     }
533     return delay;
534 dl 1.50 }
535    
536     /**
537 dl 1.37 * @throws RejectedExecutionException {@inheritDoc}
538     * @throws NullPointerException {@inheritDoc}
539     */
540 jsr166 1.21 public ScheduledFuture<?> schedule(Runnable command,
541     long delay,
542 dl 1.13 TimeUnit unit) {
543 dl 1.9 if (command == null || unit == null)
544 dl 1.1 throw new NullPointerException();
545 jsr166 1.76 RunnableScheduledFuture<Void> t = decorateTask(command,
546 jsr166 1.54 new ScheduledFutureTask<Void>(command, null,
547 jsr166 1.89 triggerTime(delay, unit),
548     sequencer.getAndIncrement()));
549 dl 1.1 delayedExecute(t);
550     return t;
551     }
552 jsr166 1.52
553 dl 1.37 /**
554     * @throws RejectedExecutionException {@inheritDoc}
555     * @throws NullPointerException {@inheritDoc}
556     */
557 jsr166 1.21 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
558     long delay,
559 dl 1.13 TimeUnit unit) {
560 dl 1.9 if (callable == null || unit == null)
561 dl 1.1 throw new NullPointerException();
562 peierls 1.22 RunnableScheduledFuture<V> t = decorateTask(callable,
563 jsr166 1.54 new ScheduledFutureTask<V>(callable,
564 jsr166 1.89 triggerTime(delay, unit),
565     sequencer.getAndIncrement()));
566 dl 1.1 delayedExecute(t);
567     return t;
568     }
569    
570 dl 1.37 /**
571     * @throws RejectedExecutionException {@inheritDoc}
572     * @throws NullPointerException {@inheritDoc}
573     * @throws IllegalArgumentException {@inheritDoc}
574     */
575 jsr166 1.21 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
576     long initialDelay,
577     long period,
578 dl 1.13 TimeUnit unit) {
579 dl 1.9 if (command == null || unit == null)
580 dl 1.1 throw new NullPointerException();
581 jsr166 1.96 if (period <= 0L)
582 dl 1.1 throw new IllegalArgumentException();
583 jsr166 1.48 ScheduledFutureTask<Void> sft =
584     new ScheduledFutureTask<Void>(command,
585     null,
586 jsr166 1.54 triggerTime(initialDelay, unit),
587 jsr166 1.89 unit.toNanos(period),
588     sequencer.getAndIncrement());
589 jsr166 1.44 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
590 jsr166 1.48 sft.outerTask = t;
591 dl 1.1 delayedExecute(t);
592     return t;
593     }
594 jsr166 1.21
595 dl 1.37 /**
596     * @throws RejectedExecutionException {@inheritDoc}
597     * @throws NullPointerException {@inheritDoc}
598     * @throws IllegalArgumentException {@inheritDoc}
599     */
600 jsr166 1.21 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
601     long initialDelay,
602     long delay,
603 dl 1.13 TimeUnit unit) {
604 dl 1.9 if (command == null || unit == null)
605 dl 1.1 throw new NullPointerException();
606 jsr166 1.96 if (delay <= 0L)
607 dl 1.1 throw new IllegalArgumentException();
608 jsr166 1.48 ScheduledFutureTask<Void> sft =
609     new ScheduledFutureTask<Void>(command,
610     null,
611 jsr166 1.54 triggerTime(initialDelay, unit),
612 jsr166 1.97 -unit.toNanos(delay),
613 jsr166 1.89 sequencer.getAndIncrement());
614 jsr166 1.44 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
615 jsr166 1.48 sft.outerTask = t;
616 dl 1.1 delayedExecute(t);
617     return t;
618     }
619 jsr166 1.21
620 dl 1.1 /**
621 jsr166 1.39 * Executes {@code command} with zero required delay.
622     * This has effect equivalent to
623     * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
624     * Note that inspections of the queue and of the list returned by
625     * {@code shutdownNow} will access the zero-delayed
626     * {@link ScheduledFuture}, not the {@code command} itself.
627     *
628     * <p>A consequence of the use of {@code ScheduledFuture} objects is
629     * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
630     * called with a null second {@code Throwable} argument, even if the
631     * {@code command} terminated abruptly. Instead, the {@code Throwable}
632     * thrown by such a task can be obtained via {@link Future#get}.
633 dl 1.1 *
634     * @throws RejectedExecutionException at discretion of
635 jsr166 1.39 * {@code RejectedExecutionHandler}, if the task
636     * cannot be accepted for execution because the
637     * executor has been shut down
638     * @throws NullPointerException {@inheritDoc}
639 dl 1.1 */
640     public void execute(Runnable command) {
641 jsr166 1.62 schedule(command, 0, NANOSECONDS);
642 dl 1.1 }
643    
644 dl 1.13 // Override AbstractExecutorService methods
645    
646 dl 1.37 /**
647     * @throws RejectedExecutionException {@inheritDoc}
648     * @throws NullPointerException {@inheritDoc}
649     */
650 dl 1.7 public Future<?> submit(Runnable task) {
651 jsr166 1.62 return schedule(task, 0, NANOSECONDS);
652 dl 1.7 }
653    
654 dl 1.37 /**
655     * @throws RejectedExecutionException {@inheritDoc}
656     * @throws NullPointerException {@inheritDoc}
657     */
658 dl 1.7 public <T> Future<T> submit(Runnable task, T result) {
659 jsr166 1.62 return schedule(Executors.callable(task, result), 0, NANOSECONDS);
660 dl 1.7 }
661    
662 dl 1.37 /**
663     * @throws RejectedExecutionException {@inheritDoc}
664     * @throws NullPointerException {@inheritDoc}
665     */
666 dl 1.7 public <T> Future<T> submit(Callable<T> task) {
667 jsr166 1.62 return schedule(task, 0, NANOSECONDS);
668 dl 1.7 }
669 dl 1.1
670     /**
671 dl 1.37 * Sets the policy on whether to continue executing existing
672 jsr166 1.39 * periodic tasks even when this executor has been {@code shutdown}.
673     * This value is by default {@code false}.
674 jsr166 1.30 *
675 jsr166 1.108 * <p>If the policy is set to {@code true}, periodic tasks will
676     * continue executing until one of the following happens:
677     *
678     * <ul>
679     * <li>{@link #shutdownNow} is called
680     * <li>the policy is set to {@code false} when already shutdown
681 jsr166 1.109 * <li>the task's future is {@linkplain Future#cancel cancelled}
682 jsr166 1.108 * <li>an execution of the task terminates with an exception
683     * </ul>
684     *
685 jsr166 1.68 * @param value if {@code true}, continue after shutdown, else don't
686 jsr166 1.25 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
687 dl 1.1 */
688     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
689     continueExistingPeriodicTasksAfterShutdown = value;
690 jsr166 1.39 if (!value && isShutdown())
691 dl 1.37 onShutdown();
692 dl 1.1 }
693    
694     /**
695 jsr166 1.21 * Gets the policy on whether to continue executing existing
696 jsr166 1.39 * periodic tasks even when this executor has been {@code shutdown}.
697     * This value is by default {@code false}.
698 jsr166 1.30 *
699 jsr166 1.108 * <p>If the policy is set to {@code true}, periodic tasks will
700     * continue executing until one of the following happens:
701     *
702     * <ul>
703     * <li>{@link #shutdownNow} is called
704     * <li>the policy is set to {@code false} when already shutdown
705 jsr166 1.109 * <li>the task's future is {@linkplain Future#cancel cancelled}
706 jsr166 1.108 * <li>an execution of the task terminates with an exception
707     * </ul>
708     *
709 jsr166 1.39 * @return {@code true} if will continue after shutdown
710 dl 1.16 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
711 dl 1.1 */
712     public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
713     return continueExistingPeriodicTasksAfterShutdown;
714     }
715    
716     /**
717 jsr166 1.21 * Sets the policy on whether to execute existing delayed
718 jsr166 1.39 * tasks even when this executor has been {@code shutdown}.
719     * In this case, these tasks will only terminate upon
720     * {@code shutdownNow}, or after setting the policy to
721     * {@code false} when already shutdown.
722     * This value is by default {@code true}.
723 jsr166 1.30 *
724 jsr166 1.68 * @param value if {@code true}, execute after shutdown, else don't
725 dl 1.16 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
726 dl 1.1 */
727     public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
728     executeExistingDelayedTasksAfterShutdown = value;
729 jsr166 1.39 if (!value && isShutdown())
730 dl 1.37 onShutdown();
731 dl 1.1 }
732    
733     /**
734 jsr166 1.21 * Gets the policy on whether to execute existing delayed
735 jsr166 1.39 * tasks even when this executor has been {@code shutdown}.
736     * In this case, these tasks will only terminate upon
737     * {@code shutdownNow}, or after setting the policy to
738     * {@code false} when already shutdown.
739     * This value is by default {@code true}.
740 jsr166 1.30 *
741 jsr166 1.39 * @return {@code true} if will execute after shutdown
742 dl 1.16 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
743 dl 1.1 */
744     public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
745     return executeExistingDelayedTasksAfterShutdown;
746     }
747    
748     /**
749 jsr166 1.46 * Sets the policy on whether cancelled tasks should be immediately
750     * removed from the work queue at time of cancellation. This value is
751     * by default {@code false}.
752 dl 1.41 *
753 jsr166 1.42 * @param value if {@code true}, remove on cancellation, else don't
754 dl 1.41 * @see #getRemoveOnCancelPolicy
755 jsr166 1.43 * @since 1.7
756 dl 1.41 */
757     public void setRemoveOnCancelPolicy(boolean value) {
758     removeOnCancel = value;
759     }
760    
761     /**
762 jsr166 1.46 * Gets the policy on whether cancelled tasks should be immediately
763     * removed from the work queue at time of cancellation. This value is
764     * by default {@code false}.
765 dl 1.41 *
766 jsr166 1.46 * @return {@code true} if cancelled tasks are immediately removed
767     * from the queue
768 dl 1.41 * @see #setRemoveOnCancelPolicy
769 jsr166 1.43 * @since 1.7
770 dl 1.41 */
771     public boolean getRemoveOnCancelPolicy() {
772     return removeOnCancel;
773     }
774    
775     /**
776 dl 1.1 * Initiates an orderly shutdown in which previously submitted
777 jsr166 1.49 * tasks are executed, but no new tasks will be accepted.
778     * Invocation has no additional effect if already shut down.
779     *
780     * <p>This method does not wait for previously submitted tasks to
781     * complete execution. Use {@link #awaitTermination awaitTermination}
782     * to do that.
783     *
784     * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
785     * has been set {@code false}, existing delayed tasks whose delays
786     * have not yet elapsed are cancelled. And unless the {@code
787     * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
788     * {@code true}, future executions of existing periodic tasks will
789     * be cancelled.
790 jsr166 1.39 *
791     * @throws SecurityException {@inheritDoc}
792 dl 1.1 */
793     public void shutdown() {
794     super.shutdown();
795     }
796    
797     /**
798     * Attempts to stop all actively executing tasks, halts the
799 jsr166 1.30 * processing of waiting tasks, and returns a list of the tasks
800 jsr166 1.92 * that were awaiting execution. These tasks are drained (removed)
801     * from the task queue upon return from this method.
802 jsr166 1.21 *
803 jsr166 1.49 * <p>This method does not wait for actively executing tasks to
804     * terminate. Use {@link #awaitTermination awaitTermination} to
805     * do that.
806     *
807 dl 1.1 * <p>There are no guarantees beyond best-effort attempts to stop
808 dl 1.18 * processing actively executing tasks. This implementation
809 jsr166 1.93 * interrupts tasks via {@link Thread#interrupt}; any task that
810 jsr166 1.31 * fails to respond to interrupts may never terminate.
811 dl 1.1 *
812 jsr166 1.39 * @return list of tasks that never commenced execution.
813 jsr166 1.77 * Each element of this list is a {@link ScheduledFuture}.
814     * For tasks submitted via one of the {@code schedule}
815     * methods, the element will be identical to the returned
816     * {@code ScheduledFuture}. For tasks submitted using
817 jsr166 1.84 * {@link #execute execute}, the element will be a
818     * zero-delay {@code ScheduledFuture}.
819 jsr166 1.31 * @throws SecurityException {@inheritDoc}
820 dl 1.1 */
821 tim 1.4 public List<Runnable> shutdownNow() {
822 dl 1.1 return super.shutdownNow();
823     }
824    
825     /**
826 jsr166 1.95 * Returns the task queue used by this executor. Access to the
827     * task queue is intended primarily for debugging and monitoring.
828     * This queue may be in active use. Retrieving the task queue
829     * does not prevent queued tasks from executing.
830     *
831     * <p>Each element of this queue is a {@link ScheduledFuture}.
832 jsr166 1.77 * For tasks submitted via one of the {@code schedule} methods, the
833     * element will be identical to the returned {@code ScheduledFuture}.
834 jsr166 1.84 * For tasks submitted using {@link #execute execute}, the element
835     * will be a zero-delay {@code ScheduledFuture}.
836 jsr166 1.77 *
837     * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
838     * tasks in the order in which they will execute.
839 dl 1.1 *
840     * @return the task queue
841     */
842     public BlockingQueue<Runnable> getQueue() {
843     return super.getQueue();
844     }
845    
846 dl 1.13 /**
847 dl 1.40 * Specialized delay queue. To mesh with TPE declarations, this
848     * class must be declared as a BlockingQueue<Runnable> even though
849 jsr166 1.42 * it can only hold RunnableScheduledFutures.
850 jsr166 1.21 */
851 dl 1.40 static class DelayedWorkQueue extends AbstractQueue<Runnable>
852 dl 1.13 implements BlockingQueue<Runnable> {
853 jsr166 1.21
854 dl 1.40 /*
855     * A DelayedWorkQueue is based on a heap-based data structure
856     * like those in DelayQueue and PriorityQueue, except that
857     * every ScheduledFutureTask also records its index into the
858     * heap array. This eliminates the need to find a task upon
859     * cancellation, greatly speeding up removal (down from O(n)
860     * to O(log n)), and reducing garbage retention that would
861     * otherwise occur by waiting for the element to rise to top
862     * before clearing. But because the queue may also hold
863     * RunnableScheduledFutures that are not ScheduledFutureTasks,
864     * we are not guaranteed to have such indices available, in
865     * which case we fall back to linear search. (We expect that
866     * most tasks will not be decorated, and that the faster cases
867     * will be much more common.)
868     *
869     * All heap operations must record index changes -- mainly
870     * within siftUp and siftDown. Upon removal, a task's
871     * heapIndex is set to -1. Note that ScheduledFutureTasks can
872     * appear at most once in the queue (this need not be true for
873     * other kinds of tasks or work queues), so are uniquely
874     * identified by heapIndex.
875     */
876    
877 jsr166 1.46 private static final int INITIAL_CAPACITY = 16;
878 jsr166 1.61 private RunnableScheduledFuture<?>[] queue =
879     new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
880 jsr166 1.46 private final ReentrantLock lock = new ReentrantLock();
881 jsr166 1.80 private int size;
882 dl 1.40
883 jsr166 1.48 /**
884     * Thread designated to wait for the task at the head of the
885     * queue. This variant of the Leader-Follower pattern
886     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
887     * minimize unnecessary timed waiting. When a thread becomes
888     * the leader, it waits only for the next delay to elapse, but
889     * other threads await indefinitely. The leader thread must
890     * signal some other thread before returning from take() or
891     * poll(...), unless some other thread becomes leader in the
892     * interim. Whenever the head of the queue is replaced with a
893     * task with an earlier expiration time, the leader field is
894     * invalidated by being reset to null, and some waiting
895     * thread, but not necessarily the current leader, is
896     * signalled. So waiting threads must be prepared to acquire
897     * and lose leadership while waiting.
898     */
899 jsr166 1.80 private Thread leader;
900 jsr166 1.48
901     /**
902     * Condition signalled when a newer task becomes available at the
903     * head of the queue or a new thread may need to become leader.
904     */
905     private final Condition available = lock.newCondition();
906 dl 1.40
907     /**
908 jsr166 1.66 * Sets f's heapIndex if it is a ScheduledFutureTask.
909 dl 1.40 */
910 jsr166 1.102 private static void setIndex(RunnableScheduledFuture<?> f, int idx) {
911 dl 1.40 if (f instanceof ScheduledFutureTask)
912     ((ScheduledFutureTask)f).heapIndex = idx;
913     }
914    
915     /**
916 jsr166 1.66 * Sifts element added at bottom up to its heap-ordered spot.
917 dl 1.40 * Call only when holding lock.
918     */
919 jsr166 1.61 private void siftUp(int k, RunnableScheduledFuture<?> key) {
920 dl 1.40 while (k > 0) {
921     int parent = (k - 1) >>> 1;
922 jsr166 1.61 RunnableScheduledFuture<?> e = queue[parent];
923 dl 1.40 if (key.compareTo(e) >= 0)
924     break;
925     queue[k] = e;
926     setIndex(e, k);
927     k = parent;
928     }
929     queue[k] = key;
930     setIndex(key, k);
931     }
932    
933     /**
934 jsr166 1.66 * Sifts element added at top down to its heap-ordered spot.
935 dl 1.40 * Call only when holding lock.
936     */
937 jsr166 1.61 private void siftDown(int k, RunnableScheduledFuture<?> key) {
938 jsr166 1.42 int half = size >>> 1;
939 dl 1.40 while (k < half) {
940 jsr166 1.42 int child = (k << 1) + 1;
941 jsr166 1.61 RunnableScheduledFuture<?> c = queue[child];
942 dl 1.40 int right = child + 1;
943     if (right < size && c.compareTo(queue[right]) > 0)
944     c = queue[child = right];
945     if (key.compareTo(c) <= 0)
946     break;
947     queue[k] = c;
948     setIndex(c, k);
949     k = child;
950     }
951     queue[k] = key;
952     setIndex(key, k);
953     }
954    
955     /**
956 jsr166 1.66 * Resizes the heap array. Call only when holding lock.
957 dl 1.40 */
958     private void grow() {
959     int oldCapacity = queue.length;
960     int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
961     if (newCapacity < 0) // overflow
962     newCapacity = Integer.MAX_VALUE;
963     queue = Arrays.copyOf(queue, newCapacity);
964     }
965    
966     /**
967 jsr166 1.66 * Finds index of given object, or -1 if absent.
968 dl 1.40 */
969     private int indexOf(Object x) {
970     if (x != null) {
971 jsr166 1.48 if (x instanceof ScheduledFutureTask) {
972     int i = ((ScheduledFutureTask) x).heapIndex;
973     // Sanity check; x could conceivably be a
974     // ScheduledFutureTask from some other pool.
975     if (i >= 0 && i < size && queue[i] == x)
976     return i;
977     } else {
978     for (int i = 0; i < size; i++)
979     if (x.equals(queue[i]))
980     return i;
981     }
982 dl 1.40 }
983     return -1;
984     }
985    
986 jsr166 1.48 public boolean contains(Object x) {
987     final ReentrantLock lock = this.lock;
988 jsr166 1.45 lock.lock();
989     try {
990 jsr166 1.48 return indexOf(x) != -1;
991 jsr166 1.45 } finally {
992     lock.unlock();
993     }
994 jsr166 1.48 }
995 jsr166 1.45
996 dl 1.40 public boolean remove(Object x) {
997     final ReentrantLock lock = this.lock;
998     lock.lock();
999     try {
1000 jsr166 1.45 int i = indexOf(x);
1001 jsr166 1.48 if (i < 0)
1002     return false;
1003 jsr166 1.45
1004 jsr166 1.48 setIndex(queue[i], -1);
1005     int s = --size;
1006 jsr166 1.61 RunnableScheduledFuture<?> replacement = queue[s];
1007 jsr166 1.48 queue[s] = null;
1008     if (s != i) {
1009     siftDown(i, replacement);
1010     if (queue[i] == replacement)
1011     siftUp(i, replacement);
1012     }
1013     return true;
1014 dl 1.40 } finally {
1015     lock.unlock();
1016     }
1017     }
1018    
1019     public int size() {
1020     final ReentrantLock lock = this.lock;
1021     lock.lock();
1022     try {
1023 jsr166 1.45 return size;
1024 dl 1.40 } finally {
1025     lock.unlock();
1026     }
1027     }
1028    
1029 jsr166 1.42 public boolean isEmpty() {
1030     return size() == 0;
1031 dl 1.40 }
1032    
1033     public int remainingCapacity() {
1034     return Integer.MAX_VALUE;
1035     }
1036    
1037 jsr166 1.61 public RunnableScheduledFuture<?> peek() {
1038 dl 1.40 final ReentrantLock lock = this.lock;
1039     lock.lock();
1040     try {
1041     return queue[0];
1042     } finally {
1043     lock.unlock();
1044     }
1045 dl 1.13 }
1046    
1047 dl 1.40 public boolean offer(Runnable x) {
1048     if (x == null)
1049     throw new NullPointerException();
1050 jsr166 1.61 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
1051 dl 1.40 final ReentrantLock lock = this.lock;
1052     lock.lock();
1053     try {
1054     int i = size;
1055     if (i >= queue.length)
1056     grow();
1057     size = i + 1;
1058     if (i == 0) {
1059     queue[0] = e;
1060     setIndex(e, 0);
1061 jsr166 1.45 } else {
1062 dl 1.40 siftUp(i, e);
1063     }
1064 jsr166 1.46 if (queue[0] == e) {
1065 jsr166 1.48 leader = null;
1066 jsr166 1.46 available.signal();
1067 jsr166 1.48 }
1068 dl 1.40 } finally {
1069     lock.unlock();
1070     }
1071     return true;
1072 jsr166 1.48 }
1073 dl 1.40
1074     public void put(Runnable e) {
1075     offer(e);
1076     }
1077    
1078     public boolean add(Runnable e) {
1079 jsr166 1.48 return offer(e);
1080     }
1081 dl 1.40
1082     public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1083     return offer(e);
1084     }
1085 jsr166 1.42
1086 jsr166 1.46 /**
1087     * Performs common bookkeeping for poll and take: Replaces
1088 jsr166 1.47 * first element with last and sifts it down. Call only when
1089     * holding lock.
1090 jsr166 1.46 * @param f the task to remove and return
1091     */
1092 jsr166 1.61 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1093 jsr166 1.46 int s = --size;
1094 jsr166 1.61 RunnableScheduledFuture<?> x = queue[s];
1095 jsr166 1.46 queue[s] = null;
1096     if (s != 0)
1097     siftDown(0, x);
1098     setIndex(f, -1);
1099     return f;
1100     }
1101    
1102 jsr166 1.61 public RunnableScheduledFuture<?> poll() {
1103 dl 1.40 final ReentrantLock lock = this.lock;
1104     lock.lock();
1105     try {
1106 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1107 jsr166 1.87 return (first == null || first.getDelay(NANOSECONDS) > 0)
1108     ? null
1109     : finishPoll(first);
1110 dl 1.40 } finally {
1111     lock.unlock();
1112     }
1113     }
1114    
1115 jsr166 1.61 public RunnableScheduledFuture<?> take() throws InterruptedException {
1116 dl 1.40 final ReentrantLock lock = this.lock;
1117     lock.lockInterruptibly();
1118     try {
1119     for (;;) {
1120 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1121 jsr166 1.42 if (first == null)
1122 dl 1.40 available.await();
1123     else {
1124 jsr166 1.62 long delay = first.getDelay(NANOSECONDS);
1125 jsr166 1.96 if (delay <= 0L)
1126 jsr166 1.48 return finishPoll(first);
1127 jsr166 1.71 first = null; // don't retain ref while waiting
1128     if (leader != null)
1129 jsr166 1.48 available.await();
1130     else {
1131     Thread thisThread = Thread.currentThread();
1132     leader = thisThread;
1133     try {
1134     available.awaitNanos(delay);
1135     } finally {
1136     if (leader == thisThread)
1137     leader = null;
1138     }
1139     }
1140 dl 1.40 }
1141     }
1142     } finally {
1143 jsr166 1.48 if (leader == null && queue[0] != null)
1144     available.signal();
1145 dl 1.40 lock.unlock();
1146     }
1147     }
1148    
1149 jsr166 1.61 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1150 dl 1.40 throws InterruptedException {
1151     long nanos = unit.toNanos(timeout);
1152     final ReentrantLock lock = this.lock;
1153     lock.lockInterruptibly();
1154     try {
1155     for (;;) {
1156 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1157 dl 1.40 if (first == null) {
1158 jsr166 1.96 if (nanos <= 0L)
1159 dl 1.40 return null;
1160     else
1161     nanos = available.awaitNanos(nanos);
1162     } else {
1163 jsr166 1.62 long delay = first.getDelay(NANOSECONDS);
1164 jsr166 1.96 if (delay <= 0L)
1165 dl 1.40 return finishPoll(first);
1166 jsr166 1.96 if (nanos <= 0L)
1167 jsr166 1.48 return null;
1168 jsr166 1.71 first = null; // don't retain ref while waiting
1169 jsr166 1.48 if (nanos < delay || leader != null)
1170     nanos = available.awaitNanos(nanos);
1171     else {
1172     Thread thisThread = Thread.currentThread();
1173     leader = thisThread;
1174     try {
1175     long timeLeft = available.awaitNanos(delay);
1176     nanos -= delay - timeLeft;
1177     } finally {
1178     if (leader == thisThread)
1179     leader = null;
1180     }
1181     }
1182     }
1183     }
1184 dl 1.40 } finally {
1185 jsr166 1.48 if (leader == null && queue[0] != null)
1186     available.signal();
1187 dl 1.40 lock.unlock();
1188     }
1189     }
1190    
1191     public void clear() {
1192     final ReentrantLock lock = this.lock;
1193     lock.lock();
1194     try {
1195     for (int i = 0; i < size; i++) {
1196 jsr166 1.61 RunnableScheduledFuture<?> t = queue[i];
1197 dl 1.40 if (t != null) {
1198     queue[i] = null;
1199     setIndex(t, -1);
1200     }
1201     }
1202     size = 0;
1203     } finally {
1204     lock.unlock();
1205     }
1206 dl 1.13 }
1207 dl 1.40
1208     public int drainTo(Collection<? super Runnable> c) {
1209 jsr166 1.104 return drainTo(c, Integer.MAX_VALUE);
1210 dl 1.13 }
1211    
1212 jsr166 1.21 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1213 jsr166 1.106 Objects.requireNonNull(c);
1214 dl 1.40 if (c == this)
1215     throw new IllegalArgumentException();
1216     if (maxElements <= 0)
1217     return 0;
1218     final ReentrantLock lock = this.lock;
1219     lock.lock();
1220     try {
1221     int n = 0;
1222 jsr166 1.106 for (RunnableScheduledFuture<?> first;
1223     n < maxElements
1224     && (first = queue[0]) != null
1225     && first.getDelay(NANOSECONDS) <= 0;) {
1226 jsr166 1.62 c.add(first); // In this order, in case add() throws.
1227     finishPoll(first);
1228 jsr166 1.48 ++n;
1229     }
1230 dl 1.40 return n;
1231     } finally {
1232     lock.unlock();
1233     }
1234     }
1235    
1236     public Object[] toArray() {
1237     final ReentrantLock lock = this.lock;
1238     lock.lock();
1239     try {
1240 jsr166 1.45 return Arrays.copyOf(queue, size, Object[].class);
1241 dl 1.40 } finally {
1242     lock.unlock();
1243     }
1244     }
1245    
1246 jsr166 1.48 @SuppressWarnings("unchecked")
1247 dl 1.40 public <T> T[] toArray(T[] a) {
1248     final ReentrantLock lock = this.lock;
1249     lock.lock();
1250     try {
1251     if (a.length < size)
1252     return (T[]) Arrays.copyOf(queue, size, a.getClass());
1253     System.arraycopy(queue, 0, a, 0, size);
1254     if (a.length > size)
1255     a[size] = null;
1256     return a;
1257     } finally {
1258     lock.unlock();
1259     }
1260 dl 1.13 }
1261    
1262 jsr166 1.21 public Iterator<Runnable> iterator() {
1263 jsr166 1.105 final ReentrantLock lock = this.lock;
1264     lock.lock();
1265     try {
1266     return new Itr(Arrays.copyOf(queue, size));
1267     } finally {
1268     lock.unlock();
1269     }
1270 dl 1.40 }
1271 jsr166 1.42
1272 dl 1.40 /**
1273     * Snapshot iterator that works off copy of underlying q array.
1274     */
1275     private class Itr implements Iterator<Runnable> {
1276 jsr166 1.74 final RunnableScheduledFuture<?>[] array;
1277 jsr166 1.86 int cursor; // index of next element to return; initially 0
1278     int lastRet = -1; // index of last element returned; -1 if no such
1279 jsr166 1.42
1280 jsr166 1.74 Itr(RunnableScheduledFuture<?>[] array) {
1281 dl 1.40 this.array = array;
1282     }
1283 jsr166 1.42
1284 dl 1.40 public boolean hasNext() {
1285     return cursor < array.length;
1286     }
1287 jsr166 1.42
1288 dl 1.40 public Runnable next() {
1289     if (cursor >= array.length)
1290     throw new NoSuchElementException();
1291 jsr166 1.101 return array[lastRet = cursor++];
1292 dl 1.40 }
1293 jsr166 1.42
1294 dl 1.40 public void remove() {
1295     if (lastRet < 0)
1296     throw new IllegalStateException();
1297     DelayedWorkQueue.this.remove(array[lastRet]);
1298     lastRet = -1;
1299     }
1300 dl 1.13 }
1301     }
1302 dl 1.1 }