ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.99
Committed: Fri Nov 20 19:54:03 2015 UTC (8 years, 6 months ago) by jsr166
Branch: MAIN
Changes since 1.98: +6 -7 lines
Log Message:
improve some comments

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.11 * Expert Group and released to the public domain, as explained at
4 jsr166 1.58 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.1 */
6    
7     package java.util.concurrent;
8 jsr166 1.82
9 jsr166 1.83 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 jsr166 1.62 import static java.util.concurrent.TimeUnit.NANOSECONDS;
11 jsr166 1.83
12 jsr166 1.82 import java.util.AbstractQueue;
13     import java.util.Arrays;
14     import java.util.Collection;
15     import java.util.Iterator;
16     import java.util.List;
17     import java.util.NoSuchElementException;
18 jsr166 1.83 import java.util.concurrent.atomic.AtomicLong;
19     import java.util.concurrent.locks.Condition;
20     import java.util.concurrent.locks.ReentrantLock;
21 dl 1.1
22     /**
23 dl 1.7 * A {@link ThreadPoolExecutor} that can additionally schedule
24 jsr166 1.75 * commands to run after a given delay, or to execute periodically.
25     * This class is preferable to {@link java.util.Timer} when multiple
26     * worker threads are needed, or when the additional flexibility or
27     * capabilities of {@link ThreadPoolExecutor} (which this class
28     * extends) are required.
29 dl 1.1 *
30 jsr166 1.46 * <p>Delayed tasks execute no sooner than they are enabled, but
31 dl 1.18 * without any real-time guarantees about when, after they are
32     * enabled, they will commence. Tasks scheduled for exactly the same
33     * execution time are enabled in first-in-first-out (FIFO) order of
34 jsr166 1.46 * submission.
35     *
36     * <p>When a submitted task is cancelled before it is run, execution
37 jsr166 1.75 * is suppressed. By default, such a cancelled task is not
38     * automatically removed from the work queue until its delay elapses.
39     * While this enables further inspection and monitoring, it may also
40     * cause unbounded retention of cancelled tasks. To avoid this, use
41     * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately
42     * removed from the work queue at time of cancellation.
43 dl 1.1 *
44 jsr166 1.75 * <p>Successive executions of a periodic task scheduled via
45 jsr166 1.84 * {@link #scheduleAtFixedRate scheduleAtFixedRate} or
46     * {@link #scheduleWithFixedDelay scheduleWithFixedDelay}
47     * do not overlap. While different executions may be performed by
48     * different threads, the effects of prior executions
49     * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
50 dl 1.51 * those of subsequent ones.
51     *
52 dl 1.1 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
53 dl 1.8 * of the inherited tuning methods are not useful for it. In
54 jsr166 1.53 * particular, because it acts as a fixed-sized pool using
55     * {@code corePoolSize} threads and an unbounded queue, adjustments
56     * to {@code maximumPoolSize} have no useful effect. Additionally, it
57     * is almost never a good idea to set {@code corePoolSize} to zero or
58     * use {@code allowCoreThreadTimeOut} because this may leave the pool
59     * without threads to handle tasks once they become eligible to run.
60 dl 1.1 *
61 jsr166 1.39 * <p><b>Extension notes:</b> This class overrides the
62 jsr166 1.69 * {@link ThreadPoolExecutor#execute(Runnable) execute} and
63 jsr166 1.39 * {@link AbstractExecutorService#submit(Runnable) submit}
64     * methods to generate internal {@link ScheduledFuture} objects to
65     * control per-task delays and scheduling. To preserve
66     * functionality, any further overrides of these methods in
67 dl 1.32 * subclasses must invoke superclass versions, which effectively
68 jsr166 1.39 * disables additional task customization. However, this class
69 dl 1.32 * provides alternative protected extension method
70 jsr166 1.39 * {@code decorateTask} (one version each for {@code Runnable} and
71     * {@code Callable}) that can be used to customize the concrete task
72     * types used to execute commands entered via {@code execute},
73     * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
74     * and {@code scheduleWithFixedDelay}. By default, a
75     * {@code ScheduledThreadPoolExecutor} uses a task type extending
76 dl 1.32 * {@link FutureTask}. However, this may be modified or replaced using
77     * subclasses of the form:
78     *
79 jsr166 1.85 * <pre> {@code
80 dl 1.23 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
81     *
82 jsr166 1.39 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
83 dl 1.23 *
84 jsr166 1.39 * protected <V> RunnableScheduledFuture<V> decorateTask(
85     * Runnable r, RunnableScheduledFuture<V> task) {
86     * return new CustomTask<V>(r, task);
87 jsr166 1.29 * }
88 dl 1.23 *
89 jsr166 1.39 * protected <V> RunnableScheduledFuture<V> decorateTask(
90     * Callable<V> c, RunnableScheduledFuture<V> task) {
91     * return new CustomTask<V>(c, task);
92 jsr166 1.29 * }
93     * // ... add constructors, etc.
94 jsr166 1.39 * }}</pre>
95     *
96 dl 1.1 * @since 1.5
97     * @author Doug Lea
98     */
99 jsr166 1.21 public class ScheduledThreadPoolExecutor
100     extends ThreadPoolExecutor
101 tim 1.3 implements ScheduledExecutorService {
102 dl 1.1
103 dl 1.37 /*
104     * This class specializes ThreadPoolExecutor implementation by
105     *
106 jsr166 1.99 * 1. Using a custom task type ScheduledFutureTask, even for tasks
107     * that don't require scheduling because they are submitted
108     * using ExecutorService rather than ScheduledExecutorService
109     * methods, which are treated as tasks with a delay of zero.
110 dl 1.37 *
111 jsr166 1.46 * 2. Using a custom queue (DelayedWorkQueue), a variant of
112 dl 1.37 * unbounded DelayQueue. The lack of capacity constraint and
113     * the fact that corePoolSize and maximumPoolSize are
114     * effectively identical simplifies some execution mechanics
115 jsr166 1.46 * (see delayedExecute) compared to ThreadPoolExecutor.
116 dl 1.37 *
117     * 3. Supporting optional run-after-shutdown parameters, which
118     * leads to overrides of shutdown methods to remove and cancel
119     * tasks that should NOT be run after shutdown, as well as
120     * different recheck logic when task (re)submission overlaps
121     * with a shutdown.
122     *
123     * 4. Task decoration methods to allow interception and
124     * instrumentation, which are needed because subclasses cannot
125     * otherwise override submit methods to get this effect. These
126     * don't have any impact on pool control logic though.
127     */
128    
129 dl 1.1 /**
130     * False if should cancel/suppress periodic tasks on shutdown.
131     */
132     private volatile boolean continueExistingPeriodicTasksAfterShutdown;
133    
134     /**
135     * False if should cancel non-periodic tasks on shutdown.
136     */
137     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
138    
139     /**
140 jsr166 1.75 * True if ScheduledFutureTask.cancel should remove from queue.
141 dl 1.41 */
142 jsr166 1.90 volatile boolean removeOnCancel;
143 dl 1.41
144     /**
145 dl 1.1 * Sequence number to break scheduling ties, and in turn to
146     * guarantee FIFO order among tied entries.
147     */
148 jsr166 1.60 private static final AtomicLong sequencer = new AtomicLong();
149 dl 1.14
150 jsr166 1.21 private class ScheduledFutureTask<V>
151 peierls 1.22 extends FutureTask<V> implements RunnableScheduledFuture<V> {
152 jsr166 1.21
153 dl 1.1 /** Sequence number to break ties FIFO */
154     private final long sequenceNumber;
155 jsr166 1.44
156 jsr166 1.99 /** The nanoTime-based time when the task is enabled to execute. */
157 dl 1.88 private volatile long time;
158 jsr166 1.44
159 dl 1.16 /**
160 jsr166 1.99 * Period for repeating tasks, in nanoseconds.
161 jsr166 1.75 * A positive value indicates fixed-rate execution.
162     * A negative value indicates fixed-delay execution.
163     * A value of 0 indicates a non-repeating (one-shot) task.
164 dl 1.16 */
165 dl 1.1 private final long period;
166    
167 jsr166 1.48 /** The actual task to be re-enqueued by reExecutePeriodic */
168     RunnableScheduledFuture<V> outerTask = this;
169 jsr166 1.44
170 dl 1.1 /**
171 dl 1.40 * Index into delay queue, to support faster cancellation.
172     */
173     int heapIndex;
174    
175     /**
176 jsr166 1.30 * Creates a one-shot action with given nanoTime-based trigger time.
177 dl 1.1 */
178 jsr166 1.89 ScheduledFutureTask(Runnable r, V result, long triggerTime,
179     long sequenceNumber) {
180 dl 1.1 super(r, result);
181 jsr166 1.75 this.time = triggerTime;
182 dl 1.1 this.period = 0;
183 jsr166 1.89 this.sequenceNumber = sequenceNumber;
184 dl 1.1 }
185    
186     /**
187 jsr166 1.75 * Creates a periodic action with given nanoTime-based initial
188     * trigger time and period.
189 dl 1.1 */
190 jsr166 1.75 ScheduledFutureTask(Runnable r, V result, long triggerTime,
191 jsr166 1.89 long period, long sequenceNumber) {
192 dl 1.1 super(r, result);
193 jsr166 1.75 this.time = triggerTime;
194 dl 1.1 this.period = period;
195 jsr166 1.89 this.sequenceNumber = sequenceNumber;
196 dl 1.1 }
197    
198     /**
199 jsr166 1.65 * Creates a one-shot action with given nanoTime-based trigger time.
200 dl 1.1 */
201 jsr166 1.89 ScheduledFutureTask(Callable<V> callable, long triggerTime,
202     long sequenceNumber) {
203 dl 1.1 super(callable);
204 jsr166 1.75 this.time = triggerTime;
205 dl 1.1 this.period = 0;
206 jsr166 1.89 this.sequenceNumber = sequenceNumber;
207 dl 1.1 }
208    
209     public long getDelay(TimeUnit unit) {
210 jsr166 1.98 return unit.convert(time - System.nanoTime(), NANOSECONDS);
211 dl 1.1 }
212    
213 dl 1.20 public int compareTo(Delayed other) {
214 dl 1.59 if (other == this) // compare zero if same object
215 dl 1.1 return 0;
216 dl 1.34 if (other instanceof ScheduledFutureTask) {
217     ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
218     long diff = time - x.time;
219     if (diff < 0)
220     return -1;
221     else if (diff > 0)
222     return 1;
223     else if (sequenceNumber < x.sequenceNumber)
224     return -1;
225     else
226     return 1;
227     }
228 jsr166 1.64 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
229 jsr166 1.61 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
230 dl 1.1 }
231    
232     /**
233 jsr166 1.70 * Returns {@code true} if this is a periodic (not a one-shot) action.
234 jsr166 1.30 *
235 jsr166 1.70 * @return {@code true} if periodic
236 dl 1.1 */
237 dl 1.23 public boolean isPeriodic() {
238 dl 1.16 return period != 0;
239 dl 1.1 }
240    
241     /**
242 jsr166 1.39 * Sets the next time to run for a periodic task.
243 dl 1.13 */
244 dl 1.37 private void setNextRunTime() {
245     long p = period;
246     if (p > 0)
247     time += p;
248     else
249 jsr166 1.54 time = triggerTime(-p);
250 dl 1.13 }
251    
252 dl 1.40 public boolean cancel(boolean mayInterruptIfRunning) {
253 dl 1.41 boolean cancelled = super.cancel(mayInterruptIfRunning);
254     if (cancelled && removeOnCancel && heapIndex >= 0)
255 jsr166 1.42 remove(this);
256 dl 1.41 return cancelled;
257 dl 1.40 }
258    
259 dl 1.13 /**
260 dl 1.5 * Overrides FutureTask version so as to reset/requeue if periodic.
261 jsr166 1.21 */
262 dl 1.1 public void run() {
263 dl 1.37 boolean periodic = isPeriodic();
264     if (!canRunInCurrentRunState(periodic))
265     cancel(false);
266     else if (!periodic)
267 jsr166 1.91 super.run();
268     else if (super.runAndReset()) {
269 dl 1.37 setNextRunTime();
270 jsr166 1.44 reExecutePeriodic(outerTask);
271 dl 1.37 }
272 dl 1.1 }
273     }
274    
275     /**
276 dl 1.37 * Returns true if can run a task given current run state
277 jsr166 1.39 * and run-after-shutdown parameters.
278     *
279 dl 1.37 * @param periodic true if this task periodic, false if delayed
280     */
281     boolean canRunInCurrentRunState(boolean periodic) {
282 jsr166 1.38 return isRunningOrShutdown(periodic ?
283 dl 1.37 continueExistingPeriodicTasksAfterShutdown :
284     executeExistingDelayedTasksAfterShutdown);
285     }
286    
287     /**
288     * Main execution method for delayed or periodic tasks. If pool
289     * is shut down, rejects the task. Otherwise adds task to queue
290     * and starts a thread, if necessary, to run it. (We cannot
291     * prestart the thread to run the task because the task (probably)
292 jsr166 1.67 * shouldn't be run yet.) If the pool is shut down while the task
293 dl 1.37 * is being added, cancel and remove it if required by state and
294 jsr166 1.39 * run-after-shutdown parameters.
295     *
296 dl 1.37 * @param task the task
297     */
298     private void delayedExecute(RunnableScheduledFuture<?> task) {
299     if (isShutdown())
300     reject(task);
301     else {
302     super.getQueue().add(task);
303     if (isShutdown() &&
304     !canRunInCurrentRunState(task.isPeriodic()) &&
305     remove(task))
306     task.cancel(false);
307 jsr166 1.48 else
308 dl 1.63 ensurePrestart();
309 dl 1.37 }
310     }
311 jsr166 1.21
312 dl 1.37 /**
313 jsr166 1.39 * Requeues a periodic task unless current run state precludes it.
314     * Same idea as delayedExecute except drops task rather than rejecting.
315     *
316 dl 1.37 * @param task the task
317     */
318     void reExecutePeriodic(RunnableScheduledFuture<?> task) {
319     if (canRunInCurrentRunState(true)) {
320     super.getQueue().add(task);
321     if (!canRunInCurrentRunState(true) && remove(task))
322     task.cancel(false);
323 jsr166 1.48 else
324 dl 1.63 ensurePrestart();
325 dl 1.37 }
326 dl 1.13 }
327 dl 1.1
328 dl 1.13 /**
329 jsr166 1.21 * Cancels and clears the queue of all tasks that should not be run
330 jsr166 1.39 * due to shutdown policy. Invoked within super.shutdown.
331 dl 1.13 */
332 dl 1.37 @Override void onShutdown() {
333     BlockingQueue<Runnable> q = super.getQueue();
334     boolean keepDelayed =
335     getExecuteExistingDelayedTasksAfterShutdownPolicy();
336     boolean keepPeriodic =
337     getContinueExistingPeriodicTasksAfterShutdownPolicy();
338 jsr166 1.57 if (!keepDelayed && !keepPeriodic) {
339     for (Object e : q.toArray())
340     if (e instanceof RunnableScheduledFuture<?>)
341     ((RunnableScheduledFuture<?>) e).cancel(false);
342 dl 1.37 q.clear();
343 jsr166 1.57 }
344 dl 1.37 else {
345     // Traverse snapshot to avoid iterator exceptions
346 jsr166 1.39 for (Object e : q.toArray()) {
347 dl 1.23 if (e instanceof RunnableScheduledFuture) {
348 dl 1.37 RunnableScheduledFuture<?> t =
349     (RunnableScheduledFuture<?>)e;
350 dl 1.41 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
351     t.isCancelled()) { // also remove if already cancelled
352     if (q.remove(t))
353     t.cancel(false);
354     }
355 dl 1.13 }
356     }
357 dl 1.1 }
358 jsr166 1.48 tryTerminate();
359 dl 1.1 }
360    
361 dl 1.23 /**
362 jsr166 1.30 * Modifies or replaces the task used to execute a runnable.
363 jsr166 1.28 * This method can be used to override the concrete
364 dl 1.23 * class used for managing internal tasks.
365 jsr166 1.30 * The default implementation simply returns the given task.
366 jsr166 1.28 *
367 dl 1.23 * @param runnable the submitted Runnable
368     * @param task the task created to execute the runnable
369 jsr166 1.72 * @param <V> the type of the task's result
370 dl 1.23 * @return a task that can execute the runnable
371     * @since 1.6
372     */
373 peierls 1.22 protected <V> RunnableScheduledFuture<V> decorateTask(
374 dl 1.23 Runnable runnable, RunnableScheduledFuture<V> task) {
375     return task;
376 peierls 1.22 }
377    
378 dl 1.23 /**
379 jsr166 1.30 * Modifies or replaces the task used to execute a callable.
380 jsr166 1.28 * This method can be used to override the concrete
381 dl 1.23 * class used for managing internal tasks.
382 jsr166 1.30 * The default implementation simply returns the given task.
383 jsr166 1.28 *
384 dl 1.23 * @param callable the submitted Callable
385     * @param task the task created to execute the callable
386 jsr166 1.72 * @param <V> the type of the task's result
387 dl 1.23 * @return a task that can execute the callable
388     * @since 1.6
389     */
390 peierls 1.22 protected <V> RunnableScheduledFuture<V> decorateTask(
391 dl 1.23 Callable<V> callable, RunnableScheduledFuture<V> task) {
392     return task;
393 dl 1.19 }
394    
395 dl 1.1 /**
396 jsr166 1.78 * The default keep-alive time for pool threads.
397     *
398     * Normally, this value is unused because all pool threads will be
399     * core threads, but if a user creates a pool with a corePoolSize
400     * of zero (against our advice), we keep a thread alive as long as
401     * there are queued tasks. If the keep alive time is zero (the
402     * historic value), we end up hot-spinning in getTask, wasting a
403     * CPU. But on the other hand, if we set the value too high, and
404     * users create a one-shot pool which they don't cleanly shutdown,
405     * the pool's non-daemon threads will prevent JVM termination. A
406     * small but non-zero value (relative to a JVM's lifetime) seems
407     * best.
408     */
409     private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
410    
411     /**
412 jsr166 1.39 * Creates a new {@code ScheduledThreadPoolExecutor} with the
413     * given core pool size.
414 jsr166 1.21 *
415 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
416     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
417     * @throws IllegalArgumentException if {@code corePoolSize < 0}
418 dl 1.1 */
419     public ScheduledThreadPoolExecutor(int corePoolSize) {
420 jsr166 1.78 super(corePoolSize, Integer.MAX_VALUE,
421     DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
422 dl 1.1 new DelayedWorkQueue());
423     }
424    
425     /**
426 jsr166 1.39 * Creates a new {@code ScheduledThreadPoolExecutor} with the
427     * given initial parameters.
428 jsr166 1.21 *
429 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
430     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
431 dl 1.1 * @param threadFactory the factory to use when the executor
432 jsr166 1.39 * creates a new thread
433     * @throws IllegalArgumentException if {@code corePoolSize < 0}
434     * @throws NullPointerException if {@code threadFactory} is null
435 dl 1.1 */
436     public ScheduledThreadPoolExecutor(int corePoolSize,
437 jsr166 1.56 ThreadFactory threadFactory) {
438 jsr166 1.78 super(corePoolSize, Integer.MAX_VALUE,
439     DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
440 dl 1.1 new DelayedWorkQueue(), threadFactory);
441     }
442    
443     /**
444 jsr166 1.79 * Creates a new {@code ScheduledThreadPoolExecutor} with the
445     * given initial parameters.
446 jsr166 1.21 *
447 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
448     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
449 dl 1.1 * @param handler the handler to use when execution is blocked
450 jsr166 1.39 * because the thread bounds and queue capacities are reached
451     * @throws IllegalArgumentException if {@code corePoolSize < 0}
452     * @throws NullPointerException if {@code handler} is null
453 dl 1.1 */
454     public ScheduledThreadPoolExecutor(int corePoolSize,
455 jsr166 1.56 RejectedExecutionHandler handler) {
456 jsr166 1.78 super(corePoolSize, Integer.MAX_VALUE,
457     DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
458 dl 1.1 new DelayedWorkQueue(), handler);
459     }
460    
461     /**
462 jsr166 1.79 * Creates a new {@code ScheduledThreadPoolExecutor} with the
463     * given initial parameters.
464 jsr166 1.21 *
465 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
466     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
467 dl 1.1 * @param threadFactory the factory to use when the executor
468 jsr166 1.39 * creates a new thread
469 dl 1.1 * @param handler the handler to use when execution is blocked
470 jsr166 1.39 * because the thread bounds and queue capacities are reached
471     * @throws IllegalArgumentException if {@code corePoolSize < 0}
472     * @throws NullPointerException if {@code threadFactory} or
473     * {@code handler} is null
474 dl 1.1 */
475     public ScheduledThreadPoolExecutor(int corePoolSize,
476 jsr166 1.56 ThreadFactory threadFactory,
477     RejectedExecutionHandler handler) {
478 jsr166 1.78 super(corePoolSize, Integer.MAX_VALUE,
479     DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
480 dl 1.1 new DelayedWorkQueue(), threadFactory, handler);
481     }
482    
483 dl 1.37 /**
484 jsr166 1.73 * Returns the nanoTime-based trigger time of a delayed action.
485 jsr166 1.54 */
486     private long triggerTime(long delay, TimeUnit unit) {
487     return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
488     }
489    
490     /**
491 jsr166 1.73 * Returns the nanoTime-based trigger time of a delayed action.
492 dl 1.50 */
493 jsr166 1.54 long triggerTime(long delay) {
494 jsr166 1.98 return System.nanoTime() +
495 jsr166 1.54 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
496     }
497    
498     /**
499     * Constrains the values of all delays in the queue to be within
500     * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
501     * This may occur if a task is eligible to be dequeued, but has
502     * not yet been, while some other task is added with a delay of
503     * Long.MAX_VALUE.
504     */
505     private long overflowFree(long delay) {
506     Delayed head = (Delayed) super.getQueue().peek();
507     if (head != null) {
508 jsr166 1.62 long headDelay = head.getDelay(NANOSECONDS);
509 jsr166 1.54 if (headDelay < 0 && (delay - headDelay < 0))
510     delay = Long.MAX_VALUE + headDelay;
511     }
512     return delay;
513 dl 1.50 }
514    
515     /**
516 dl 1.37 * @throws RejectedExecutionException {@inheritDoc}
517     * @throws NullPointerException {@inheritDoc}
518     */
519 jsr166 1.21 public ScheduledFuture<?> schedule(Runnable command,
520     long delay,
521 dl 1.13 TimeUnit unit) {
522 dl 1.9 if (command == null || unit == null)
523 dl 1.1 throw new NullPointerException();
524 jsr166 1.76 RunnableScheduledFuture<Void> t = decorateTask(command,
525 jsr166 1.54 new ScheduledFutureTask<Void>(command, null,
526 jsr166 1.89 triggerTime(delay, unit),
527     sequencer.getAndIncrement()));
528 dl 1.1 delayedExecute(t);
529     return t;
530     }
531 jsr166 1.52
532 dl 1.37 /**
533     * @throws RejectedExecutionException {@inheritDoc}
534     * @throws NullPointerException {@inheritDoc}
535     */
536 jsr166 1.21 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
537     long delay,
538 dl 1.13 TimeUnit unit) {
539 dl 1.9 if (callable == null || unit == null)
540 dl 1.1 throw new NullPointerException();
541 peierls 1.22 RunnableScheduledFuture<V> t = decorateTask(callable,
542 jsr166 1.54 new ScheduledFutureTask<V>(callable,
543 jsr166 1.89 triggerTime(delay, unit),
544     sequencer.getAndIncrement()));
545 dl 1.1 delayedExecute(t);
546     return t;
547     }
548    
549 dl 1.37 /**
550     * @throws RejectedExecutionException {@inheritDoc}
551     * @throws NullPointerException {@inheritDoc}
552     * @throws IllegalArgumentException {@inheritDoc}
553     */
554 jsr166 1.21 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
555     long initialDelay,
556     long period,
557 dl 1.13 TimeUnit unit) {
558 dl 1.9 if (command == null || unit == null)
559 dl 1.1 throw new NullPointerException();
560 jsr166 1.96 if (period <= 0L)
561 dl 1.1 throw new IllegalArgumentException();
562 jsr166 1.48 ScheduledFutureTask<Void> sft =
563     new ScheduledFutureTask<Void>(command,
564     null,
565 jsr166 1.54 triggerTime(initialDelay, unit),
566 jsr166 1.89 unit.toNanos(period),
567     sequencer.getAndIncrement());
568 jsr166 1.44 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
569 jsr166 1.48 sft.outerTask = t;
570 dl 1.1 delayedExecute(t);
571     return t;
572     }
573 jsr166 1.21
574 dl 1.37 /**
575     * @throws RejectedExecutionException {@inheritDoc}
576     * @throws NullPointerException {@inheritDoc}
577     * @throws IllegalArgumentException {@inheritDoc}
578     */
579 jsr166 1.21 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
580     long initialDelay,
581     long delay,
582 dl 1.13 TimeUnit unit) {
583 dl 1.9 if (command == null || unit == null)
584 dl 1.1 throw new NullPointerException();
585 jsr166 1.96 if (delay <= 0L)
586 dl 1.1 throw new IllegalArgumentException();
587 jsr166 1.48 ScheduledFutureTask<Void> sft =
588     new ScheduledFutureTask<Void>(command,
589     null,
590 jsr166 1.54 triggerTime(initialDelay, unit),
591 jsr166 1.97 -unit.toNanos(delay),
592 jsr166 1.89 sequencer.getAndIncrement());
593 jsr166 1.44 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
594 jsr166 1.48 sft.outerTask = t;
595 dl 1.1 delayedExecute(t);
596     return t;
597     }
598 jsr166 1.21
599 dl 1.1 /**
600 jsr166 1.39 * Executes {@code command} with zero required delay.
601     * This has effect equivalent to
602     * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
603     * Note that inspections of the queue and of the list returned by
604     * {@code shutdownNow} will access the zero-delayed
605     * {@link ScheduledFuture}, not the {@code command} itself.
606     *
607     * <p>A consequence of the use of {@code ScheduledFuture} objects is
608     * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
609     * called with a null second {@code Throwable} argument, even if the
610     * {@code command} terminated abruptly. Instead, the {@code Throwable}
611     * thrown by such a task can be obtained via {@link Future#get}.
612 dl 1.1 *
613     * @throws RejectedExecutionException at discretion of
614 jsr166 1.39 * {@code RejectedExecutionHandler}, if the task
615     * cannot be accepted for execution because the
616     * executor has been shut down
617     * @throws NullPointerException {@inheritDoc}
618 dl 1.1 */
619     public void execute(Runnable command) {
620 jsr166 1.62 schedule(command, 0, NANOSECONDS);
621 dl 1.1 }
622    
623 dl 1.13 // Override AbstractExecutorService methods
624    
625 dl 1.37 /**
626     * @throws RejectedExecutionException {@inheritDoc}
627     * @throws NullPointerException {@inheritDoc}
628     */
629 dl 1.7 public Future<?> submit(Runnable task) {
630 jsr166 1.62 return schedule(task, 0, NANOSECONDS);
631 dl 1.7 }
632    
633 dl 1.37 /**
634     * @throws RejectedExecutionException {@inheritDoc}
635     * @throws NullPointerException {@inheritDoc}
636     */
637 dl 1.7 public <T> Future<T> submit(Runnable task, T result) {
638 jsr166 1.62 return schedule(Executors.callable(task, result), 0, NANOSECONDS);
639 dl 1.7 }
640    
641 dl 1.37 /**
642     * @throws RejectedExecutionException {@inheritDoc}
643     * @throws NullPointerException {@inheritDoc}
644     */
645 dl 1.7 public <T> Future<T> submit(Callable<T> task) {
646 jsr166 1.62 return schedule(task, 0, NANOSECONDS);
647 dl 1.7 }
648 dl 1.1
649     /**
650 dl 1.37 * Sets the policy on whether to continue executing existing
651 jsr166 1.39 * periodic tasks even when this executor has been {@code shutdown}.
652     * In this case, these tasks will only terminate upon
653     * {@code shutdownNow} or after setting the policy to
654     * {@code false} when already shutdown.
655     * This value is by default {@code false}.
656 jsr166 1.30 *
657 jsr166 1.68 * @param value if {@code true}, continue after shutdown, else don't
658 jsr166 1.25 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
659 dl 1.1 */
660     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
661     continueExistingPeriodicTasksAfterShutdown = value;
662 jsr166 1.39 if (!value && isShutdown())
663 dl 1.37 onShutdown();
664 dl 1.1 }
665    
666     /**
667 jsr166 1.21 * Gets the policy on whether to continue executing existing
668 jsr166 1.39 * periodic tasks even when this executor has been {@code shutdown}.
669     * In this case, these tasks will only terminate upon
670     * {@code shutdownNow} or after setting the policy to
671     * {@code false} when already shutdown.
672     * This value is by default {@code false}.
673 jsr166 1.30 *
674 jsr166 1.39 * @return {@code true} if will continue after shutdown
675 dl 1.16 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
676 dl 1.1 */
677     public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
678     return continueExistingPeriodicTasksAfterShutdown;
679     }
680    
681     /**
682 jsr166 1.21 * Sets the policy on whether to execute existing delayed
683 jsr166 1.39 * tasks even when this executor has been {@code shutdown}.
684     * In this case, these tasks will only terminate upon
685     * {@code shutdownNow}, or after setting the policy to
686     * {@code false} when already shutdown.
687     * This value is by default {@code true}.
688 jsr166 1.30 *
689 jsr166 1.68 * @param value if {@code true}, execute after shutdown, else don't
690 dl 1.16 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
691 dl 1.1 */
692     public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
693     executeExistingDelayedTasksAfterShutdown = value;
694 jsr166 1.39 if (!value && isShutdown())
695 dl 1.37 onShutdown();
696 dl 1.1 }
697    
698     /**
699 jsr166 1.21 * Gets the policy on whether to execute existing delayed
700 jsr166 1.39 * tasks even when this executor has been {@code shutdown}.
701     * In this case, these tasks will only terminate upon
702     * {@code shutdownNow}, or after setting the policy to
703     * {@code false} when already shutdown.
704     * This value is by default {@code true}.
705 jsr166 1.30 *
706 jsr166 1.39 * @return {@code true} if will execute after shutdown
707 dl 1.16 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
708 dl 1.1 */
709     public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
710     return executeExistingDelayedTasksAfterShutdown;
711     }
712    
713     /**
714 jsr166 1.46 * Sets the policy on whether cancelled tasks should be immediately
715     * removed from the work queue at time of cancellation. This value is
716     * by default {@code false}.
717 dl 1.41 *
718 jsr166 1.42 * @param value if {@code true}, remove on cancellation, else don't
719 dl 1.41 * @see #getRemoveOnCancelPolicy
720 jsr166 1.43 * @since 1.7
721 dl 1.41 */
722     public void setRemoveOnCancelPolicy(boolean value) {
723     removeOnCancel = value;
724     }
725    
726     /**
727 jsr166 1.46 * Gets the policy on whether cancelled tasks should be immediately
728     * removed from the work queue at time of cancellation. This value is
729     * by default {@code false}.
730 dl 1.41 *
731 jsr166 1.46 * @return {@code true} if cancelled tasks are immediately removed
732     * from the queue
733 dl 1.41 * @see #setRemoveOnCancelPolicy
734 jsr166 1.43 * @since 1.7
735 dl 1.41 */
736     public boolean getRemoveOnCancelPolicy() {
737     return removeOnCancel;
738     }
739    
740     /**
741 dl 1.1 * Initiates an orderly shutdown in which previously submitted
742 jsr166 1.49 * tasks are executed, but no new tasks will be accepted.
743     * Invocation has no additional effect if already shut down.
744     *
745     * <p>This method does not wait for previously submitted tasks to
746     * complete execution. Use {@link #awaitTermination awaitTermination}
747     * to do that.
748     *
749     * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
750     * has been set {@code false}, existing delayed tasks whose delays
751     * have not yet elapsed are cancelled. And unless the {@code
752     * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
753     * {@code true}, future executions of existing periodic tasks will
754     * be cancelled.
755 jsr166 1.39 *
756     * @throws SecurityException {@inheritDoc}
757 dl 1.1 */
758     public void shutdown() {
759     super.shutdown();
760     }
761    
762     /**
763     * Attempts to stop all actively executing tasks, halts the
764 jsr166 1.30 * processing of waiting tasks, and returns a list of the tasks
765 jsr166 1.92 * that were awaiting execution. These tasks are drained (removed)
766     * from the task queue upon return from this method.
767 jsr166 1.21 *
768 jsr166 1.49 * <p>This method does not wait for actively executing tasks to
769     * terminate. Use {@link #awaitTermination awaitTermination} to
770     * do that.
771     *
772 dl 1.1 * <p>There are no guarantees beyond best-effort attempts to stop
773 dl 1.18 * processing actively executing tasks. This implementation
774 jsr166 1.93 * interrupts tasks via {@link Thread#interrupt}; any task that
775 jsr166 1.31 * fails to respond to interrupts may never terminate.
776 dl 1.1 *
777 jsr166 1.39 * @return list of tasks that never commenced execution.
778 jsr166 1.77 * Each element of this list is a {@link ScheduledFuture}.
779     * For tasks submitted via one of the {@code schedule}
780     * methods, the element will be identical to the returned
781     * {@code ScheduledFuture}. For tasks submitted using
782 jsr166 1.84 * {@link #execute execute}, the element will be a
783     * zero-delay {@code ScheduledFuture}.
784 jsr166 1.31 * @throws SecurityException {@inheritDoc}
785 dl 1.1 */
786 tim 1.4 public List<Runnable> shutdownNow() {
787 dl 1.1 return super.shutdownNow();
788     }
789    
790     /**
791 jsr166 1.95 * Returns the task queue used by this executor. Access to the
792     * task queue is intended primarily for debugging and monitoring.
793     * This queue may be in active use. Retrieving the task queue
794     * does not prevent queued tasks from executing.
795     *
796     * <p>Each element of this queue is a {@link ScheduledFuture}.
797 jsr166 1.77 * For tasks submitted via one of the {@code schedule} methods, the
798     * element will be identical to the returned {@code ScheduledFuture}.
799 jsr166 1.84 * For tasks submitted using {@link #execute execute}, the element
800     * will be a zero-delay {@code ScheduledFuture}.
801 jsr166 1.77 *
802     * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
803     * tasks in the order in which they will execute.
804 dl 1.1 *
805     * @return the task queue
806     */
807     public BlockingQueue<Runnable> getQueue() {
808     return super.getQueue();
809     }
810    
811 dl 1.13 /**
812 dl 1.40 * Specialized delay queue. To mesh with TPE declarations, this
813     * class must be declared as a BlockingQueue<Runnable> even though
814 jsr166 1.42 * it can only hold RunnableScheduledFutures.
815 jsr166 1.21 */
816 dl 1.40 static class DelayedWorkQueue extends AbstractQueue<Runnable>
817 dl 1.13 implements BlockingQueue<Runnable> {
818 jsr166 1.21
819 dl 1.40 /*
820     * A DelayedWorkQueue is based on a heap-based data structure
821     * like those in DelayQueue and PriorityQueue, except that
822     * every ScheduledFutureTask also records its index into the
823     * heap array. This eliminates the need to find a task upon
824     * cancellation, greatly speeding up removal (down from O(n)
825     * to O(log n)), and reducing garbage retention that would
826     * otherwise occur by waiting for the element to rise to top
827     * before clearing. But because the queue may also hold
828     * RunnableScheduledFutures that are not ScheduledFutureTasks,
829     * we are not guaranteed to have such indices available, in
830     * which case we fall back to linear search. (We expect that
831     * most tasks will not be decorated, and that the faster cases
832     * will be much more common.)
833     *
834     * All heap operations must record index changes -- mainly
835     * within siftUp and siftDown. Upon removal, a task's
836     * heapIndex is set to -1. Note that ScheduledFutureTasks can
837     * appear at most once in the queue (this need not be true for
838     * other kinds of tasks or work queues), so are uniquely
839     * identified by heapIndex.
840     */
841    
842 jsr166 1.46 private static final int INITIAL_CAPACITY = 16;
843 jsr166 1.61 private RunnableScheduledFuture<?>[] queue =
844     new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
845 jsr166 1.46 private final ReentrantLock lock = new ReentrantLock();
846 jsr166 1.80 private int size;
847 dl 1.40
848 jsr166 1.48 /**
849     * Thread designated to wait for the task at the head of the
850     * queue. This variant of the Leader-Follower pattern
851     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
852     * minimize unnecessary timed waiting. When a thread becomes
853     * the leader, it waits only for the next delay to elapse, but
854     * other threads await indefinitely. The leader thread must
855     * signal some other thread before returning from take() or
856     * poll(...), unless some other thread becomes leader in the
857     * interim. Whenever the head of the queue is replaced with a
858     * task with an earlier expiration time, the leader field is
859     * invalidated by being reset to null, and some waiting
860     * thread, but not necessarily the current leader, is
861     * signalled. So waiting threads must be prepared to acquire
862     * and lose leadership while waiting.
863     */
864 jsr166 1.80 private Thread leader;
865 jsr166 1.48
866     /**
867     * Condition signalled when a newer task becomes available at the
868     * head of the queue or a new thread may need to become leader.
869     */
870     private final Condition available = lock.newCondition();
871 dl 1.40
872     /**
873 jsr166 1.66 * Sets f's heapIndex if it is a ScheduledFutureTask.
874 dl 1.40 */
875 jsr166 1.61 private void setIndex(RunnableScheduledFuture<?> f, int idx) {
876 dl 1.40 if (f instanceof ScheduledFutureTask)
877     ((ScheduledFutureTask)f).heapIndex = idx;
878     }
879    
880     /**
881 jsr166 1.66 * Sifts element added at bottom up to its heap-ordered spot.
882 dl 1.40 * Call only when holding lock.
883     */
884 jsr166 1.61 private void siftUp(int k, RunnableScheduledFuture<?> key) {
885 dl 1.40 while (k > 0) {
886     int parent = (k - 1) >>> 1;
887 jsr166 1.61 RunnableScheduledFuture<?> e = queue[parent];
888 dl 1.40 if (key.compareTo(e) >= 0)
889     break;
890     queue[k] = e;
891     setIndex(e, k);
892     k = parent;
893     }
894     queue[k] = key;
895     setIndex(key, k);
896     }
897    
898     /**
899 jsr166 1.66 * Sifts element added at top down to its heap-ordered spot.
900 dl 1.40 * Call only when holding lock.
901     */
902 jsr166 1.61 private void siftDown(int k, RunnableScheduledFuture<?> key) {
903 jsr166 1.42 int half = size >>> 1;
904 dl 1.40 while (k < half) {
905 jsr166 1.42 int child = (k << 1) + 1;
906 jsr166 1.61 RunnableScheduledFuture<?> c = queue[child];
907 dl 1.40 int right = child + 1;
908     if (right < size && c.compareTo(queue[right]) > 0)
909     c = queue[child = right];
910     if (key.compareTo(c) <= 0)
911     break;
912     queue[k] = c;
913     setIndex(c, k);
914     k = child;
915     }
916     queue[k] = key;
917     setIndex(key, k);
918     }
919    
920     /**
921 jsr166 1.66 * Resizes the heap array. Call only when holding lock.
922 dl 1.40 */
923     private void grow() {
924     int oldCapacity = queue.length;
925     int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
926     if (newCapacity < 0) // overflow
927     newCapacity = Integer.MAX_VALUE;
928     queue = Arrays.copyOf(queue, newCapacity);
929     }
930    
931     /**
932 jsr166 1.66 * Finds index of given object, or -1 if absent.
933 dl 1.40 */
934     private int indexOf(Object x) {
935     if (x != null) {
936 jsr166 1.48 if (x instanceof ScheduledFutureTask) {
937     int i = ((ScheduledFutureTask) x).heapIndex;
938     // Sanity check; x could conceivably be a
939     // ScheduledFutureTask from some other pool.
940     if (i >= 0 && i < size && queue[i] == x)
941     return i;
942     } else {
943     for (int i = 0; i < size; i++)
944     if (x.equals(queue[i]))
945     return i;
946     }
947 dl 1.40 }
948     return -1;
949     }
950    
951 jsr166 1.48 public boolean contains(Object x) {
952     final ReentrantLock lock = this.lock;
953 jsr166 1.45 lock.lock();
954     try {
955 jsr166 1.48 return indexOf(x) != -1;
956 jsr166 1.45 } finally {
957     lock.unlock();
958     }
959 jsr166 1.48 }
960 jsr166 1.45
961 dl 1.40 public boolean remove(Object x) {
962     final ReentrantLock lock = this.lock;
963     lock.lock();
964     try {
965 jsr166 1.45 int i = indexOf(x);
966 jsr166 1.48 if (i < 0)
967     return false;
968 jsr166 1.45
969 jsr166 1.48 setIndex(queue[i], -1);
970     int s = --size;
971 jsr166 1.61 RunnableScheduledFuture<?> replacement = queue[s];
972 jsr166 1.48 queue[s] = null;
973     if (s != i) {
974     siftDown(i, replacement);
975     if (queue[i] == replacement)
976     siftUp(i, replacement);
977     }
978     return true;
979 dl 1.40 } finally {
980     lock.unlock();
981     }
982     }
983    
984     public int size() {
985     final ReentrantLock lock = this.lock;
986     lock.lock();
987     try {
988 jsr166 1.45 return size;
989 dl 1.40 } finally {
990     lock.unlock();
991     }
992     }
993    
994 jsr166 1.42 public boolean isEmpty() {
995     return size() == 0;
996 dl 1.40 }
997    
998     public int remainingCapacity() {
999     return Integer.MAX_VALUE;
1000     }
1001    
1002 jsr166 1.61 public RunnableScheduledFuture<?> peek() {
1003 dl 1.40 final ReentrantLock lock = this.lock;
1004     lock.lock();
1005     try {
1006     return queue[0];
1007     } finally {
1008     lock.unlock();
1009     }
1010 dl 1.13 }
1011    
1012 dl 1.40 public boolean offer(Runnable x) {
1013     if (x == null)
1014     throw new NullPointerException();
1015 jsr166 1.61 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
1016 dl 1.40 final ReentrantLock lock = this.lock;
1017     lock.lock();
1018     try {
1019     int i = size;
1020     if (i >= queue.length)
1021     grow();
1022     size = i + 1;
1023     if (i == 0) {
1024     queue[0] = e;
1025     setIndex(e, 0);
1026 jsr166 1.45 } else {
1027 dl 1.40 siftUp(i, e);
1028     }
1029 jsr166 1.46 if (queue[0] == e) {
1030 jsr166 1.48 leader = null;
1031 jsr166 1.46 available.signal();
1032 jsr166 1.48 }
1033 dl 1.40 } finally {
1034     lock.unlock();
1035     }
1036     return true;
1037 jsr166 1.48 }
1038 dl 1.40
1039     public void put(Runnable e) {
1040     offer(e);
1041     }
1042    
1043     public boolean add(Runnable e) {
1044 jsr166 1.48 return offer(e);
1045     }
1046 dl 1.40
1047     public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1048     return offer(e);
1049     }
1050 jsr166 1.42
1051 jsr166 1.46 /**
1052     * Performs common bookkeeping for poll and take: Replaces
1053 jsr166 1.47 * first element with last and sifts it down. Call only when
1054     * holding lock.
1055 jsr166 1.46 * @param f the task to remove and return
1056     */
1057 jsr166 1.61 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1058 jsr166 1.46 int s = --size;
1059 jsr166 1.61 RunnableScheduledFuture<?> x = queue[s];
1060 jsr166 1.46 queue[s] = null;
1061     if (s != 0)
1062     siftDown(0, x);
1063     setIndex(f, -1);
1064     return f;
1065     }
1066    
1067 jsr166 1.61 public RunnableScheduledFuture<?> poll() {
1068 dl 1.40 final ReentrantLock lock = this.lock;
1069     lock.lock();
1070     try {
1071 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1072 jsr166 1.87 return (first == null || first.getDelay(NANOSECONDS) > 0)
1073     ? null
1074     : finishPoll(first);
1075 dl 1.40 } finally {
1076     lock.unlock();
1077     }
1078     }
1079    
1080 jsr166 1.61 public RunnableScheduledFuture<?> take() throws InterruptedException {
1081 dl 1.40 final ReentrantLock lock = this.lock;
1082     lock.lockInterruptibly();
1083     try {
1084     for (;;) {
1085 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1086 jsr166 1.42 if (first == null)
1087 dl 1.40 available.await();
1088     else {
1089 jsr166 1.62 long delay = first.getDelay(NANOSECONDS);
1090 jsr166 1.96 if (delay <= 0L)
1091 jsr166 1.48 return finishPoll(first);
1092 jsr166 1.71 first = null; // don't retain ref while waiting
1093     if (leader != null)
1094 jsr166 1.48 available.await();
1095     else {
1096     Thread thisThread = Thread.currentThread();
1097     leader = thisThread;
1098     try {
1099     available.awaitNanos(delay);
1100     } finally {
1101     if (leader == thisThread)
1102     leader = null;
1103     }
1104     }
1105 dl 1.40 }
1106     }
1107     } finally {
1108 jsr166 1.48 if (leader == null && queue[0] != null)
1109     available.signal();
1110 dl 1.40 lock.unlock();
1111     }
1112     }
1113    
1114 jsr166 1.61 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1115 dl 1.40 throws InterruptedException {
1116     long nanos = unit.toNanos(timeout);
1117     final ReentrantLock lock = this.lock;
1118     lock.lockInterruptibly();
1119     try {
1120     for (;;) {
1121 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1122 dl 1.40 if (first == null) {
1123 jsr166 1.96 if (nanos <= 0L)
1124 dl 1.40 return null;
1125     else
1126     nanos = available.awaitNanos(nanos);
1127     } else {
1128 jsr166 1.62 long delay = first.getDelay(NANOSECONDS);
1129 jsr166 1.96 if (delay <= 0L)
1130 dl 1.40 return finishPoll(first);
1131 jsr166 1.96 if (nanos <= 0L)
1132 jsr166 1.48 return null;
1133 jsr166 1.71 first = null; // don't retain ref while waiting
1134 jsr166 1.48 if (nanos < delay || leader != null)
1135     nanos = available.awaitNanos(nanos);
1136     else {
1137     Thread thisThread = Thread.currentThread();
1138     leader = thisThread;
1139     try {
1140     long timeLeft = available.awaitNanos(delay);
1141     nanos -= delay - timeLeft;
1142     } finally {
1143     if (leader == thisThread)
1144     leader = null;
1145     }
1146     }
1147     }
1148     }
1149 dl 1.40 } finally {
1150 jsr166 1.48 if (leader == null && queue[0] != null)
1151     available.signal();
1152 dl 1.40 lock.unlock();
1153     }
1154     }
1155    
1156     public void clear() {
1157     final ReentrantLock lock = this.lock;
1158     lock.lock();
1159     try {
1160     for (int i = 0; i < size; i++) {
1161 jsr166 1.61 RunnableScheduledFuture<?> t = queue[i];
1162 dl 1.40 if (t != null) {
1163     queue[i] = null;
1164     setIndex(t, -1);
1165     }
1166     }
1167     size = 0;
1168     } finally {
1169     lock.unlock();
1170     }
1171 dl 1.13 }
1172 dl 1.40
1173     /**
1174 jsr166 1.66 * Returns first element only if it is expired.
1175 dl 1.40 * Used only by drainTo. Call only when holding lock.
1176     */
1177 jsr166 1.62 private RunnableScheduledFuture<?> peekExpired() {
1178     // assert lock.isHeldByCurrentThread();
1179 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1180 jsr166 1.62 return (first == null || first.getDelay(NANOSECONDS) > 0) ?
1181     null : first;
1182 dl 1.40 }
1183    
1184     public int drainTo(Collection<? super Runnable> c) {
1185     if (c == null)
1186     throw new NullPointerException();
1187     if (c == this)
1188     throw new IllegalArgumentException();
1189     final ReentrantLock lock = this.lock;
1190     lock.lock();
1191     try {
1192 jsr166 1.61 RunnableScheduledFuture<?> first;
1193 dl 1.40 int n = 0;
1194 jsr166 1.62 while ((first = peekExpired()) != null) {
1195     c.add(first); // In this order, in case add() throws.
1196     finishPoll(first);
1197 jsr166 1.48 ++n;
1198     }
1199 dl 1.40 return n;
1200     } finally {
1201     lock.unlock();
1202     }
1203 dl 1.13 }
1204    
1205 jsr166 1.21 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1206 dl 1.40 if (c == null)
1207     throw new NullPointerException();
1208     if (c == this)
1209     throw new IllegalArgumentException();
1210     if (maxElements <= 0)
1211     return 0;
1212     final ReentrantLock lock = this.lock;
1213     lock.lock();
1214     try {
1215 jsr166 1.61 RunnableScheduledFuture<?> first;
1216 dl 1.40 int n = 0;
1217 jsr166 1.62 while (n < maxElements && (first = peekExpired()) != null) {
1218     c.add(first); // In this order, in case add() throws.
1219     finishPoll(first);
1220 jsr166 1.48 ++n;
1221     }
1222 dl 1.40 return n;
1223     } finally {
1224     lock.unlock();
1225     }
1226     }
1227    
1228     public Object[] toArray() {
1229     final ReentrantLock lock = this.lock;
1230     lock.lock();
1231     try {
1232 jsr166 1.45 return Arrays.copyOf(queue, size, Object[].class);
1233 dl 1.40 } finally {
1234     lock.unlock();
1235     }
1236     }
1237    
1238 jsr166 1.48 @SuppressWarnings("unchecked")
1239 dl 1.40 public <T> T[] toArray(T[] a) {
1240     final ReentrantLock lock = this.lock;
1241     lock.lock();
1242     try {
1243     if (a.length < size)
1244     return (T[]) Arrays.copyOf(queue, size, a.getClass());
1245     System.arraycopy(queue, 0, a, 0, size);
1246     if (a.length > size)
1247     a[size] = null;
1248     return a;
1249     } finally {
1250     lock.unlock();
1251     }
1252 dl 1.13 }
1253    
1254 jsr166 1.21 public Iterator<Runnable> iterator() {
1255 jsr166 1.45 return new Itr(Arrays.copyOf(queue, size));
1256 dl 1.40 }
1257 jsr166 1.42
1258 dl 1.40 /**
1259     * Snapshot iterator that works off copy of underlying q array.
1260     */
1261     private class Itr implements Iterator<Runnable> {
1262 jsr166 1.74 final RunnableScheduledFuture<?>[] array;
1263 jsr166 1.86 int cursor; // index of next element to return; initially 0
1264     int lastRet = -1; // index of last element returned; -1 if no such
1265 jsr166 1.42
1266 jsr166 1.74 Itr(RunnableScheduledFuture<?>[] array) {
1267 dl 1.40 this.array = array;
1268     }
1269 jsr166 1.42
1270 dl 1.40 public boolean hasNext() {
1271     return cursor < array.length;
1272     }
1273 jsr166 1.42
1274 dl 1.40 public Runnable next() {
1275     if (cursor >= array.length)
1276     throw new NoSuchElementException();
1277     lastRet = cursor;
1278 jsr166 1.45 return array[cursor++];
1279 dl 1.40 }
1280 jsr166 1.42
1281 dl 1.40 public void remove() {
1282     if (lastRet < 0)
1283     throw new IllegalStateException();
1284     DelayedWorkQueue.this.remove(array[lastRet]);
1285     lastRet = -1;
1286     }
1287 dl 1.13 }
1288     }
1289 dl 1.1 }