ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.111
Committed: Thu Mar 30 20:38:51 2017 UTC (7 years, 2 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.110: +55 -36 lines
Log Message:
rework the spec for periodic task execution after shutdown

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.11 * Expert Group and released to the public domain, as explained at
4 jsr166 1.58 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.1 */
6    
7     package java.util.concurrent;
8 jsr166 1.82
9 jsr166 1.83 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 jsr166 1.62 import static java.util.concurrent.TimeUnit.NANOSECONDS;
11 jsr166 1.83
12 jsr166 1.82 import java.util.AbstractQueue;
13     import java.util.Arrays;
14     import java.util.Collection;
15     import java.util.Iterator;
16     import java.util.List;
17     import java.util.NoSuchElementException;
18 jsr166 1.106 import java.util.Objects;
19 jsr166 1.83 import java.util.concurrent.atomic.AtomicLong;
20     import java.util.concurrent.locks.Condition;
21     import java.util.concurrent.locks.ReentrantLock;
22 dl 1.1
23     /**
24 dl 1.7 * A {@link ThreadPoolExecutor} that can additionally schedule
25 jsr166 1.75 * commands to run after a given delay, or to execute periodically.
26     * This class is preferable to {@link java.util.Timer} when multiple
27     * worker threads are needed, or when the additional flexibility or
28     * capabilities of {@link ThreadPoolExecutor} (which this class
29     * extends) are required.
30 dl 1.1 *
31 jsr166 1.46 * <p>Delayed tasks execute no sooner than they are enabled, but
32 dl 1.18 * without any real-time guarantees about when, after they are
33     * enabled, they will commence. Tasks scheduled for exactly the same
34     * execution time are enabled in first-in-first-out (FIFO) order of
35 jsr166 1.46 * submission.
36     *
37     * <p>When a submitted task is cancelled before it is run, execution
38 jsr166 1.75 * is suppressed. By default, such a cancelled task is not
39     * automatically removed from the work queue until its delay elapses.
40     * While this enables further inspection and monitoring, it may also
41     * cause unbounded retention of cancelled tasks. To avoid this, use
42     * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately
43     * removed from the work queue at time of cancellation.
44 dl 1.1 *
45 jsr166 1.75 * <p>Successive executions of a periodic task scheduled via
46 jsr166 1.84 * {@link #scheduleAtFixedRate scheduleAtFixedRate} or
47     * {@link #scheduleWithFixedDelay scheduleWithFixedDelay}
48     * do not overlap. While different executions may be performed by
49     * different threads, the effects of prior executions
50     * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
51 dl 1.51 * those of subsequent ones.
52     *
53 dl 1.1 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
54 dl 1.8 * of the inherited tuning methods are not useful for it. In
55 jsr166 1.53 * particular, because it acts as a fixed-sized pool using
56     * {@code corePoolSize} threads and an unbounded queue, adjustments
57     * to {@code maximumPoolSize} have no useful effect. Additionally, it
58     * is almost never a good idea to set {@code corePoolSize} to zero or
59     * use {@code allowCoreThreadTimeOut} because this may leave the pool
60     * without threads to handle tasks once they become eligible to run.
61 dl 1.1 *
62 jsr166 1.103 * <p>As with {@code ThreadPoolExecutor}, if not otherwise specified,
63     * this class uses {@link Executors#defaultThreadFactory} as the
64     * default thread factory, and {@link ThreadPoolExecutor.AbortPolicy}
65     * as the default rejected execution handler.
66     *
67 jsr166 1.39 * <p><b>Extension notes:</b> This class overrides the
68 jsr166 1.69 * {@link ThreadPoolExecutor#execute(Runnable) execute} and
69 jsr166 1.39 * {@link AbstractExecutorService#submit(Runnable) submit}
70     * methods to generate internal {@link ScheduledFuture} objects to
71     * control per-task delays and scheduling. To preserve
72     * functionality, any further overrides of these methods in
73 dl 1.32 * subclasses must invoke superclass versions, which effectively
74 jsr166 1.39 * disables additional task customization. However, this class
75 dl 1.32 * provides alternative protected extension method
76 jsr166 1.39 * {@code decorateTask} (one version each for {@code Runnable} and
77     * {@code Callable}) that can be used to customize the concrete task
78     * types used to execute commands entered via {@code execute},
79     * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
80     * and {@code scheduleWithFixedDelay}. By default, a
81     * {@code ScheduledThreadPoolExecutor} uses a task type extending
82 dl 1.32 * {@link FutureTask}. However, this may be modified or replaced using
83     * subclasses of the form:
84     *
85 jsr166 1.85 * <pre> {@code
86 dl 1.23 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
87     *
88 jsr166 1.39 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
89 dl 1.23 *
90 jsr166 1.39 * protected <V> RunnableScheduledFuture<V> decorateTask(
91     * Runnable r, RunnableScheduledFuture<V> task) {
92     * return new CustomTask<V>(r, task);
93 jsr166 1.29 * }
94 dl 1.23 *
95 jsr166 1.39 * protected <V> RunnableScheduledFuture<V> decorateTask(
96     * Callable<V> c, RunnableScheduledFuture<V> task) {
97     * return new CustomTask<V>(c, task);
98 jsr166 1.29 * }
99     * // ... add constructors, etc.
100 jsr166 1.39 * }}</pre>
101     *
102 dl 1.1 * @since 1.5
103     * @author Doug Lea
104     */
105 jsr166 1.21 public class ScheduledThreadPoolExecutor
106     extends ThreadPoolExecutor
107 tim 1.3 implements ScheduledExecutorService {
108 dl 1.1
109 dl 1.37 /*
110     * This class specializes ThreadPoolExecutor implementation by
111     *
112 jsr166 1.99 * 1. Using a custom task type ScheduledFutureTask, even for tasks
113     * that don't require scheduling because they are submitted
114     * using ExecutorService rather than ScheduledExecutorService
115     * methods, which are treated as tasks with a delay of zero.
116 dl 1.37 *
117 jsr166 1.46 * 2. Using a custom queue (DelayedWorkQueue), a variant of
118 dl 1.37 * unbounded DelayQueue. The lack of capacity constraint and
119     * the fact that corePoolSize and maximumPoolSize are
120     * effectively identical simplifies some execution mechanics
121 jsr166 1.46 * (see delayedExecute) compared to ThreadPoolExecutor.
122 dl 1.37 *
123     * 3. Supporting optional run-after-shutdown parameters, which
124     * leads to overrides of shutdown methods to remove and cancel
125     * tasks that should NOT be run after shutdown, as well as
126     * different recheck logic when task (re)submission overlaps
127     * with a shutdown.
128     *
129     * 4. Task decoration methods to allow interception and
130     * instrumentation, which are needed because subclasses cannot
131     * otherwise override submit methods to get this effect. These
132     * don't have any impact on pool control logic though.
133     */
134    
135 dl 1.1 /**
136     * False if should cancel/suppress periodic tasks on shutdown.
137     */
138     private volatile boolean continueExistingPeriodicTasksAfterShutdown;
139    
140     /**
141 jsr166 1.110 * False if should cancel non-periodic not-yet-expired tasks on shutdown.
142 dl 1.1 */
143     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
144    
145     /**
146 jsr166 1.75 * True if ScheduledFutureTask.cancel should remove from queue.
147 dl 1.41 */
148 jsr166 1.90 volatile boolean removeOnCancel;
149 dl 1.41
150     /**
151 dl 1.1 * Sequence number to break scheduling ties, and in turn to
152     * guarantee FIFO order among tied entries.
153     */
154 jsr166 1.60 private static final AtomicLong sequencer = new AtomicLong();
155 dl 1.14
156 jsr166 1.21 private class ScheduledFutureTask<V>
157 peierls 1.22 extends FutureTask<V> implements RunnableScheduledFuture<V> {
158 jsr166 1.21
159 dl 1.1 /** Sequence number to break ties FIFO */
160     private final long sequenceNumber;
161 jsr166 1.44
162 jsr166 1.99 /** The nanoTime-based time when the task is enabled to execute. */
163 dl 1.88 private volatile long time;
164 jsr166 1.44
165 dl 1.16 /**
166 jsr166 1.99 * Period for repeating tasks, in nanoseconds.
167 jsr166 1.75 * A positive value indicates fixed-rate execution.
168     * A negative value indicates fixed-delay execution.
169     * A value of 0 indicates a non-repeating (one-shot) task.
170 dl 1.16 */
171 dl 1.1 private final long period;
172    
173 jsr166 1.48 /** The actual task to be re-enqueued by reExecutePeriodic */
174     RunnableScheduledFuture<V> outerTask = this;
175 jsr166 1.44
176 dl 1.1 /**
177 dl 1.40 * Index into delay queue, to support faster cancellation.
178     */
179     int heapIndex;
180    
181     /**
182 jsr166 1.30 * Creates a one-shot action with given nanoTime-based trigger time.
183 dl 1.1 */
184 jsr166 1.89 ScheduledFutureTask(Runnable r, V result, long triggerTime,
185     long sequenceNumber) {
186 dl 1.1 super(r, result);
187 jsr166 1.75 this.time = triggerTime;
188 dl 1.1 this.period = 0;
189 jsr166 1.89 this.sequenceNumber = sequenceNumber;
190 dl 1.1 }
191    
192     /**
193 jsr166 1.75 * Creates a periodic action with given nanoTime-based initial
194     * trigger time and period.
195 dl 1.1 */
196 jsr166 1.75 ScheduledFutureTask(Runnable r, V result, long triggerTime,
197 jsr166 1.89 long period, long sequenceNumber) {
198 dl 1.1 super(r, result);
199 jsr166 1.75 this.time = triggerTime;
200 dl 1.1 this.period = period;
201 jsr166 1.89 this.sequenceNumber = sequenceNumber;
202 dl 1.1 }
203    
204     /**
205 jsr166 1.65 * Creates a one-shot action with given nanoTime-based trigger time.
206 dl 1.1 */
207 jsr166 1.89 ScheduledFutureTask(Callable<V> callable, long triggerTime,
208     long sequenceNumber) {
209 dl 1.1 super(callable);
210 jsr166 1.75 this.time = triggerTime;
211 dl 1.1 this.period = 0;
212 jsr166 1.89 this.sequenceNumber = sequenceNumber;
213 dl 1.1 }
214    
215     public long getDelay(TimeUnit unit) {
216 jsr166 1.98 return unit.convert(time - System.nanoTime(), NANOSECONDS);
217 dl 1.1 }
218    
219 dl 1.20 public int compareTo(Delayed other) {
220 dl 1.59 if (other == this) // compare zero if same object
221 dl 1.1 return 0;
222 dl 1.34 if (other instanceof ScheduledFutureTask) {
223     ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
224     long diff = time - x.time;
225     if (diff < 0)
226     return -1;
227     else if (diff > 0)
228     return 1;
229     else if (sequenceNumber < x.sequenceNumber)
230     return -1;
231     else
232     return 1;
233     }
234 jsr166 1.64 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
235 jsr166 1.61 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
236 dl 1.1 }
237    
238     /**
239 jsr166 1.70 * Returns {@code true} if this is a periodic (not a one-shot) action.
240 jsr166 1.30 *
241 jsr166 1.70 * @return {@code true} if periodic
242 dl 1.1 */
243 dl 1.23 public boolean isPeriodic() {
244 dl 1.16 return period != 0;
245 dl 1.1 }
246    
247     /**
248 jsr166 1.39 * Sets the next time to run for a periodic task.
249 dl 1.13 */
250 dl 1.37 private void setNextRunTime() {
251     long p = period;
252     if (p > 0)
253     time += p;
254     else
255 jsr166 1.54 time = triggerTime(-p);
256 dl 1.13 }
257    
258 dl 1.40 public boolean cancel(boolean mayInterruptIfRunning) {
259 jsr166 1.100 // The racy read of heapIndex below is benign:
260     // if heapIndex < 0, then OOTA guarantees that we have surely
261     // been removed; else we recheck under lock in remove()
262 dl 1.41 boolean cancelled = super.cancel(mayInterruptIfRunning);
263     if (cancelled && removeOnCancel && heapIndex >= 0)
264 jsr166 1.42 remove(this);
265 dl 1.41 return cancelled;
266 dl 1.40 }
267    
268 dl 1.13 /**
269 dl 1.5 * Overrides FutureTask version so as to reset/requeue if periodic.
270 jsr166 1.21 */
271 dl 1.1 public void run() {
272 jsr166 1.107 if (!canRunInCurrentRunState(this))
273 dl 1.37 cancel(false);
274 jsr166 1.107 else if (!isPeriodic())
275 jsr166 1.91 super.run();
276     else if (super.runAndReset()) {
277 dl 1.37 setNextRunTime();
278 jsr166 1.44 reExecutePeriodic(outerTask);
279 dl 1.37 }
280 dl 1.1 }
281     }
282    
283     /**
284 jsr166 1.107 * Returns true if can run a task given current run state and
285     * run-after-shutdown parameters.
286 dl 1.37 */
287 jsr166 1.107 boolean canRunInCurrentRunState(RunnableScheduledFuture<?> task) {
288     if (!isShutdown())
289     return true;
290     if (isStopped())
291     return false;
292     return task.isPeriodic()
293     ? continueExistingPeriodicTasksAfterShutdown
294     : (executeExistingDelayedTasksAfterShutdown
295     || task.getDelay(NANOSECONDS) <= 0);
296 dl 1.37 }
297    
298     /**
299     * Main execution method for delayed or periodic tasks. If pool
300     * is shut down, rejects the task. Otherwise adds task to queue
301     * and starts a thread, if necessary, to run it. (We cannot
302     * prestart the thread to run the task because the task (probably)
303 jsr166 1.67 * shouldn't be run yet.) If the pool is shut down while the task
304 dl 1.37 * is being added, cancel and remove it if required by state and
305 jsr166 1.39 * run-after-shutdown parameters.
306     *
307 dl 1.37 * @param task the task
308     */
309     private void delayedExecute(RunnableScheduledFuture<?> task) {
310     if (isShutdown())
311     reject(task);
312     else {
313     super.getQueue().add(task);
314 jsr166 1.107 if (!canRunInCurrentRunState(task) && remove(task))
315 dl 1.37 task.cancel(false);
316 jsr166 1.48 else
317 dl 1.63 ensurePrestart();
318 dl 1.37 }
319     }
320 jsr166 1.21
321 dl 1.37 /**
322 jsr166 1.39 * Requeues a periodic task unless current run state precludes it.
323     * Same idea as delayedExecute except drops task rather than rejecting.
324     *
325 dl 1.37 * @param task the task
326     */
327     void reExecutePeriodic(RunnableScheduledFuture<?> task) {
328 jsr166 1.107 if (canRunInCurrentRunState(task)) {
329 dl 1.37 super.getQueue().add(task);
330 jsr166 1.107 if (canRunInCurrentRunState(task) || !remove(task)) {
331 dl 1.63 ensurePrestart();
332 jsr166 1.107 return;
333     }
334 dl 1.37 }
335 jsr166 1.107 task.cancel(false);
336 dl 1.13 }
337 dl 1.1
338 dl 1.13 /**
339 jsr166 1.21 * Cancels and clears the queue of all tasks that should not be run
340 jsr166 1.39 * due to shutdown policy. Invoked within super.shutdown.
341 dl 1.13 */
342 dl 1.37 @Override void onShutdown() {
343     BlockingQueue<Runnable> q = super.getQueue();
344     boolean keepDelayed =
345     getExecuteExistingDelayedTasksAfterShutdownPolicy();
346     boolean keepPeriodic =
347     getContinueExistingPeriodicTasksAfterShutdownPolicy();
348 jsr166 1.107 // Traverse snapshot to avoid iterator exceptions
349     // TODO: implement and use efficient removeIf
350     // super.getQueue().removeIf(...);
351     for (Object e : q.toArray()) {
352     if (e instanceof RunnableScheduledFuture) {
353     RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
354     if ((t.isPeriodic()
355     ? !keepPeriodic
356     : (!keepDelayed && t.getDelay(NANOSECONDS) > 0))
357     || t.isCancelled()) { // also remove if already cancelled
358     if (q.remove(t))
359     t.cancel(false);
360 dl 1.13 }
361     }
362 dl 1.1 }
363 jsr166 1.48 tryTerminate();
364 dl 1.1 }
365    
366 dl 1.23 /**
367 jsr166 1.30 * Modifies or replaces the task used to execute a runnable.
368 jsr166 1.28 * This method can be used to override the concrete
369 dl 1.23 * class used for managing internal tasks.
370 jsr166 1.30 * The default implementation simply returns the given task.
371 jsr166 1.28 *
372 dl 1.23 * @param runnable the submitted Runnable
373     * @param task the task created to execute the runnable
374 jsr166 1.72 * @param <V> the type of the task's result
375 dl 1.23 * @return a task that can execute the runnable
376     * @since 1.6
377     */
378 peierls 1.22 protected <V> RunnableScheduledFuture<V> decorateTask(
379 dl 1.23 Runnable runnable, RunnableScheduledFuture<V> task) {
380     return task;
381 peierls 1.22 }
382    
383 dl 1.23 /**
384 jsr166 1.30 * Modifies or replaces the task used to execute a callable.
385 jsr166 1.28 * This method can be used to override the concrete
386 dl 1.23 * class used for managing internal tasks.
387 jsr166 1.30 * The default implementation simply returns the given task.
388 jsr166 1.28 *
389 dl 1.23 * @param callable the submitted Callable
390     * @param task the task created to execute the callable
391 jsr166 1.72 * @param <V> the type of the task's result
392 dl 1.23 * @return a task that can execute the callable
393     * @since 1.6
394     */
395 peierls 1.22 protected <V> RunnableScheduledFuture<V> decorateTask(
396 dl 1.23 Callable<V> callable, RunnableScheduledFuture<V> task) {
397     return task;
398 dl 1.19 }
399    
400 dl 1.1 /**
401 jsr166 1.78 * The default keep-alive time for pool threads.
402     *
403     * Normally, this value is unused because all pool threads will be
404     * core threads, but if a user creates a pool with a corePoolSize
405     * of zero (against our advice), we keep a thread alive as long as
406     * there are queued tasks. If the keep alive time is zero (the
407     * historic value), we end up hot-spinning in getTask, wasting a
408     * CPU. But on the other hand, if we set the value too high, and
409     * users create a one-shot pool which they don't cleanly shutdown,
410     * the pool's non-daemon threads will prevent JVM termination. A
411     * small but non-zero value (relative to a JVM's lifetime) seems
412     * best.
413     */
414     private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
415    
416     /**
417 jsr166 1.39 * Creates a new {@code ScheduledThreadPoolExecutor} with the
418     * given core pool size.
419 jsr166 1.21 *
420 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
421     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
422     * @throws IllegalArgumentException if {@code corePoolSize < 0}
423 dl 1.1 */
424     public ScheduledThreadPoolExecutor(int corePoolSize) {
425 jsr166 1.78 super(corePoolSize, Integer.MAX_VALUE,
426     DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
427 dl 1.1 new DelayedWorkQueue());
428     }
429    
430     /**
431 jsr166 1.39 * Creates a new {@code ScheduledThreadPoolExecutor} with the
432     * given initial parameters.
433 jsr166 1.21 *
434 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
435     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
436 dl 1.1 * @param threadFactory the factory to use when the executor
437 jsr166 1.39 * creates a new thread
438     * @throws IllegalArgumentException if {@code corePoolSize < 0}
439     * @throws NullPointerException if {@code threadFactory} is null
440 dl 1.1 */
441     public ScheduledThreadPoolExecutor(int corePoolSize,
442 jsr166 1.56 ThreadFactory threadFactory) {
443 jsr166 1.78 super(corePoolSize, Integer.MAX_VALUE,
444     DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
445 dl 1.1 new DelayedWorkQueue(), threadFactory);
446     }
447    
448     /**
449 jsr166 1.79 * Creates a new {@code ScheduledThreadPoolExecutor} with the
450     * given initial parameters.
451 jsr166 1.21 *
452 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
453     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
454 dl 1.1 * @param handler the handler to use when execution is blocked
455 jsr166 1.39 * because the thread bounds and queue capacities are reached
456     * @throws IllegalArgumentException if {@code corePoolSize < 0}
457     * @throws NullPointerException if {@code handler} is null
458 dl 1.1 */
459     public ScheduledThreadPoolExecutor(int corePoolSize,
460 jsr166 1.56 RejectedExecutionHandler handler) {
461 jsr166 1.78 super(corePoolSize, Integer.MAX_VALUE,
462     DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
463 dl 1.1 new DelayedWorkQueue(), handler);
464     }
465    
466     /**
467 jsr166 1.79 * Creates a new {@code ScheduledThreadPoolExecutor} with the
468     * given initial parameters.
469 jsr166 1.21 *
470 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
471     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
472 dl 1.1 * @param threadFactory the factory to use when the executor
473 jsr166 1.39 * creates a new thread
474 dl 1.1 * @param handler the handler to use when execution is blocked
475 jsr166 1.39 * because the thread bounds and queue capacities are reached
476     * @throws IllegalArgumentException if {@code corePoolSize < 0}
477     * @throws NullPointerException if {@code threadFactory} or
478     * {@code handler} is null
479 dl 1.1 */
480     public ScheduledThreadPoolExecutor(int corePoolSize,
481 jsr166 1.56 ThreadFactory threadFactory,
482     RejectedExecutionHandler handler) {
483 jsr166 1.78 super(corePoolSize, Integer.MAX_VALUE,
484     DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
485 dl 1.1 new DelayedWorkQueue(), threadFactory, handler);
486     }
487    
488 dl 1.37 /**
489 jsr166 1.73 * Returns the nanoTime-based trigger time of a delayed action.
490 jsr166 1.54 */
491     private long triggerTime(long delay, TimeUnit unit) {
492     return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
493     }
494    
495     /**
496 jsr166 1.73 * Returns the nanoTime-based trigger time of a delayed action.
497 dl 1.50 */
498 jsr166 1.54 long triggerTime(long delay) {
499 jsr166 1.98 return System.nanoTime() +
500 jsr166 1.54 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
501     }
502    
503     /**
504     * Constrains the values of all delays in the queue to be within
505     * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
506     * This may occur if a task is eligible to be dequeued, but has
507     * not yet been, while some other task is added with a delay of
508     * Long.MAX_VALUE.
509     */
510     private long overflowFree(long delay) {
511     Delayed head = (Delayed) super.getQueue().peek();
512     if (head != null) {
513 jsr166 1.62 long headDelay = head.getDelay(NANOSECONDS);
514 jsr166 1.54 if (headDelay < 0 && (delay - headDelay < 0))
515     delay = Long.MAX_VALUE + headDelay;
516     }
517     return delay;
518 dl 1.50 }
519    
520     /**
521 dl 1.37 * @throws RejectedExecutionException {@inheritDoc}
522     * @throws NullPointerException {@inheritDoc}
523     */
524 jsr166 1.21 public ScheduledFuture<?> schedule(Runnable command,
525     long delay,
526 dl 1.13 TimeUnit unit) {
527 dl 1.9 if (command == null || unit == null)
528 dl 1.1 throw new NullPointerException();
529 jsr166 1.76 RunnableScheduledFuture<Void> t = decorateTask(command,
530 jsr166 1.54 new ScheduledFutureTask<Void>(command, null,
531 jsr166 1.89 triggerTime(delay, unit),
532     sequencer.getAndIncrement()));
533 dl 1.1 delayedExecute(t);
534     return t;
535     }
536 jsr166 1.52
537 dl 1.37 /**
538     * @throws RejectedExecutionException {@inheritDoc}
539     * @throws NullPointerException {@inheritDoc}
540     */
541 jsr166 1.21 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
542     long delay,
543 dl 1.13 TimeUnit unit) {
544 dl 1.9 if (callable == null || unit == null)
545 dl 1.1 throw new NullPointerException();
546 peierls 1.22 RunnableScheduledFuture<V> t = decorateTask(callable,
547 jsr166 1.54 new ScheduledFutureTask<V>(callable,
548 jsr166 1.89 triggerTime(delay, unit),
549     sequencer.getAndIncrement()));
550 dl 1.1 delayedExecute(t);
551     return t;
552     }
553    
554 dl 1.37 /**
555 jsr166 1.111 * Submits a periodic action that becomes enabled first after the
556     * given initial delay, and subsequently with the given period;
557     * that is, executions will commence after
558     * {@code initialDelay}, then {@code initialDelay + period}, then
559     * {@code initialDelay + 2 * period}, and so on.
560     *
561     * <p>The sequence of task executions continues indefinitely until
562     * one of the following exceptional completions occur:
563     * <ul>
564     * <li>The task is {@linkplain Future#cancel explicitly cancelled}
565     * via the returned future.
566     * <li>Method {@link #shutdown} is called and the {@linkplain
567     * #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on
568     * whether to continue after shutdown} is not set true, or method
569     * {@link #shutdownNow} is called; also resulting in task
570     * cancellation.
571     * <li>An execution of the task throws an exception. In this case
572     * calling {@link Future#get() get} on the returned future will throw
573     * {@link ExecutionException}, holding the exception as its cause.
574     * </ul>
575     * Subsequent executions are suppressed. Subsequent calls to
576     * {@link Future#isDone isDone()} on the returned future will
577     * return {@code true}.
578     *
579     * <p>If any execution of this task takes longer than its period, then
580     * subsequent executions may start late, but will not concurrently
581     * execute.
582     *
583 dl 1.37 * @throws RejectedExecutionException {@inheritDoc}
584     * @throws NullPointerException {@inheritDoc}
585     * @throws IllegalArgumentException {@inheritDoc}
586     */
587 jsr166 1.21 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
588     long initialDelay,
589     long period,
590 dl 1.13 TimeUnit unit) {
591 dl 1.9 if (command == null || unit == null)
592 dl 1.1 throw new NullPointerException();
593 jsr166 1.96 if (period <= 0L)
594 dl 1.1 throw new IllegalArgumentException();
595 jsr166 1.48 ScheduledFutureTask<Void> sft =
596     new ScheduledFutureTask<Void>(command,
597     null,
598 jsr166 1.54 triggerTime(initialDelay, unit),
599 jsr166 1.89 unit.toNanos(period),
600     sequencer.getAndIncrement());
601 jsr166 1.44 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
602 jsr166 1.48 sft.outerTask = t;
603 dl 1.1 delayedExecute(t);
604     return t;
605     }
606 jsr166 1.21
607 dl 1.37 /**
608 jsr166 1.111 * Submits a periodic action that becomes enabled first after the
609     * given initial delay, and subsequently with the given delay
610     * between the termination of one execution and the commencement of
611     * the next.
612     *
613     * <p>The sequence of task executions continues indefinitely until
614     * one of the following exceptional completions occur:
615     * <ul>
616     * <li>The task is {@linkplain Future#cancel explicitly cancelled}
617     * via the returned future.
618     * <li>Method {@link #shutdown} is called and the {@linkplain
619     * #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on
620     * whether to continue after shutdown} is not set true, or method
621     * {@link #shutdownNow} is called; also resulting in task
622     * cancellation.
623     * <li>An execution of the task throws an exception. In this case
624     * calling {@link Future#get() get} on the returned future will throw
625     * {@link ExecutionException}, holding the exception as its cause.
626     * </ul>
627     * Subsequent executions are suppressed. Subsequent calls to
628     * {@link Future#isDone isDone()} on the returned future will
629     * return {@code true}.
630     *
631 dl 1.37 * @throws RejectedExecutionException {@inheritDoc}
632     * @throws NullPointerException {@inheritDoc}
633     * @throws IllegalArgumentException {@inheritDoc}
634     */
635 jsr166 1.21 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
636     long initialDelay,
637     long delay,
638 dl 1.13 TimeUnit unit) {
639 dl 1.9 if (command == null || unit == null)
640 dl 1.1 throw new NullPointerException();
641 jsr166 1.96 if (delay <= 0L)
642 dl 1.1 throw new IllegalArgumentException();
643 jsr166 1.48 ScheduledFutureTask<Void> sft =
644     new ScheduledFutureTask<Void>(command,
645     null,
646 jsr166 1.54 triggerTime(initialDelay, unit),
647 jsr166 1.97 -unit.toNanos(delay),
648 jsr166 1.89 sequencer.getAndIncrement());
649 jsr166 1.44 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
650 jsr166 1.48 sft.outerTask = t;
651 dl 1.1 delayedExecute(t);
652     return t;
653     }
654 jsr166 1.21
655 dl 1.1 /**
656 jsr166 1.39 * Executes {@code command} with zero required delay.
657     * This has effect equivalent to
658     * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
659     * Note that inspections of the queue and of the list returned by
660     * {@code shutdownNow} will access the zero-delayed
661     * {@link ScheduledFuture}, not the {@code command} itself.
662     *
663     * <p>A consequence of the use of {@code ScheduledFuture} objects is
664     * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
665     * called with a null second {@code Throwable} argument, even if the
666     * {@code command} terminated abruptly. Instead, the {@code Throwable}
667     * thrown by such a task can be obtained via {@link Future#get}.
668 dl 1.1 *
669     * @throws RejectedExecutionException at discretion of
670 jsr166 1.39 * {@code RejectedExecutionHandler}, if the task
671     * cannot be accepted for execution because the
672     * executor has been shut down
673     * @throws NullPointerException {@inheritDoc}
674 dl 1.1 */
675     public void execute(Runnable command) {
676 jsr166 1.62 schedule(command, 0, NANOSECONDS);
677 dl 1.1 }
678    
679 dl 1.13 // Override AbstractExecutorService methods
680    
681 dl 1.37 /**
682     * @throws RejectedExecutionException {@inheritDoc}
683     * @throws NullPointerException {@inheritDoc}
684     */
685 dl 1.7 public Future<?> submit(Runnable task) {
686 jsr166 1.62 return schedule(task, 0, NANOSECONDS);
687 dl 1.7 }
688    
689 dl 1.37 /**
690     * @throws RejectedExecutionException {@inheritDoc}
691     * @throws NullPointerException {@inheritDoc}
692     */
693 dl 1.7 public <T> Future<T> submit(Runnable task, T result) {
694 jsr166 1.62 return schedule(Executors.callable(task, result), 0, NANOSECONDS);
695 dl 1.7 }
696    
697 dl 1.37 /**
698     * @throws RejectedExecutionException {@inheritDoc}
699     * @throws NullPointerException {@inheritDoc}
700     */
701 dl 1.7 public <T> Future<T> submit(Callable<T> task) {
702 jsr166 1.62 return schedule(task, 0, NANOSECONDS);
703 dl 1.7 }
704 dl 1.1
705     /**
706 dl 1.37 * Sets the policy on whether to continue executing existing
707 jsr166 1.39 * periodic tasks even when this executor has been {@code shutdown}.
708 jsr166 1.111 * In this case, executions will continue until {@code shutdownNow}
709     * or the policy is set to {@code false} when already shutdown.
710 jsr166 1.39 * This value is by default {@code false}.
711 jsr166 1.30 *
712 jsr166 1.68 * @param value if {@code true}, continue after shutdown, else don't
713 jsr166 1.25 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
714 dl 1.1 */
715     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
716     continueExistingPeriodicTasksAfterShutdown = value;
717 jsr166 1.39 if (!value && isShutdown())
718 dl 1.37 onShutdown();
719 dl 1.1 }
720    
721     /**
722 jsr166 1.21 * Gets the policy on whether to continue executing existing
723 jsr166 1.39 * periodic tasks even when this executor has been {@code shutdown}.
724 jsr166 1.111 * In this case, executions will continue until {@code shutdownNow}
725     * or the policy is set to {@code false} when already shutdown.
726 jsr166 1.39 * This value is by default {@code false}.
727 jsr166 1.30 *
728 jsr166 1.39 * @return {@code true} if will continue after shutdown
729 dl 1.16 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
730 dl 1.1 */
731     public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
732     return continueExistingPeriodicTasksAfterShutdown;
733     }
734    
735     /**
736 jsr166 1.21 * Sets the policy on whether to execute existing delayed
737 jsr166 1.39 * tasks even when this executor has been {@code shutdown}.
738     * In this case, these tasks will only terminate upon
739     * {@code shutdownNow}, or after setting the policy to
740     * {@code false} when already shutdown.
741     * This value is by default {@code true}.
742 jsr166 1.30 *
743 jsr166 1.68 * @param value if {@code true}, execute after shutdown, else don't
744 dl 1.16 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
745 dl 1.1 */
746     public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
747     executeExistingDelayedTasksAfterShutdown = value;
748 jsr166 1.39 if (!value && isShutdown())
749 dl 1.37 onShutdown();
750 dl 1.1 }
751    
752     /**
753 jsr166 1.21 * Gets the policy on whether to execute existing delayed
754 jsr166 1.39 * tasks even when this executor has been {@code shutdown}.
755     * In this case, these tasks will only terminate upon
756     * {@code shutdownNow}, or after setting the policy to
757     * {@code false} when already shutdown.
758     * This value is by default {@code true}.
759 jsr166 1.30 *
760 jsr166 1.39 * @return {@code true} if will execute after shutdown
761 dl 1.16 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
762 dl 1.1 */
763     public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
764     return executeExistingDelayedTasksAfterShutdown;
765     }
766    
767     /**
768 jsr166 1.46 * Sets the policy on whether cancelled tasks should be immediately
769     * removed from the work queue at time of cancellation. This value is
770     * by default {@code false}.
771 dl 1.41 *
772 jsr166 1.42 * @param value if {@code true}, remove on cancellation, else don't
773 dl 1.41 * @see #getRemoveOnCancelPolicy
774 jsr166 1.43 * @since 1.7
775 dl 1.41 */
776     public void setRemoveOnCancelPolicy(boolean value) {
777     removeOnCancel = value;
778     }
779    
780     /**
781 jsr166 1.46 * Gets the policy on whether cancelled tasks should be immediately
782     * removed from the work queue at time of cancellation. This value is
783     * by default {@code false}.
784 dl 1.41 *
785 jsr166 1.46 * @return {@code true} if cancelled tasks are immediately removed
786     * from the queue
787 dl 1.41 * @see #setRemoveOnCancelPolicy
788 jsr166 1.43 * @since 1.7
789 dl 1.41 */
790     public boolean getRemoveOnCancelPolicy() {
791     return removeOnCancel;
792     }
793    
794     /**
795 dl 1.1 * Initiates an orderly shutdown in which previously submitted
796 jsr166 1.49 * tasks are executed, but no new tasks will be accepted.
797     * Invocation has no additional effect if already shut down.
798     *
799     * <p>This method does not wait for previously submitted tasks to
800     * complete execution. Use {@link #awaitTermination awaitTermination}
801     * to do that.
802     *
803     * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
804     * has been set {@code false}, existing delayed tasks whose delays
805     * have not yet elapsed are cancelled. And unless the {@code
806     * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
807     * {@code true}, future executions of existing periodic tasks will
808     * be cancelled.
809 jsr166 1.39 *
810     * @throws SecurityException {@inheritDoc}
811 dl 1.1 */
812     public void shutdown() {
813     super.shutdown();
814     }
815    
816     /**
817     * Attempts to stop all actively executing tasks, halts the
818 jsr166 1.30 * processing of waiting tasks, and returns a list of the tasks
819 jsr166 1.92 * that were awaiting execution. These tasks are drained (removed)
820     * from the task queue upon return from this method.
821 jsr166 1.21 *
822 jsr166 1.49 * <p>This method does not wait for actively executing tasks to
823     * terminate. Use {@link #awaitTermination awaitTermination} to
824     * do that.
825     *
826 dl 1.1 * <p>There are no guarantees beyond best-effort attempts to stop
827 dl 1.18 * processing actively executing tasks. This implementation
828 jsr166 1.93 * interrupts tasks via {@link Thread#interrupt}; any task that
829 jsr166 1.31 * fails to respond to interrupts may never terminate.
830 dl 1.1 *
831 jsr166 1.39 * @return list of tasks that never commenced execution.
832 jsr166 1.77 * Each element of this list is a {@link ScheduledFuture}.
833     * For tasks submitted via one of the {@code schedule}
834     * methods, the element will be identical to the returned
835     * {@code ScheduledFuture}. For tasks submitted using
836 jsr166 1.84 * {@link #execute execute}, the element will be a
837     * zero-delay {@code ScheduledFuture}.
838 jsr166 1.31 * @throws SecurityException {@inheritDoc}
839 dl 1.1 */
840 tim 1.4 public List<Runnable> shutdownNow() {
841 dl 1.1 return super.shutdownNow();
842     }
843    
844     /**
845 jsr166 1.95 * Returns the task queue used by this executor. Access to the
846     * task queue is intended primarily for debugging and monitoring.
847     * This queue may be in active use. Retrieving the task queue
848     * does not prevent queued tasks from executing.
849     *
850     * <p>Each element of this queue is a {@link ScheduledFuture}.
851 jsr166 1.77 * For tasks submitted via one of the {@code schedule} methods, the
852     * element will be identical to the returned {@code ScheduledFuture}.
853 jsr166 1.84 * For tasks submitted using {@link #execute execute}, the element
854     * will be a zero-delay {@code ScheduledFuture}.
855 jsr166 1.77 *
856     * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
857     * tasks in the order in which they will execute.
858 dl 1.1 *
859     * @return the task queue
860     */
861     public BlockingQueue<Runnable> getQueue() {
862     return super.getQueue();
863     }
864    
865 dl 1.13 /**
866 dl 1.40 * Specialized delay queue. To mesh with TPE declarations, this
867     * class must be declared as a BlockingQueue<Runnable> even though
868 jsr166 1.42 * it can only hold RunnableScheduledFutures.
869 jsr166 1.21 */
870 dl 1.40 static class DelayedWorkQueue extends AbstractQueue<Runnable>
871 dl 1.13 implements BlockingQueue<Runnable> {
872 jsr166 1.21
873 dl 1.40 /*
874     * A DelayedWorkQueue is based on a heap-based data structure
875     * like those in DelayQueue and PriorityQueue, except that
876     * every ScheduledFutureTask also records its index into the
877     * heap array. This eliminates the need to find a task upon
878     * cancellation, greatly speeding up removal (down from O(n)
879     * to O(log n)), and reducing garbage retention that would
880     * otherwise occur by waiting for the element to rise to top
881     * before clearing. But because the queue may also hold
882     * RunnableScheduledFutures that are not ScheduledFutureTasks,
883     * we are not guaranteed to have such indices available, in
884     * which case we fall back to linear search. (We expect that
885     * most tasks will not be decorated, and that the faster cases
886     * will be much more common.)
887     *
888     * All heap operations must record index changes -- mainly
889     * within siftUp and siftDown. Upon removal, a task's
890     * heapIndex is set to -1. Note that ScheduledFutureTasks can
891     * appear at most once in the queue (this need not be true for
892     * other kinds of tasks or work queues), so are uniquely
893     * identified by heapIndex.
894     */
895    
896 jsr166 1.46 private static final int INITIAL_CAPACITY = 16;
897 jsr166 1.61 private RunnableScheduledFuture<?>[] queue =
898     new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
899 jsr166 1.46 private final ReentrantLock lock = new ReentrantLock();
900 jsr166 1.80 private int size;
901 dl 1.40
902 jsr166 1.48 /**
903     * Thread designated to wait for the task at the head of the
904     * queue. This variant of the Leader-Follower pattern
905     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
906     * minimize unnecessary timed waiting. When a thread becomes
907     * the leader, it waits only for the next delay to elapse, but
908     * other threads await indefinitely. The leader thread must
909     * signal some other thread before returning from take() or
910     * poll(...), unless some other thread becomes leader in the
911     * interim. Whenever the head of the queue is replaced with a
912     * task with an earlier expiration time, the leader field is
913     * invalidated by being reset to null, and some waiting
914     * thread, but not necessarily the current leader, is
915     * signalled. So waiting threads must be prepared to acquire
916     * and lose leadership while waiting.
917     */
918 jsr166 1.80 private Thread leader;
919 jsr166 1.48
920     /**
921     * Condition signalled when a newer task becomes available at the
922     * head of the queue or a new thread may need to become leader.
923     */
924     private final Condition available = lock.newCondition();
925 dl 1.40
926     /**
927 jsr166 1.66 * Sets f's heapIndex if it is a ScheduledFutureTask.
928 dl 1.40 */
929 jsr166 1.102 private static void setIndex(RunnableScheduledFuture<?> f, int idx) {
930 dl 1.40 if (f instanceof ScheduledFutureTask)
931     ((ScheduledFutureTask)f).heapIndex = idx;
932     }
933    
934     /**
935 jsr166 1.66 * Sifts element added at bottom up to its heap-ordered spot.
936 dl 1.40 * Call only when holding lock.
937     */
938 jsr166 1.61 private void siftUp(int k, RunnableScheduledFuture<?> key) {
939 dl 1.40 while (k > 0) {
940     int parent = (k - 1) >>> 1;
941 jsr166 1.61 RunnableScheduledFuture<?> e = queue[parent];
942 dl 1.40 if (key.compareTo(e) >= 0)
943     break;
944     queue[k] = e;
945     setIndex(e, k);
946     k = parent;
947     }
948     queue[k] = key;
949     setIndex(key, k);
950     }
951    
952     /**
953 jsr166 1.66 * Sifts element added at top down to its heap-ordered spot.
954 dl 1.40 * Call only when holding lock.
955     */
956 jsr166 1.61 private void siftDown(int k, RunnableScheduledFuture<?> key) {
957 jsr166 1.42 int half = size >>> 1;
958 dl 1.40 while (k < half) {
959 jsr166 1.42 int child = (k << 1) + 1;
960 jsr166 1.61 RunnableScheduledFuture<?> c = queue[child];
961 dl 1.40 int right = child + 1;
962     if (right < size && c.compareTo(queue[right]) > 0)
963     c = queue[child = right];
964     if (key.compareTo(c) <= 0)
965     break;
966     queue[k] = c;
967     setIndex(c, k);
968     k = child;
969     }
970     queue[k] = key;
971     setIndex(key, k);
972     }
973    
974     /**
975 jsr166 1.66 * Resizes the heap array. Call only when holding lock.
976 dl 1.40 */
977     private void grow() {
978     int oldCapacity = queue.length;
979     int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
980     if (newCapacity < 0) // overflow
981     newCapacity = Integer.MAX_VALUE;
982     queue = Arrays.copyOf(queue, newCapacity);
983     }
984    
985     /**
986 jsr166 1.66 * Finds index of given object, or -1 if absent.
987 dl 1.40 */
988     private int indexOf(Object x) {
989     if (x != null) {
990 jsr166 1.48 if (x instanceof ScheduledFutureTask) {
991     int i = ((ScheduledFutureTask) x).heapIndex;
992     // Sanity check; x could conceivably be a
993     // ScheduledFutureTask from some other pool.
994     if (i >= 0 && i < size && queue[i] == x)
995     return i;
996     } else {
997     for (int i = 0; i < size; i++)
998     if (x.equals(queue[i]))
999     return i;
1000     }
1001 dl 1.40 }
1002     return -1;
1003     }
1004    
1005 jsr166 1.48 public boolean contains(Object x) {
1006     final ReentrantLock lock = this.lock;
1007 jsr166 1.45 lock.lock();
1008     try {
1009 jsr166 1.48 return indexOf(x) != -1;
1010 jsr166 1.45 } finally {
1011     lock.unlock();
1012     }
1013 jsr166 1.48 }
1014 jsr166 1.45
1015 dl 1.40 public boolean remove(Object x) {
1016     final ReentrantLock lock = this.lock;
1017     lock.lock();
1018     try {
1019 jsr166 1.45 int i = indexOf(x);
1020 jsr166 1.48 if (i < 0)
1021     return false;
1022 jsr166 1.45
1023 jsr166 1.48 setIndex(queue[i], -1);
1024     int s = --size;
1025 jsr166 1.61 RunnableScheduledFuture<?> replacement = queue[s];
1026 jsr166 1.48 queue[s] = null;
1027     if (s != i) {
1028     siftDown(i, replacement);
1029     if (queue[i] == replacement)
1030     siftUp(i, replacement);
1031     }
1032     return true;
1033 dl 1.40 } finally {
1034     lock.unlock();
1035     }
1036     }
1037    
1038     public int size() {
1039     final ReentrantLock lock = this.lock;
1040     lock.lock();
1041     try {
1042 jsr166 1.45 return size;
1043 dl 1.40 } finally {
1044     lock.unlock();
1045     }
1046     }
1047    
1048 jsr166 1.42 public boolean isEmpty() {
1049     return size() == 0;
1050 dl 1.40 }
1051    
1052     public int remainingCapacity() {
1053     return Integer.MAX_VALUE;
1054     }
1055    
1056 jsr166 1.61 public RunnableScheduledFuture<?> peek() {
1057 dl 1.40 final ReentrantLock lock = this.lock;
1058     lock.lock();
1059     try {
1060     return queue[0];
1061     } finally {
1062     lock.unlock();
1063     }
1064 dl 1.13 }
1065    
1066 dl 1.40 public boolean offer(Runnable x) {
1067     if (x == null)
1068     throw new NullPointerException();
1069 jsr166 1.61 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
1070 dl 1.40 final ReentrantLock lock = this.lock;
1071     lock.lock();
1072     try {
1073     int i = size;
1074     if (i >= queue.length)
1075     grow();
1076     size = i + 1;
1077     if (i == 0) {
1078     queue[0] = e;
1079     setIndex(e, 0);
1080 jsr166 1.45 } else {
1081 dl 1.40 siftUp(i, e);
1082     }
1083 jsr166 1.46 if (queue[0] == e) {
1084 jsr166 1.48 leader = null;
1085 jsr166 1.46 available.signal();
1086 jsr166 1.48 }
1087 dl 1.40 } finally {
1088     lock.unlock();
1089     }
1090     return true;
1091 jsr166 1.48 }
1092 dl 1.40
1093     public void put(Runnable e) {
1094     offer(e);
1095     }
1096    
1097     public boolean add(Runnable e) {
1098 jsr166 1.48 return offer(e);
1099     }
1100 dl 1.40
1101     public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1102     return offer(e);
1103     }
1104 jsr166 1.42
1105 jsr166 1.46 /**
1106     * Performs common bookkeeping for poll and take: Replaces
1107 jsr166 1.47 * first element with last and sifts it down. Call only when
1108     * holding lock.
1109 jsr166 1.46 * @param f the task to remove and return
1110     */
1111 jsr166 1.61 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1112 jsr166 1.46 int s = --size;
1113 jsr166 1.61 RunnableScheduledFuture<?> x = queue[s];
1114 jsr166 1.46 queue[s] = null;
1115     if (s != 0)
1116     siftDown(0, x);
1117     setIndex(f, -1);
1118     return f;
1119     }
1120    
1121 jsr166 1.61 public RunnableScheduledFuture<?> poll() {
1122 dl 1.40 final ReentrantLock lock = this.lock;
1123     lock.lock();
1124     try {
1125 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1126 jsr166 1.87 return (first == null || first.getDelay(NANOSECONDS) > 0)
1127     ? null
1128     : finishPoll(first);
1129 dl 1.40 } finally {
1130     lock.unlock();
1131     }
1132     }
1133    
1134 jsr166 1.61 public RunnableScheduledFuture<?> take() throws InterruptedException {
1135 dl 1.40 final ReentrantLock lock = this.lock;
1136     lock.lockInterruptibly();
1137     try {
1138     for (;;) {
1139 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1140 jsr166 1.42 if (first == null)
1141 dl 1.40 available.await();
1142     else {
1143 jsr166 1.62 long delay = first.getDelay(NANOSECONDS);
1144 jsr166 1.96 if (delay <= 0L)
1145 jsr166 1.48 return finishPoll(first);
1146 jsr166 1.71 first = null; // don't retain ref while waiting
1147     if (leader != null)
1148 jsr166 1.48 available.await();
1149     else {
1150     Thread thisThread = Thread.currentThread();
1151     leader = thisThread;
1152     try {
1153     available.awaitNanos(delay);
1154     } finally {
1155     if (leader == thisThread)
1156     leader = null;
1157     }
1158     }
1159 dl 1.40 }
1160     }
1161     } finally {
1162 jsr166 1.48 if (leader == null && queue[0] != null)
1163     available.signal();
1164 dl 1.40 lock.unlock();
1165     }
1166     }
1167    
1168 jsr166 1.61 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1169 dl 1.40 throws InterruptedException {
1170     long nanos = unit.toNanos(timeout);
1171     final ReentrantLock lock = this.lock;
1172     lock.lockInterruptibly();
1173     try {
1174     for (;;) {
1175 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1176 dl 1.40 if (first == null) {
1177 jsr166 1.96 if (nanos <= 0L)
1178 dl 1.40 return null;
1179     else
1180     nanos = available.awaitNanos(nanos);
1181     } else {
1182 jsr166 1.62 long delay = first.getDelay(NANOSECONDS);
1183 jsr166 1.96 if (delay <= 0L)
1184 dl 1.40 return finishPoll(first);
1185 jsr166 1.96 if (nanos <= 0L)
1186 jsr166 1.48 return null;
1187 jsr166 1.71 first = null; // don't retain ref while waiting
1188 jsr166 1.48 if (nanos < delay || leader != null)
1189     nanos = available.awaitNanos(nanos);
1190     else {
1191     Thread thisThread = Thread.currentThread();
1192     leader = thisThread;
1193     try {
1194     long timeLeft = available.awaitNanos(delay);
1195     nanos -= delay - timeLeft;
1196     } finally {
1197     if (leader == thisThread)
1198     leader = null;
1199     }
1200     }
1201     }
1202     }
1203 dl 1.40 } finally {
1204 jsr166 1.48 if (leader == null && queue[0] != null)
1205     available.signal();
1206 dl 1.40 lock.unlock();
1207     }
1208     }
1209    
1210     public void clear() {
1211     final ReentrantLock lock = this.lock;
1212     lock.lock();
1213     try {
1214     for (int i = 0; i < size; i++) {
1215 jsr166 1.61 RunnableScheduledFuture<?> t = queue[i];
1216 dl 1.40 if (t != null) {
1217     queue[i] = null;
1218     setIndex(t, -1);
1219     }
1220     }
1221     size = 0;
1222     } finally {
1223     lock.unlock();
1224     }
1225 dl 1.13 }
1226 dl 1.40
1227     public int drainTo(Collection<? super Runnable> c) {
1228 jsr166 1.104 return drainTo(c, Integer.MAX_VALUE);
1229 dl 1.13 }
1230    
1231 jsr166 1.21 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1232 jsr166 1.106 Objects.requireNonNull(c);
1233 dl 1.40 if (c == this)
1234     throw new IllegalArgumentException();
1235     if (maxElements <= 0)
1236     return 0;
1237     final ReentrantLock lock = this.lock;
1238     lock.lock();
1239     try {
1240     int n = 0;
1241 jsr166 1.106 for (RunnableScheduledFuture<?> first;
1242     n < maxElements
1243     && (first = queue[0]) != null
1244     && first.getDelay(NANOSECONDS) <= 0;) {
1245 jsr166 1.62 c.add(first); // In this order, in case add() throws.
1246     finishPoll(first);
1247 jsr166 1.48 ++n;
1248     }
1249 dl 1.40 return n;
1250     } finally {
1251     lock.unlock();
1252     }
1253     }
1254    
1255     public Object[] toArray() {
1256     final ReentrantLock lock = this.lock;
1257     lock.lock();
1258     try {
1259 jsr166 1.45 return Arrays.copyOf(queue, size, Object[].class);
1260 dl 1.40 } finally {
1261     lock.unlock();
1262     }
1263     }
1264    
1265 jsr166 1.48 @SuppressWarnings("unchecked")
1266 dl 1.40 public <T> T[] toArray(T[] a) {
1267     final ReentrantLock lock = this.lock;
1268     lock.lock();
1269     try {
1270     if (a.length < size)
1271     return (T[]) Arrays.copyOf(queue, size, a.getClass());
1272     System.arraycopy(queue, 0, a, 0, size);
1273     if (a.length > size)
1274     a[size] = null;
1275     return a;
1276     } finally {
1277     lock.unlock();
1278     }
1279 dl 1.13 }
1280    
1281 jsr166 1.21 public Iterator<Runnable> iterator() {
1282 jsr166 1.105 final ReentrantLock lock = this.lock;
1283     lock.lock();
1284     try {
1285     return new Itr(Arrays.copyOf(queue, size));
1286     } finally {
1287     lock.unlock();
1288     }
1289 dl 1.40 }
1290 jsr166 1.42
1291 dl 1.40 /**
1292     * Snapshot iterator that works off copy of underlying q array.
1293     */
1294     private class Itr implements Iterator<Runnable> {
1295 jsr166 1.74 final RunnableScheduledFuture<?>[] array;
1296 jsr166 1.86 int cursor; // index of next element to return; initially 0
1297     int lastRet = -1; // index of last element returned; -1 if no such
1298 jsr166 1.42
1299 jsr166 1.74 Itr(RunnableScheduledFuture<?>[] array) {
1300 dl 1.40 this.array = array;
1301     }
1302 jsr166 1.42
1303 dl 1.40 public boolean hasNext() {
1304     return cursor < array.length;
1305     }
1306 jsr166 1.42
1307 dl 1.40 public Runnable next() {
1308     if (cursor >= array.length)
1309     throw new NoSuchElementException();
1310 jsr166 1.101 return array[lastRet = cursor++];
1311 dl 1.40 }
1312 jsr166 1.42
1313 dl 1.40 public void remove() {
1314     if (lastRet < 0)
1315     throw new IllegalStateException();
1316     DelayedWorkQueue.this.remove(array[lastRet]);
1317     lastRet = -1;
1318     }
1319 dl 1.13 }
1320     }
1321 dl 1.1 }