ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.103
Committed: Wed Mar 22 20:19:55 2017 UTC (7 years, 2 months ago) by jsr166
Branch: MAIN
Changes since 1.102: +5 -0 lines
Log Message:
clarify default rejected execution handler and thread factory

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.11 * Expert Group and released to the public domain, as explained at
4 jsr166 1.58 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.1 */
6    
7     package java.util.concurrent;
8 jsr166 1.82
9 jsr166 1.83 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 jsr166 1.62 import static java.util.concurrent.TimeUnit.NANOSECONDS;
11 jsr166 1.83
12 jsr166 1.82 import java.util.AbstractQueue;
13     import java.util.Arrays;
14     import java.util.Collection;
15     import java.util.Iterator;
16     import java.util.List;
17     import java.util.NoSuchElementException;
18 jsr166 1.83 import java.util.concurrent.atomic.AtomicLong;
19     import java.util.concurrent.locks.Condition;
20     import java.util.concurrent.locks.ReentrantLock;
21 dl 1.1
22     /**
23 dl 1.7 * A {@link ThreadPoolExecutor} that can additionally schedule
24 jsr166 1.75 * commands to run after a given delay, or to execute periodically.
25     * This class is preferable to {@link java.util.Timer} when multiple
26     * worker threads are needed, or when the additional flexibility or
27     * capabilities of {@link ThreadPoolExecutor} (which this class
28     * extends) are required.
29 dl 1.1 *
30 jsr166 1.46 * <p>Delayed tasks execute no sooner than they are enabled, but
31 dl 1.18 * without any real-time guarantees about when, after they are
32     * enabled, they will commence. Tasks scheduled for exactly the same
33     * execution time are enabled in first-in-first-out (FIFO) order of
34 jsr166 1.46 * submission.
35     *
36     * <p>When a submitted task is cancelled before it is run, execution
37 jsr166 1.75 * is suppressed. By default, such a cancelled task is not
38     * automatically removed from the work queue until its delay elapses.
39     * While this enables further inspection and monitoring, it may also
40     * cause unbounded retention of cancelled tasks. To avoid this, use
41     * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately
42     * removed from the work queue at time of cancellation.
43 dl 1.1 *
44 jsr166 1.75 * <p>Successive executions of a periodic task scheduled via
45 jsr166 1.84 * {@link #scheduleAtFixedRate scheduleAtFixedRate} or
46     * {@link #scheduleWithFixedDelay scheduleWithFixedDelay}
47     * do not overlap. While different executions may be performed by
48     * different threads, the effects of prior executions
49     * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
50 dl 1.51 * those of subsequent ones.
51     *
52 dl 1.1 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
53 dl 1.8 * of the inherited tuning methods are not useful for it. In
54 jsr166 1.53 * particular, because it acts as a fixed-sized pool using
55     * {@code corePoolSize} threads and an unbounded queue, adjustments
56     * to {@code maximumPoolSize} have no useful effect. Additionally, it
57     * is almost never a good idea to set {@code corePoolSize} to zero or
58     * use {@code allowCoreThreadTimeOut} because this may leave the pool
59     * without threads to handle tasks once they become eligible to run.
60 dl 1.1 *
61 jsr166 1.103 * <p>As with {@code ThreadPoolExecutor}, if not otherwise specified,
62     * this class uses {@link Executors#defaultThreadFactory} as the
63     * default thread factory, and {@link ThreadPoolExecutor.AbortPolicy}
64     * as the default rejected execution handler.
65     *
66 jsr166 1.39 * <p><b>Extension notes:</b> This class overrides the
67 jsr166 1.69 * {@link ThreadPoolExecutor#execute(Runnable) execute} and
68 jsr166 1.39 * {@link AbstractExecutorService#submit(Runnable) submit}
69     * methods to generate internal {@link ScheduledFuture} objects to
70     * control per-task delays and scheduling. To preserve
71     * functionality, any further overrides of these methods in
72 dl 1.32 * subclasses must invoke superclass versions, which effectively
73 jsr166 1.39 * disables additional task customization. However, this class
74 dl 1.32 * provides alternative protected extension method
75 jsr166 1.39 * {@code decorateTask} (one version each for {@code Runnable} and
76     * {@code Callable}) that can be used to customize the concrete task
77     * types used to execute commands entered via {@code execute},
78     * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
79     * and {@code scheduleWithFixedDelay}. By default, a
80     * {@code ScheduledThreadPoolExecutor} uses a task type extending
81 dl 1.32 * {@link FutureTask}. However, this may be modified or replaced using
82     * subclasses of the form:
83     *
84 jsr166 1.85 * <pre> {@code
85 dl 1.23 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
86     *
87 jsr166 1.39 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
88 dl 1.23 *
89 jsr166 1.39 * protected <V> RunnableScheduledFuture<V> decorateTask(
90     * Runnable r, RunnableScheduledFuture<V> task) {
91     * return new CustomTask<V>(r, task);
92 jsr166 1.29 * }
93 dl 1.23 *
94 jsr166 1.39 * protected <V> RunnableScheduledFuture<V> decorateTask(
95     * Callable<V> c, RunnableScheduledFuture<V> task) {
96     * return new CustomTask<V>(c, task);
97 jsr166 1.29 * }
98     * // ... add constructors, etc.
99 jsr166 1.39 * }}</pre>
100     *
101 dl 1.1 * @since 1.5
102     * @author Doug Lea
103     */
104 jsr166 1.21 public class ScheduledThreadPoolExecutor
105     extends ThreadPoolExecutor
106 tim 1.3 implements ScheduledExecutorService {
107 dl 1.1
108 dl 1.37 /*
109     * This class specializes ThreadPoolExecutor implementation by
110     *
111 jsr166 1.99 * 1. Using a custom task type ScheduledFutureTask, even for tasks
112     * that don't require scheduling because they are submitted
113     * using ExecutorService rather than ScheduledExecutorService
114     * methods, which are treated as tasks with a delay of zero.
115 dl 1.37 *
116 jsr166 1.46 * 2. Using a custom queue (DelayedWorkQueue), a variant of
117 dl 1.37 * unbounded DelayQueue. The lack of capacity constraint and
118     * the fact that corePoolSize and maximumPoolSize are
119     * effectively identical simplifies some execution mechanics
120 jsr166 1.46 * (see delayedExecute) compared to ThreadPoolExecutor.
121 dl 1.37 *
122     * 3. Supporting optional run-after-shutdown parameters, which
123     * leads to overrides of shutdown methods to remove and cancel
124     * tasks that should NOT be run after shutdown, as well as
125     * different recheck logic when task (re)submission overlaps
126     * with a shutdown.
127     *
128     * 4. Task decoration methods to allow interception and
129     * instrumentation, which are needed because subclasses cannot
130     * otherwise override submit methods to get this effect. These
131     * don't have any impact on pool control logic though.
132     */
133    
134 dl 1.1 /**
135     * False if should cancel/suppress periodic tasks on shutdown.
136     */
137     private volatile boolean continueExistingPeriodicTasksAfterShutdown;
138    
139     /**
140     * False if should cancel non-periodic tasks on shutdown.
141     */
142     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
143    
144     /**
145 jsr166 1.75 * True if ScheduledFutureTask.cancel should remove from queue.
146 dl 1.41 */
147 jsr166 1.90 volatile boolean removeOnCancel;
148 dl 1.41
149     /**
150 dl 1.1 * Sequence number to break scheduling ties, and in turn to
151     * guarantee FIFO order among tied entries.
152     */
153 jsr166 1.60 private static final AtomicLong sequencer = new AtomicLong();
154 dl 1.14
155 jsr166 1.21 private class ScheduledFutureTask<V>
156 peierls 1.22 extends FutureTask<V> implements RunnableScheduledFuture<V> {
157 jsr166 1.21
158 dl 1.1 /** Sequence number to break ties FIFO */
159     private final long sequenceNumber;
160 jsr166 1.44
161 jsr166 1.99 /** The nanoTime-based time when the task is enabled to execute. */
162 dl 1.88 private volatile long time;
163 jsr166 1.44
164 dl 1.16 /**
165 jsr166 1.99 * Period for repeating tasks, in nanoseconds.
166 jsr166 1.75 * A positive value indicates fixed-rate execution.
167     * A negative value indicates fixed-delay execution.
168     * A value of 0 indicates a non-repeating (one-shot) task.
169 dl 1.16 */
170 dl 1.1 private final long period;
171    
172 jsr166 1.48 /** The actual task to be re-enqueued by reExecutePeriodic */
173     RunnableScheduledFuture<V> outerTask = this;
174 jsr166 1.44
175 dl 1.1 /**
176 dl 1.40 * Index into delay queue, to support faster cancellation.
177     */
178     int heapIndex;
179    
180     /**
181 jsr166 1.30 * Creates a one-shot action with given nanoTime-based trigger time.
182 dl 1.1 */
183 jsr166 1.89 ScheduledFutureTask(Runnable r, V result, long triggerTime,
184     long sequenceNumber) {
185 dl 1.1 super(r, result);
186 jsr166 1.75 this.time = triggerTime;
187 dl 1.1 this.period = 0;
188 jsr166 1.89 this.sequenceNumber = sequenceNumber;
189 dl 1.1 }
190    
191     /**
192 jsr166 1.75 * Creates a periodic action with given nanoTime-based initial
193     * trigger time and period.
194 dl 1.1 */
195 jsr166 1.75 ScheduledFutureTask(Runnable r, V result, long triggerTime,
196 jsr166 1.89 long period, long sequenceNumber) {
197 dl 1.1 super(r, result);
198 jsr166 1.75 this.time = triggerTime;
199 dl 1.1 this.period = period;
200 jsr166 1.89 this.sequenceNumber = sequenceNumber;
201 dl 1.1 }
202    
203     /**
204 jsr166 1.65 * Creates a one-shot action with given nanoTime-based trigger time.
205 dl 1.1 */
206 jsr166 1.89 ScheduledFutureTask(Callable<V> callable, long triggerTime,
207     long sequenceNumber) {
208 dl 1.1 super(callable);
209 jsr166 1.75 this.time = triggerTime;
210 dl 1.1 this.period = 0;
211 jsr166 1.89 this.sequenceNumber = sequenceNumber;
212 dl 1.1 }
213    
214     public long getDelay(TimeUnit unit) {
215 jsr166 1.98 return unit.convert(time - System.nanoTime(), NANOSECONDS);
216 dl 1.1 }
217    
218 dl 1.20 public int compareTo(Delayed other) {
219 dl 1.59 if (other == this) // compare zero if same object
220 dl 1.1 return 0;
221 dl 1.34 if (other instanceof ScheduledFutureTask) {
222     ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
223     long diff = time - x.time;
224     if (diff < 0)
225     return -1;
226     else if (diff > 0)
227     return 1;
228     else if (sequenceNumber < x.sequenceNumber)
229     return -1;
230     else
231     return 1;
232     }
233 jsr166 1.64 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
234 jsr166 1.61 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
235 dl 1.1 }
236    
237     /**
238 jsr166 1.70 * Returns {@code true} if this is a periodic (not a one-shot) action.
239 jsr166 1.30 *
240 jsr166 1.70 * @return {@code true} if periodic
241 dl 1.1 */
242 dl 1.23 public boolean isPeriodic() {
243 dl 1.16 return period != 0;
244 dl 1.1 }
245    
246     /**
247 jsr166 1.39 * Sets the next time to run for a periodic task.
248 dl 1.13 */
249 dl 1.37 private void setNextRunTime() {
250     long p = period;
251     if (p > 0)
252     time += p;
253     else
254 jsr166 1.54 time = triggerTime(-p);
255 dl 1.13 }
256    
257 dl 1.40 public boolean cancel(boolean mayInterruptIfRunning) {
258 jsr166 1.100 // The racy read of heapIndex below is benign:
259     // if heapIndex < 0, then OOTA guarantees that we have surely
260     // been removed; else we recheck under lock in remove()
261 dl 1.41 boolean cancelled = super.cancel(mayInterruptIfRunning);
262     if (cancelled && removeOnCancel && heapIndex >= 0)
263 jsr166 1.42 remove(this);
264 dl 1.41 return cancelled;
265 dl 1.40 }
266    
267 dl 1.13 /**
268 dl 1.5 * Overrides FutureTask version so as to reset/requeue if periodic.
269 jsr166 1.21 */
270 dl 1.1 public void run() {
271 dl 1.37 boolean periodic = isPeriodic();
272     if (!canRunInCurrentRunState(periodic))
273     cancel(false);
274     else if (!periodic)
275 jsr166 1.91 super.run();
276     else if (super.runAndReset()) {
277 dl 1.37 setNextRunTime();
278 jsr166 1.44 reExecutePeriodic(outerTask);
279 dl 1.37 }
280 dl 1.1 }
281     }
282    
283     /**
284 dl 1.37 * Returns true if can run a task given current run state
285 jsr166 1.39 * and run-after-shutdown parameters.
286     *
287 dl 1.37 * @param periodic true if this task periodic, false if delayed
288     */
289     boolean canRunInCurrentRunState(boolean periodic) {
290 jsr166 1.38 return isRunningOrShutdown(periodic ?
291 dl 1.37 continueExistingPeriodicTasksAfterShutdown :
292     executeExistingDelayedTasksAfterShutdown);
293     }
294    
295     /**
296     * Main execution method for delayed or periodic tasks. If pool
297     * is shut down, rejects the task. Otherwise adds task to queue
298     * and starts a thread, if necessary, to run it. (We cannot
299     * prestart the thread to run the task because the task (probably)
300 jsr166 1.67 * shouldn't be run yet.) If the pool is shut down while the task
301 dl 1.37 * is being added, cancel and remove it if required by state and
302 jsr166 1.39 * run-after-shutdown parameters.
303     *
304 dl 1.37 * @param task the task
305     */
306     private void delayedExecute(RunnableScheduledFuture<?> task) {
307     if (isShutdown())
308     reject(task);
309     else {
310     super.getQueue().add(task);
311     if (isShutdown() &&
312     !canRunInCurrentRunState(task.isPeriodic()) &&
313     remove(task))
314     task.cancel(false);
315 jsr166 1.48 else
316 dl 1.63 ensurePrestart();
317 dl 1.37 }
318     }
319 jsr166 1.21
320 dl 1.37 /**
321 jsr166 1.39 * Requeues a periodic task unless current run state precludes it.
322     * Same idea as delayedExecute except drops task rather than rejecting.
323     *
324 dl 1.37 * @param task the task
325     */
326     void reExecutePeriodic(RunnableScheduledFuture<?> task) {
327     if (canRunInCurrentRunState(true)) {
328     super.getQueue().add(task);
329     if (!canRunInCurrentRunState(true) && remove(task))
330     task.cancel(false);
331 jsr166 1.48 else
332 dl 1.63 ensurePrestart();
333 dl 1.37 }
334 dl 1.13 }
335 dl 1.1
336 dl 1.13 /**
337 jsr166 1.21 * Cancels and clears the queue of all tasks that should not be run
338 jsr166 1.39 * due to shutdown policy. Invoked within super.shutdown.
339 dl 1.13 */
340 dl 1.37 @Override void onShutdown() {
341     BlockingQueue<Runnable> q = super.getQueue();
342     boolean keepDelayed =
343     getExecuteExistingDelayedTasksAfterShutdownPolicy();
344     boolean keepPeriodic =
345     getContinueExistingPeriodicTasksAfterShutdownPolicy();
346 jsr166 1.57 if (!keepDelayed && !keepPeriodic) {
347     for (Object e : q.toArray())
348     if (e instanceof RunnableScheduledFuture<?>)
349     ((RunnableScheduledFuture<?>) e).cancel(false);
350 dl 1.37 q.clear();
351 jsr166 1.57 }
352 dl 1.37 else {
353     // Traverse snapshot to avoid iterator exceptions
354 jsr166 1.39 for (Object e : q.toArray()) {
355 dl 1.23 if (e instanceof RunnableScheduledFuture) {
356 dl 1.37 RunnableScheduledFuture<?> t =
357     (RunnableScheduledFuture<?>)e;
358 dl 1.41 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
359     t.isCancelled()) { // also remove if already cancelled
360     if (q.remove(t))
361     t.cancel(false);
362     }
363 dl 1.13 }
364     }
365 dl 1.1 }
366 jsr166 1.48 tryTerminate();
367 dl 1.1 }
368    
369 dl 1.23 /**
370 jsr166 1.30 * Modifies or replaces the task used to execute a runnable.
371 jsr166 1.28 * This method can be used to override the concrete
372 dl 1.23 * class used for managing internal tasks.
373 jsr166 1.30 * The default implementation simply returns the given task.
374 jsr166 1.28 *
375 dl 1.23 * @param runnable the submitted Runnable
376     * @param task the task created to execute the runnable
377 jsr166 1.72 * @param <V> the type of the task's result
378 dl 1.23 * @return a task that can execute the runnable
379     * @since 1.6
380     */
381 peierls 1.22 protected <V> RunnableScheduledFuture<V> decorateTask(
382 dl 1.23 Runnable runnable, RunnableScheduledFuture<V> task) {
383     return task;
384 peierls 1.22 }
385    
386 dl 1.23 /**
387 jsr166 1.30 * Modifies or replaces the task used to execute a callable.
388 jsr166 1.28 * This method can be used to override the concrete
389 dl 1.23 * class used for managing internal tasks.
390 jsr166 1.30 * The default implementation simply returns the given task.
391 jsr166 1.28 *
392 dl 1.23 * @param callable the submitted Callable
393     * @param task the task created to execute the callable
394 jsr166 1.72 * @param <V> the type of the task's result
395 dl 1.23 * @return a task that can execute the callable
396     * @since 1.6
397     */
398 peierls 1.22 protected <V> RunnableScheduledFuture<V> decorateTask(
399 dl 1.23 Callable<V> callable, RunnableScheduledFuture<V> task) {
400     return task;
401 dl 1.19 }
402    
403 dl 1.1 /**
404 jsr166 1.78 * The default keep-alive time for pool threads.
405     *
406     * Normally, this value is unused because all pool threads will be
407     * core threads, but if a user creates a pool with a corePoolSize
408     * of zero (against our advice), we keep a thread alive as long as
409     * there are queued tasks. If the keep alive time is zero (the
410     * historic value), we end up hot-spinning in getTask, wasting a
411     * CPU. But on the other hand, if we set the value too high, and
412     * users create a one-shot pool which they don't cleanly shutdown,
413     * the pool's non-daemon threads will prevent JVM termination. A
414     * small but non-zero value (relative to a JVM's lifetime) seems
415     * best.
416     */
417     private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
418    
/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the
 * given core pool size.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue());
}
432    
/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the
 * given core pool size and thread factory.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code threadFactory} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue(),
          threadFactory);
}
450    
451     /**
452 jsr166 1.79 * Creates a new {@code ScheduledThreadPoolExecutor} with the
453     * given initial parameters.
454 jsr166 1.21 *
455 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
456     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
457 dl 1.1 * @param handler the handler to use when execution is blocked
458 jsr166 1.39 * because the thread bounds and queue capacities are reached
459     * @throws IllegalArgumentException if {@code corePoolSize < 0}
460     * @throws NullPointerException if {@code handler} is null
461 dl 1.1 */
462     public ScheduledThreadPoolExecutor(int corePoolSize,
463 jsr166 1.56 RejectedExecutionHandler handler) {
464 jsr166 1.78 super(corePoolSize, Integer.MAX_VALUE,
465     DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
466 dl 1.1 new DelayedWorkQueue(), handler);
467     }
468    
469     /**
470 jsr166 1.79 * Creates a new {@code ScheduledThreadPoolExecutor} with the
471     * given initial parameters.
472 jsr166 1.21 *
473 jsr166 1.39 * @param corePoolSize the number of threads to keep in the pool, even
474     * if they are idle, unless {@code allowCoreThreadTimeOut} is set
475 dl 1.1 * @param threadFactory the factory to use when the executor
476 jsr166 1.39 * creates a new thread
477 dl 1.1 * @param handler the handler to use when execution is blocked
478 jsr166 1.39 * because the thread bounds and queue capacities are reached
479     * @throws IllegalArgumentException if {@code corePoolSize < 0}
480     * @throws NullPointerException if {@code threadFactory} or
481     * {@code handler} is null
482 dl 1.1 */
483     public ScheduledThreadPoolExecutor(int corePoolSize,
484 jsr166 1.56 ThreadFactory threadFactory,
485     RejectedExecutionHandler handler) {
486 jsr166 1.78 super(corePoolSize, Integer.MAX_VALUE,
487     DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
488 dl 1.1 new DelayedWorkQueue(), threadFactory, handler);
489     }
490    
491 dl 1.37 /**
492 jsr166 1.73 * Returns the nanoTime-based trigger time of a delayed action.
493 jsr166 1.54 */
494     private long triggerTime(long delay, TimeUnit unit) {
495     return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
496     }
497    
498     /**
499 jsr166 1.73 * Returns the nanoTime-based trigger time of a delayed action.
500 dl 1.50 */
501 jsr166 1.54 long triggerTime(long delay) {
502 jsr166 1.98 return System.nanoTime() +
503 jsr166 1.54 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
504     }
505    
506     /**
507     * Constrains the values of all delays in the queue to be within
508     * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
509     * This may occur if a task is eligible to be dequeued, but has
510     * not yet been, while some other task is added with a delay of
511     * Long.MAX_VALUE.
512     */
513     private long overflowFree(long delay) {
514     Delayed head = (Delayed) super.getQueue().peek();
515     if (head != null) {
516 jsr166 1.62 long headDelay = head.getDelay(NANOSECONDS);
517 jsr166 1.54 if (headDelay < 0 && (delay - headDelay < 0))
518     delay = Long.MAX_VALUE + headDelay;
519     }
520     return delay;
521 dl 1.50 }
522    
523     /**
524 dl 1.37 * @throws RejectedExecutionException {@inheritDoc}
525     * @throws NullPointerException {@inheritDoc}
526     */
527 jsr166 1.21 public ScheduledFuture<?> schedule(Runnable command,
528     long delay,
529 dl 1.13 TimeUnit unit) {
530 dl 1.9 if (command == null || unit == null)
531 dl 1.1 throw new NullPointerException();
532 jsr166 1.76 RunnableScheduledFuture<Void> t = decorateTask(command,
533 jsr166 1.54 new ScheduledFutureTask<Void>(command, null,
534 jsr166 1.89 triggerTime(delay, unit),
535     sequencer.getAndIncrement()));
536 dl 1.1 delayedExecute(t);
537     return t;
538     }
539 jsr166 1.52
540 dl 1.37 /**
541     * @throws RejectedExecutionException {@inheritDoc}
542     * @throws NullPointerException {@inheritDoc}
543     */
544 jsr166 1.21 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
545     long delay,
546 dl 1.13 TimeUnit unit) {
547 dl 1.9 if (callable == null || unit == null)
548 dl 1.1 throw new NullPointerException();
549 peierls 1.22 RunnableScheduledFuture<V> t = decorateTask(callable,
550 jsr166 1.54 new ScheduledFutureTask<V>(callable,
551 jsr166 1.89 triggerTime(delay, unit),
552     sequencer.getAndIncrement()));
553 dl 1.1 delayedExecute(t);
554     return t;
555     }
556    
557 dl 1.37 /**
558     * @throws RejectedExecutionException {@inheritDoc}
559     * @throws NullPointerException {@inheritDoc}
560     * @throws IllegalArgumentException {@inheritDoc}
561     */
562 jsr166 1.21 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
563     long initialDelay,
564     long period,
565 dl 1.13 TimeUnit unit) {
566 dl 1.9 if (command == null || unit == null)
567 dl 1.1 throw new NullPointerException();
568 jsr166 1.96 if (period <= 0L)
569 dl 1.1 throw new IllegalArgumentException();
570 jsr166 1.48 ScheduledFutureTask<Void> sft =
571     new ScheduledFutureTask<Void>(command,
572     null,
573 jsr166 1.54 triggerTime(initialDelay, unit),
574 jsr166 1.89 unit.toNanos(period),
575     sequencer.getAndIncrement());
576 jsr166 1.44 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
577 jsr166 1.48 sft.outerTask = t;
578 dl 1.1 delayedExecute(t);
579     return t;
580     }
581 jsr166 1.21
582 dl 1.37 /**
583     * @throws RejectedExecutionException {@inheritDoc}
584     * @throws NullPointerException {@inheritDoc}
585     * @throws IllegalArgumentException {@inheritDoc}
586     */
587 jsr166 1.21 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
588     long initialDelay,
589     long delay,
590 dl 1.13 TimeUnit unit) {
591 dl 1.9 if (command == null || unit == null)
592 dl 1.1 throw new NullPointerException();
593 jsr166 1.96 if (delay <= 0L)
594 dl 1.1 throw new IllegalArgumentException();
595 jsr166 1.48 ScheduledFutureTask<Void> sft =
596     new ScheduledFutureTask<Void>(command,
597     null,
598 jsr166 1.54 triggerTime(initialDelay, unit),
599 jsr166 1.97 -unit.toNanos(delay),
600 jsr166 1.89 sequencer.getAndIncrement());
601 jsr166 1.44 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
602 jsr166 1.48 sft.outerTask = t;
603 dl 1.1 delayedExecute(t);
604     return t;
605     }
606 jsr166 1.21
607 dl 1.1 /**
608 jsr166 1.39 * Executes {@code command} with zero required delay.
609     * This has effect equivalent to
610     * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
611     * Note that inspections of the queue and of the list returned by
612     * {@code shutdownNow} will access the zero-delayed
613     * {@link ScheduledFuture}, not the {@code command} itself.
614     *
615     * <p>A consequence of the use of {@code ScheduledFuture} objects is
616     * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
617     * called with a null second {@code Throwable} argument, even if the
618     * {@code command} terminated abruptly. Instead, the {@code Throwable}
619     * thrown by such a task can be obtained via {@link Future#get}.
620 dl 1.1 *
621     * @throws RejectedExecutionException at discretion of
622 jsr166 1.39 * {@code RejectedExecutionHandler}, if the task
623     * cannot be accepted for execution because the
624     * executor has been shut down
625     * @throws NullPointerException {@inheritDoc}
626 dl 1.1 */
627     public void execute(Runnable command) {
628 jsr166 1.62 schedule(command, 0, NANOSECONDS);
629 dl 1.1 }
630    
631 dl 1.13 // Override AbstractExecutorService methods
632    
633 dl 1.37 /**
634     * @throws RejectedExecutionException {@inheritDoc}
635     * @throws NullPointerException {@inheritDoc}
636     */
637 dl 1.7 public Future<?> submit(Runnable task) {
638 jsr166 1.62 return schedule(task, 0, NANOSECONDS);
639 dl 1.7 }
640    
641 dl 1.37 /**
642     * @throws RejectedExecutionException {@inheritDoc}
643     * @throws NullPointerException {@inheritDoc}
644     */
645 dl 1.7 public <T> Future<T> submit(Runnable task, T result) {
646 jsr166 1.62 return schedule(Executors.callable(task, result), 0, NANOSECONDS);
647 dl 1.7 }
648    
649 dl 1.37 /**
650     * @throws RejectedExecutionException {@inheritDoc}
651     * @throws NullPointerException {@inheritDoc}
652     */
653 dl 1.7 public <T> Future<T> submit(Callable<T> task) {
654 jsr166 1.62 return schedule(task, 0, NANOSECONDS);
655 dl 1.7 }
656 dl 1.1
657     /**
658 dl 1.37 * Sets the policy on whether to continue executing existing
659 jsr166 1.39 * periodic tasks even when this executor has been {@code shutdown}.
660     * In this case, these tasks will only terminate upon
661     * {@code shutdownNow} or after setting the policy to
662     * {@code false} when already shutdown.
663     * This value is by default {@code false}.
664 jsr166 1.30 *
665 jsr166 1.68 * @param value if {@code true}, continue after shutdown, else don't
666 jsr166 1.25 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
667 dl 1.1 */
668     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
669     continueExistingPeriodicTasksAfterShutdown = value;
670 jsr166 1.39 if (!value && isShutdown())
671 dl 1.37 onShutdown();
672 dl 1.1 }
673    
674     /**
675 jsr166 1.21 * Gets the policy on whether to continue executing existing
676 jsr166 1.39 * periodic tasks even when this executor has been {@code shutdown}.
677     * In this case, these tasks will only terminate upon
678     * {@code shutdownNow} or after setting the policy to
679     * {@code false} when already shutdown.
680     * This value is by default {@code false}.
681 jsr166 1.30 *
682 jsr166 1.39 * @return {@code true} if will continue after shutdown
683 dl 1.16 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
684 dl 1.1 */
685     public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
686     return continueExistingPeriodicTasksAfterShutdown;
687     }
688    
689     /**
690 jsr166 1.21 * Sets the policy on whether to execute existing delayed
691 jsr166 1.39 * tasks even when this executor has been {@code shutdown}.
692     * In this case, these tasks will only terminate upon
693     * {@code shutdownNow}, or after setting the policy to
694     * {@code false} when already shutdown.
695     * This value is by default {@code true}.
696 jsr166 1.30 *
697 jsr166 1.68 * @param value if {@code true}, execute after shutdown, else don't
698 dl 1.16 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
699 dl 1.1 */
700     public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
701     executeExistingDelayedTasksAfterShutdown = value;
702 jsr166 1.39 if (!value && isShutdown())
703 dl 1.37 onShutdown();
704 dl 1.1 }
705    
706     /**
707 jsr166 1.21 * Gets the policy on whether to execute existing delayed
708 jsr166 1.39 * tasks even when this executor has been {@code shutdown}.
709     * In this case, these tasks will only terminate upon
710     * {@code shutdownNow}, or after setting the policy to
711     * {@code false} when already shutdown.
712     * This value is by default {@code true}.
713 jsr166 1.30 *
714 jsr166 1.39 * @return {@code true} if will execute after shutdown
715 dl 1.16 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
716 dl 1.1 */
717     public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
718     return executeExistingDelayedTasksAfterShutdown;
719     }
720    
721     /**
722 jsr166 1.46 * Sets the policy on whether cancelled tasks should be immediately
723     * removed from the work queue at time of cancellation. This value is
724     * by default {@code false}.
725 dl 1.41 *
726 jsr166 1.42 * @param value if {@code true}, remove on cancellation, else don't
727 dl 1.41 * @see #getRemoveOnCancelPolicy
728 jsr166 1.43 * @since 1.7
729 dl 1.41 */
730     public void setRemoveOnCancelPolicy(boolean value) {
731     removeOnCancel = value;
732     }
733    
734     /**
735 jsr166 1.46 * Gets the policy on whether cancelled tasks should be immediately
736     * removed from the work queue at time of cancellation. This value is
737     * by default {@code false}.
738 dl 1.41 *
739 jsr166 1.46 * @return {@code true} if cancelled tasks are immediately removed
740     * from the queue
741 dl 1.41 * @see #setRemoveOnCancelPolicy
742 jsr166 1.43 * @since 1.7
743 dl 1.41 */
744     public boolean getRemoveOnCancelPolicy() {
745     return removeOnCancel;
746     }
747    
748     /**
749 dl 1.1 * Initiates an orderly shutdown in which previously submitted
750 jsr166 1.49 * tasks are executed, but no new tasks will be accepted.
751     * Invocation has no additional effect if already shut down.
752     *
753     * <p>This method does not wait for previously submitted tasks to
754     * complete execution. Use {@link #awaitTermination awaitTermination}
755     * to do that.
756     *
757     * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
758     * has been set {@code false}, existing delayed tasks whose delays
759     * have not yet elapsed are cancelled. And unless the {@code
760     * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
761     * {@code true}, future executions of existing periodic tasks will
762     * be cancelled.
763 jsr166 1.39 *
764     * @throws SecurityException {@inheritDoc}
765 dl 1.1 */
766     public void shutdown() {
767     super.shutdown();
768     }
769    
770     /**
771     * Attempts to stop all actively executing tasks, halts the
772 jsr166 1.30 * processing of waiting tasks, and returns a list of the tasks
773 jsr166 1.92 * that were awaiting execution. These tasks are drained (removed)
774     * from the task queue upon return from this method.
775 jsr166 1.21 *
776 jsr166 1.49 * <p>This method does not wait for actively executing tasks to
777     * terminate. Use {@link #awaitTermination awaitTermination} to
778     * do that.
779     *
780 dl 1.1 * <p>There are no guarantees beyond best-effort attempts to stop
781 dl 1.18 * processing actively executing tasks. This implementation
782 jsr166 1.93 * interrupts tasks via {@link Thread#interrupt}; any task that
783 jsr166 1.31 * fails to respond to interrupts may never terminate.
784 dl 1.1 *
785 jsr166 1.39 * @return list of tasks that never commenced execution.
786 jsr166 1.77 * Each element of this list is a {@link ScheduledFuture}.
787     * For tasks submitted via one of the {@code schedule}
788     * methods, the element will be identical to the returned
789     * {@code ScheduledFuture}. For tasks submitted using
790 jsr166 1.84 * {@link #execute execute}, the element will be a
791     * zero-delay {@code ScheduledFuture}.
792 jsr166 1.31 * @throws SecurityException {@inheritDoc}
793 dl 1.1 */
794 tim 1.4 public List<Runnable> shutdownNow() {
795 dl 1.1 return super.shutdownNow();
796     }
797    
798     /**
799 jsr166 1.95 * Returns the task queue used by this executor. Access to the
800     * task queue is intended primarily for debugging and monitoring.
801     * This queue may be in active use. Retrieving the task queue
802     * does not prevent queued tasks from executing.
803     *
804     * <p>Each element of this queue is a {@link ScheduledFuture}.
805 jsr166 1.77 * For tasks submitted via one of the {@code schedule} methods, the
806     * element will be identical to the returned {@code ScheduledFuture}.
807 jsr166 1.84 * For tasks submitted using {@link #execute execute}, the element
808     * will be a zero-delay {@code ScheduledFuture}.
809 jsr166 1.77 *
810     * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
811     * tasks in the order in which they will execute.
812 dl 1.1 *
813     * @return the task queue
814     */
815     public BlockingQueue<Runnable> getQueue() {
816     return super.getQueue();
817     }
818    
819 dl 1.13 /**
820 dl 1.40 * Specialized delay queue. To mesh with TPE declarations, this
821     * class must be declared as a BlockingQueue<Runnable> even though
822 jsr166 1.42 * it can only hold RunnableScheduledFutures.
823 jsr166 1.21 */
824 dl 1.40 static class DelayedWorkQueue extends AbstractQueue<Runnable>
825 dl 1.13 implements BlockingQueue<Runnable> {
826 jsr166 1.21
827 dl 1.40 /*
828     * A DelayedWorkQueue is based on a heap-based data structure
829     * like those in DelayQueue and PriorityQueue, except that
830     * every ScheduledFutureTask also records its index into the
831     * heap array. This eliminates the need to find a task upon
832     * cancellation, greatly speeding up removal (down from O(n)
833     * to O(log n)), and reducing garbage retention that would
834     * otherwise occur by waiting for the element to rise to top
835     * before clearing. But because the queue may also hold
836     * RunnableScheduledFutures that are not ScheduledFutureTasks,
837     * we are not guaranteed to have such indices available, in
838     * which case we fall back to linear search. (We expect that
839     * most tasks will not be decorated, and that the faster cases
840     * will be much more common.)
841     *
842     * All heap operations must record index changes -- mainly
843     * within siftUp and siftDown. Upon removal, a task's
844     * heapIndex is set to -1. Note that ScheduledFutureTasks can
845     * appear at most once in the queue (this need not be true for
846     * other kinds of tasks or work queues), so are uniquely
847     * identified by heapIndex.
848     */
849    
850 jsr166 1.46 private static final int INITIAL_CAPACITY = 16;
851 jsr166 1.61 private RunnableScheduledFuture<?>[] queue =
852     new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
853 jsr166 1.46 private final ReentrantLock lock = new ReentrantLock();
854 jsr166 1.80 private int size;
855 dl 1.40
856 jsr166 1.48 /**
857     * Thread designated to wait for the task at the head of the
858     * queue. This variant of the Leader-Follower pattern
859     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
860     * minimize unnecessary timed waiting. When a thread becomes
861     * the leader, it waits only for the next delay to elapse, but
862     * other threads await indefinitely. The leader thread must
863     * signal some other thread before returning from take() or
864     * poll(...), unless some other thread becomes leader in the
865     * interim. Whenever the head of the queue is replaced with a
866     * task with an earlier expiration time, the leader field is
867     * invalidated by being reset to null, and some waiting
868     * thread, but not necessarily the current leader, is
869     * signalled. So waiting threads must be prepared to acquire
870     * and lose leadership while waiting.
871     */
872 jsr166 1.80 private Thread leader;
873 jsr166 1.48
874     /**
875     * Condition signalled when a newer task becomes available at the
876     * head of the queue or a new thread may need to become leader.
877     */
878     private final Condition available = lock.newCondition();
879 dl 1.40
880     /**
881 jsr166 1.66 * Sets f's heapIndex if it is a ScheduledFutureTask.
882 dl 1.40 */
883 jsr166 1.102 private static void setIndex(RunnableScheduledFuture<?> f, int idx) {
884 dl 1.40 if (f instanceof ScheduledFutureTask)
885     ((ScheduledFutureTask)f).heapIndex = idx;
886     }
887    
888     /**
889 jsr166 1.66 * Sifts element added at bottom up to its heap-ordered spot.
890 dl 1.40 * Call only when holding lock.
891     */
892 jsr166 1.61 private void siftUp(int k, RunnableScheduledFuture<?> key) {
893 dl 1.40 while (k > 0) {
894     int parent = (k - 1) >>> 1;
895 jsr166 1.61 RunnableScheduledFuture<?> e = queue[parent];
896 dl 1.40 if (key.compareTo(e) >= 0)
897     break;
898     queue[k] = e;
899     setIndex(e, k);
900     k = parent;
901     }
902     queue[k] = key;
903     setIndex(key, k);
904     }
905    
906     /**
907 jsr166 1.66 * Sifts element added at top down to its heap-ordered spot.
908 dl 1.40 * Call only when holding lock.
909     */
910 jsr166 1.61 private void siftDown(int k, RunnableScheduledFuture<?> key) {
911 jsr166 1.42 int half = size >>> 1;
912 dl 1.40 while (k < half) {
913 jsr166 1.42 int child = (k << 1) + 1;
914 jsr166 1.61 RunnableScheduledFuture<?> c = queue[child];
915 dl 1.40 int right = child + 1;
916     if (right < size && c.compareTo(queue[right]) > 0)
917     c = queue[child = right];
918     if (key.compareTo(c) <= 0)
919     break;
920     queue[k] = c;
921     setIndex(c, k);
922     k = child;
923     }
924     queue[k] = key;
925     setIndex(key, k);
926     }
927    
928     /**
929 jsr166 1.66 * Resizes the heap array. Call only when holding lock.
930 dl 1.40 */
931     private void grow() {
932     int oldCapacity = queue.length;
933     int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
934     if (newCapacity < 0) // overflow
935     newCapacity = Integer.MAX_VALUE;
936     queue = Arrays.copyOf(queue, newCapacity);
937     }
938    
939     /**
940 jsr166 1.66 * Finds index of given object, or -1 if absent.
941 dl 1.40 */
942     private int indexOf(Object x) {
943     if (x != null) {
944 jsr166 1.48 if (x instanceof ScheduledFutureTask) {
945     int i = ((ScheduledFutureTask) x).heapIndex;
946     // Sanity check; x could conceivably be a
947     // ScheduledFutureTask from some other pool.
948     if (i >= 0 && i < size && queue[i] == x)
949     return i;
950     } else {
951     for (int i = 0; i < size; i++)
952     if (x.equals(queue[i]))
953     return i;
954     }
955 dl 1.40 }
956     return -1;
957     }
958    
959 jsr166 1.48 public boolean contains(Object x) {
960     final ReentrantLock lock = this.lock;
961 jsr166 1.45 lock.lock();
962     try {
963 jsr166 1.48 return indexOf(x) != -1;
964 jsr166 1.45 } finally {
965     lock.unlock();
966     }
967 jsr166 1.48 }
968 jsr166 1.45
969 dl 1.40 public boolean remove(Object x) {
970     final ReentrantLock lock = this.lock;
971     lock.lock();
972     try {
973 jsr166 1.45 int i = indexOf(x);
974 jsr166 1.48 if (i < 0)
975     return false;
976 jsr166 1.45
977 jsr166 1.48 setIndex(queue[i], -1);
978     int s = --size;
979 jsr166 1.61 RunnableScheduledFuture<?> replacement = queue[s];
980 jsr166 1.48 queue[s] = null;
981     if (s != i) {
982     siftDown(i, replacement);
983     if (queue[i] == replacement)
984     siftUp(i, replacement);
985     }
986     return true;
987 dl 1.40 } finally {
988     lock.unlock();
989     }
990     }
991    
992     public int size() {
993     final ReentrantLock lock = this.lock;
994     lock.lock();
995     try {
996 jsr166 1.45 return size;
997 dl 1.40 } finally {
998     lock.unlock();
999     }
1000     }
1001    
1002 jsr166 1.42 public boolean isEmpty() {
1003     return size() == 0;
1004 dl 1.40 }
1005    
1006     public int remainingCapacity() {
1007     return Integer.MAX_VALUE;
1008     }
1009    
1010 jsr166 1.61 public RunnableScheduledFuture<?> peek() {
1011 dl 1.40 final ReentrantLock lock = this.lock;
1012     lock.lock();
1013     try {
1014     return queue[0];
1015     } finally {
1016     lock.unlock();
1017     }
1018 dl 1.13 }
1019    
1020 dl 1.40 public boolean offer(Runnable x) {
1021     if (x == null)
1022     throw new NullPointerException();
1023 jsr166 1.61 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
1024 dl 1.40 final ReentrantLock lock = this.lock;
1025     lock.lock();
1026     try {
1027     int i = size;
1028     if (i >= queue.length)
1029     grow();
1030     size = i + 1;
1031     if (i == 0) {
1032     queue[0] = e;
1033     setIndex(e, 0);
1034 jsr166 1.45 } else {
1035 dl 1.40 siftUp(i, e);
1036     }
1037 jsr166 1.46 if (queue[0] == e) {
1038 jsr166 1.48 leader = null;
1039 jsr166 1.46 available.signal();
1040 jsr166 1.48 }
1041 dl 1.40 } finally {
1042     lock.unlock();
1043     }
1044     return true;
1045 jsr166 1.48 }
1046 dl 1.40
1047     public void put(Runnable e) {
1048     offer(e);
1049     }
1050    
1051     public boolean add(Runnable e) {
1052 jsr166 1.48 return offer(e);
1053     }
1054 dl 1.40
1055     public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1056     return offer(e);
1057     }
1058 jsr166 1.42
1059 jsr166 1.46 /**
1060     * Performs common bookkeeping for poll and take: Replaces
1061 jsr166 1.47 * first element with last and sifts it down. Call only when
1062     * holding lock.
1063 jsr166 1.46 * @param f the task to remove and return
1064     */
1065 jsr166 1.61 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1066 jsr166 1.46 int s = --size;
1067 jsr166 1.61 RunnableScheduledFuture<?> x = queue[s];
1068 jsr166 1.46 queue[s] = null;
1069     if (s != 0)
1070     siftDown(0, x);
1071     setIndex(f, -1);
1072     return f;
1073     }
1074    
1075 jsr166 1.61 public RunnableScheduledFuture<?> poll() {
1076 dl 1.40 final ReentrantLock lock = this.lock;
1077     lock.lock();
1078     try {
1079 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1080 jsr166 1.87 return (first == null || first.getDelay(NANOSECONDS) > 0)
1081     ? null
1082     : finishPoll(first);
1083 dl 1.40 } finally {
1084     lock.unlock();
1085     }
1086     }
1087    
1088 jsr166 1.61 public RunnableScheduledFuture<?> take() throws InterruptedException {
1089 dl 1.40 final ReentrantLock lock = this.lock;
1090     lock.lockInterruptibly();
1091     try {
1092     for (;;) {
1093 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1094 jsr166 1.42 if (first == null)
1095 dl 1.40 available.await();
1096     else {
1097 jsr166 1.62 long delay = first.getDelay(NANOSECONDS);
1098 jsr166 1.96 if (delay <= 0L)
1099 jsr166 1.48 return finishPoll(first);
1100 jsr166 1.71 first = null; // don't retain ref while waiting
1101     if (leader != null)
1102 jsr166 1.48 available.await();
1103     else {
1104     Thread thisThread = Thread.currentThread();
1105     leader = thisThread;
1106     try {
1107     available.awaitNanos(delay);
1108     } finally {
1109     if (leader == thisThread)
1110     leader = null;
1111     }
1112     }
1113 dl 1.40 }
1114     }
1115     } finally {
1116 jsr166 1.48 if (leader == null && queue[0] != null)
1117     available.signal();
1118 dl 1.40 lock.unlock();
1119     }
1120     }
1121    
1122 jsr166 1.61 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1123 dl 1.40 throws InterruptedException {
1124     long nanos = unit.toNanos(timeout);
1125     final ReentrantLock lock = this.lock;
1126     lock.lockInterruptibly();
1127     try {
1128     for (;;) {
1129 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1130 dl 1.40 if (first == null) {
1131 jsr166 1.96 if (nanos <= 0L)
1132 dl 1.40 return null;
1133     else
1134     nanos = available.awaitNanos(nanos);
1135     } else {
1136 jsr166 1.62 long delay = first.getDelay(NANOSECONDS);
1137 jsr166 1.96 if (delay <= 0L)
1138 dl 1.40 return finishPoll(first);
1139 jsr166 1.96 if (nanos <= 0L)
1140 jsr166 1.48 return null;
1141 jsr166 1.71 first = null; // don't retain ref while waiting
1142 jsr166 1.48 if (nanos < delay || leader != null)
1143     nanos = available.awaitNanos(nanos);
1144     else {
1145     Thread thisThread = Thread.currentThread();
1146     leader = thisThread;
1147     try {
1148     long timeLeft = available.awaitNanos(delay);
1149     nanos -= delay - timeLeft;
1150     } finally {
1151     if (leader == thisThread)
1152     leader = null;
1153     }
1154     }
1155     }
1156     }
1157 dl 1.40 } finally {
1158 jsr166 1.48 if (leader == null && queue[0] != null)
1159     available.signal();
1160 dl 1.40 lock.unlock();
1161     }
1162     }
1163    
1164     public void clear() {
1165     final ReentrantLock lock = this.lock;
1166     lock.lock();
1167     try {
1168     for (int i = 0; i < size; i++) {
1169 jsr166 1.61 RunnableScheduledFuture<?> t = queue[i];
1170 dl 1.40 if (t != null) {
1171     queue[i] = null;
1172     setIndex(t, -1);
1173     }
1174     }
1175     size = 0;
1176     } finally {
1177     lock.unlock();
1178     }
1179 dl 1.13 }
1180 dl 1.40
1181     /**
1182 jsr166 1.66 * Returns first element only if it is expired.
1183 dl 1.40 * Used only by drainTo. Call only when holding lock.
1184     */
1185 jsr166 1.62 private RunnableScheduledFuture<?> peekExpired() {
1186     // assert lock.isHeldByCurrentThread();
1187 jsr166 1.61 RunnableScheduledFuture<?> first = queue[0];
1188 jsr166 1.62 return (first == null || first.getDelay(NANOSECONDS) > 0) ?
1189     null : first;
1190 dl 1.40 }
1191    
1192     public int drainTo(Collection<? super Runnable> c) {
1193     if (c == null)
1194     throw new NullPointerException();
1195     if (c == this)
1196     throw new IllegalArgumentException();
1197     final ReentrantLock lock = this.lock;
1198     lock.lock();
1199     try {
1200 jsr166 1.61 RunnableScheduledFuture<?> first;
1201 dl 1.40 int n = 0;
1202 jsr166 1.62 while ((first = peekExpired()) != null) {
1203     c.add(first); // In this order, in case add() throws.
1204     finishPoll(first);
1205 jsr166 1.48 ++n;
1206     }
1207 dl 1.40 return n;
1208     } finally {
1209     lock.unlock();
1210     }
1211 dl 1.13 }
1212    
1213 jsr166 1.21 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1214 dl 1.40 if (c == null)
1215     throw new NullPointerException();
1216     if (c == this)
1217     throw new IllegalArgumentException();
1218     if (maxElements <= 0)
1219     return 0;
1220     final ReentrantLock lock = this.lock;
1221     lock.lock();
1222     try {
1223 jsr166 1.61 RunnableScheduledFuture<?> first;
1224 dl 1.40 int n = 0;
1225 jsr166 1.62 while (n < maxElements && (first = peekExpired()) != null) {
1226     c.add(first); // In this order, in case add() throws.
1227     finishPoll(first);
1228 jsr166 1.48 ++n;
1229     }
1230 dl 1.40 return n;
1231     } finally {
1232     lock.unlock();
1233     }
1234     }
1235    
1236     public Object[] toArray() {
1237     final ReentrantLock lock = this.lock;
1238     lock.lock();
1239     try {
1240 jsr166 1.45 return Arrays.copyOf(queue, size, Object[].class);
1241 dl 1.40 } finally {
1242     lock.unlock();
1243     }
1244     }
1245    
1246 jsr166 1.48 @SuppressWarnings("unchecked")
1247 dl 1.40 public <T> T[] toArray(T[] a) {
1248     final ReentrantLock lock = this.lock;
1249     lock.lock();
1250     try {
1251     if (a.length < size)
1252     return (T[]) Arrays.copyOf(queue, size, a.getClass());
1253     System.arraycopy(queue, 0, a, 0, size);
1254     if (a.length > size)
1255     a[size] = null;
1256     return a;
1257     } finally {
1258     lock.unlock();
1259     }
1260 dl 1.13 }
1261    
1262 jsr166 1.21 public Iterator<Runnable> iterator() {
1263 jsr166 1.45 return new Itr(Arrays.copyOf(queue, size));
1264 dl 1.40 }
1265 jsr166 1.42
1266 dl 1.40 /**
1267     * Snapshot iterator that works off copy of underlying q array.
1268     */
1269     private class Itr implements Iterator<Runnable> {
1270 jsr166 1.74 final RunnableScheduledFuture<?>[] array;
1271 jsr166 1.86 int cursor; // index of next element to return; initially 0
1272     int lastRet = -1; // index of last element returned; -1 if no such
1273 jsr166 1.42
1274 jsr166 1.74 Itr(RunnableScheduledFuture<?>[] array) {
1275 dl 1.40 this.array = array;
1276     }
1277 jsr166 1.42
1278 dl 1.40 public boolean hasNext() {
1279     return cursor < array.length;
1280     }
1281 jsr166 1.42
1282 dl 1.40 public Runnable next() {
1283     if (cursor >= array.length)
1284     throw new NoSuchElementException();
1285 jsr166 1.101 return array[lastRet = cursor++];
1286 dl 1.40 }
1287 jsr166 1.42
1288 dl 1.40 public void remove() {
1289     if (lastRet < 0)
1290     throw new IllegalStateException();
1291     DelayedWorkQueue.this.remove(array[lastRet]);
1292     lastRet = -1;
1293     }
1294 dl 1.13 }
1295     }
1296 dl 1.1 }