ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.40
Committed: Sun Feb 18 23:16:35 2007 UTC (17 years, 3 months ago) by dl
Branch: MAIN
Changes since 1.39: +439 -42 lines
Log Message:
Faster task cancellation and removal

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13 * A {@link ThreadPoolExecutor} that can additionally schedule
14 * commands to run after a given delay, or to execute
15 * periodically. This class is preferable to {@link java.util.Timer}
16 * when multiple worker threads are needed, or when the additional
17 * flexibility or capabilities of {@link ThreadPoolExecutor} (which
18 * this class extends) are required.
19 *
20 * <p> Delayed tasks execute no sooner than they are enabled, but
21 * without any real-time guarantees about when, after they are
22 * enabled, they will commence. Tasks scheduled for exactly the same
23 * execution time are enabled in first-in-first-out (FIFO) order of
24 * submission. Cancelled tasks are automatically removed from the
25 * work queue.
26 *
27 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
28 * of the inherited tuning methods are not useful for it. In
29 * particular, because it acts as a fixed-sized pool using
30 * {@code corePoolSize} threads and an unbounded queue, adjustments
31 * to {@code maximumPoolSize} have no useful effect. Additionally, it
32 * is almost never a good idea to set {@code corePoolSize} to zero or
33 * use {@code allowCoreThreadTimeOut} because this may leave the pool
34 * without threads to handle tasks once they become eligible to run.
35 *
36 * <p><b>Extension notes:</b> This class overrides the
37 * {@link ThreadPoolExecutor#execute execute} and
38 * {@link AbstractExecutorService#submit(Runnable) submit}
39 * methods to generate internal {@link ScheduledFuture} objects to
40 * control per-task delays and scheduling. To preserve
41 * functionality, any further overrides of these methods in
42 * subclasses must invoke superclass versions, which effectively
43 * disables additional task customization. However, this class
44 * provides an alternative protected extension method,
45 * {@code decorateTask} (one version each for {@code Runnable} and
46 * {@code Callable}) that can be used to customize the concrete task
47 * types used to execute commands entered via {@code execute},
48 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
49 * and {@code scheduleWithFixedDelay}. By default, a
50 * {@code ScheduledThreadPoolExecutor} uses a task type extending
51 * {@link FutureTask}. However, this may be modified or replaced using
52 * subclasses of the form:
53 *
54 * <pre> {@code
55 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
56 *
57 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
58 *
59 * protected <V> RunnableScheduledFuture<V> decorateTask(
60 * Runnable r, RunnableScheduledFuture<V> task) {
61 * return new CustomTask<V>(r, task);
62 * }
63 *
64 * protected <V> RunnableScheduledFuture<V> decorateTask(
65 * Callable<V> c, RunnableScheduledFuture<V> task) {
66 * return new CustomTask<V>(c, task);
67 * }
68 * // ... add constructors, etc.
69 * }}</pre>
70 *
71 * @since 1.5
72 * @author Doug Lea
73 */
74 public class ScheduledThreadPoolExecutor
75 extends ThreadPoolExecutor
76 implements ScheduledExecutorService {
77
78 /*
79 * This class specializes ThreadPoolExecutor implementation by
80 *
81 * 1. Using a custom task type, ScheduledFutureTask for
82 * tasks, even those that don't require scheduling (i.e.,
83 * those submitted using ExecutorService execute, not
84 * ScheduledExecutorService methods) which are treated as
85 * delayed tasks with a delay of zero.
86 *
87 * 2. Using a custom queue (DelayedWorkQueue) based on an
88 * unbounded DelayQueue. The lack of capacity constraint and
89 * the fact that corePoolSize and maximumPoolSize are
90 * effectively identical simplifies some execution mechanics
91 * (see delayedExecute) compared to ThreadPoolExecutor
92 * version.
93 *
94 * The DelayedWorkQueue class is defined below for the sake of
95 * ensuring that all elements are instances of
96 * RunnableScheduledFuture. Since DelayQueue otherwise
97 * requires type be Delayed, but not necessarily Runnable, and
98 * the workQueue requires the opposite, we need to explicitly
99 * define a class that requires both to ensure that users don't
100 * add objects that aren't RunnableScheduledFutures via
101 * getQueue().add() etc.
102 *
103 * 3. Supporting optional run-after-shutdown parameters, which
104 * leads to overrides of shutdown methods to remove and cancel
105 * tasks that should NOT be run after shutdown, as well as
106 * different recheck logic when task (re)submission overlaps
107 * with a shutdown.
108 *
109 * 4. Task decoration methods to allow interception and
110 * instrumentation, which are needed because subclasses cannot
111 * otherwise override submit methods to get this effect. These
112 * don't have any impact on pool control logic though.
113 */
114
/**
 * False if should cancel/suppress periodic tasks on shutdown.
 * Declared without an initializer, so it starts out {@code false};
 * read by {@code canRunInCurrentRunState} and {@code onShutdown}.
 */
private volatile boolean continueExistingPeriodicTasksAfterShutdown;

/**
 * False if should cancel non-periodic tasks on shutdown.
 * Starts out {@code true}.
 */
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;

/**
 * Sequence number to break scheduling ties, and in turn to
 * guarantee FIFO order among tied entries. The counter is static,
 * so a single sequence is shared by every task created through
 * this class.
 */
private static final AtomicLong sequencer = new AtomicLong(0);
130
131 /**
132 * Returns current nanosecond time.
133 */
134 final long now() {
135 return System.nanoTime();
136 }
137
138 private class ScheduledFutureTask<V>
139 extends FutureTask<V> implements RunnableScheduledFuture<V> {
140
141 /** Sequence number to break ties FIFO */
142 private final long sequenceNumber;
143 /** The time the task is enabled to execute in nanoTime units */
144 private long time;
145 /**
146 * Period in nanoseconds for repeating tasks. A positive
147 * value indicates fixed-rate execution. A negative value
148 * indicates fixed-delay execution. A value of 0 indicates a
149 * non-repeating task.
150 */
151 private final long period;
152
153 /**
154 * Index into delay queue, to support faster cancellation.
155 */
156 int heapIndex;
157
158 /**
159 * Creates a one-shot action with given nanoTime-based trigger time.
160 */
161 ScheduledFutureTask(Runnable r, V result, long ns) {
162 super(r, result);
163 this.time = ns;
164 this.period = 0;
165 this.sequenceNumber = sequencer.getAndIncrement();
166 }
167
168 /**
169 * Creates a periodic action with given nano time and period.
170 */
171 ScheduledFutureTask(Runnable r, V result, long ns, long period) {
172 super(r, result);
173 this.time = ns;
174 this.period = period;
175 this.sequenceNumber = sequencer.getAndIncrement();
176 }
177
178 /**
179 * Creates a one-shot action with given nanoTime-based trigger.
180 */
181 ScheduledFutureTask(Callable<V> callable, long ns) {
182 super(callable);
183 this.time = ns;
184 this.period = 0;
185 this.sequenceNumber = sequencer.getAndIncrement();
186 }
187
188 public long getDelay(TimeUnit unit) {
189 long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);
190 return d;
191 }
192
193 public int compareTo(Delayed other) {
194 if (other == this) // compare zero ONLY if same object
195 return 0;
196 if (other instanceof ScheduledFutureTask) {
197 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
198 long diff = time - x.time;
199 if (diff < 0)
200 return -1;
201 else if (diff > 0)
202 return 1;
203 else if (sequenceNumber < x.sequenceNumber)
204 return -1;
205 else
206 return 1;
207 }
208 long d = (getDelay(TimeUnit.NANOSECONDS) -
209 other.getDelay(TimeUnit.NANOSECONDS));
210 return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
211 }
212
213 /**
214 * Returns true if this is a periodic (not a one-shot) action.
215 *
216 * @return true if periodic
217 */
218 public boolean isPeriodic() {
219 return period != 0;
220 }
221
222 /**
223 * Sets the next time to run for a periodic task.
224 */
225 private void setNextRunTime() {
226 long p = period;
227 if (p > 0)
228 time += p;
229 else
230 time = now() - p;
231 }
232
233 public boolean cancel(boolean mayInterruptIfRunning) {
234 remove(this); // unconditionally remove
235 return super.cancel(mayInterruptIfRunning);
236 }
237
238 /**
239 * Overrides FutureTask version so as to reset/requeue if periodic.
240 */
241 public void run() {
242 boolean periodic = isPeriodic();
243 if (!canRunInCurrentRunState(periodic))
244 cancel(false);
245 else if (!periodic)
246 ScheduledFutureTask.super.run();
247 else if (ScheduledFutureTask.super.runAndReset()) {
248 setNextRunTime();
249 reExecutePeriodic(this);
250 }
251 }
252 }
253
254 /**
255 * Returns true if can run a task given current run state
256 * and run-after-shutdown parameters.
257 *
258 * @param periodic true if this task periodic, false if delayed
259 */
260 boolean canRunInCurrentRunState(boolean periodic) {
261 return isRunningOrShutdown(periodic ?
262 continueExistingPeriodicTasksAfterShutdown :
263 executeExistingDelayedTasksAfterShutdown);
264 }
265
266 /**
267 * Main execution method for delayed or periodic tasks. If pool
268 * is shut down, rejects the task. Otherwise adds task to queue
269 * and starts a thread, if necessary, to run it. (We cannot
270 * prestart the thread to run the task because the task (probably)
271 * shouldn't be run yet,) If the pool is shut down while the task
272 * is being added, cancel and remove it if required by state and
273 * run-after-shutdown parameters.
274 *
275 * @param task the task
276 */
277 private void delayedExecute(RunnableScheduledFuture<?> task) {
278 if (isShutdown())
279 reject(task);
280 else {
281 super.getQueue().add(task);
282 if (isShutdown() &&
283 !canRunInCurrentRunState(task.isPeriodic()) &&
284 remove(task))
285 task.cancel(false);
286 else
287 prestartCoreThread();
288 }
289 }
290
291 /**
292 * Requeues a periodic task unless current run state precludes it.
293 * Same idea as delayedExecute except drops task rather than rejecting.
294 *
295 * @param task the task
296 */
297 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
298 if (canRunInCurrentRunState(true)) {
299 super.getQueue().add(task);
300 if (!canRunInCurrentRunState(true) && remove(task))
301 task.cancel(false);
302 else
303 prestartCoreThread();
304 }
305 }
306
307 /**
308 * Cancels and clears the queue of all tasks that should not be run
309 * due to shutdown policy. Invoked within super.shutdown.
310 */
311 @Override void onShutdown() {
312 BlockingQueue<Runnable> q = super.getQueue();
313 boolean keepDelayed =
314 getExecuteExistingDelayedTasksAfterShutdownPolicy();
315 boolean keepPeriodic =
316 getContinueExistingPeriodicTasksAfterShutdownPolicy();
317 if (!keepDelayed && !keepPeriodic)
318 q.clear();
319 else {
320 // Traverse snapshot to avoid iterator exceptions
321 for (Object e : q.toArray()) {
322 if (e instanceof RunnableScheduledFuture) {
323 RunnableScheduledFuture<?> t =
324 (RunnableScheduledFuture<?>)e;
325 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed))
326 t.cancel(false);
327 }
328 }
329 }
330 tryTerminate();
331 }
332
333 /**
334 * Modifies or replaces the task used to execute a runnable.
335 * This method can be used to override the concrete
336 * class used for managing internal tasks.
337 * The default implementation simply returns the given task.
338 *
339 * @param runnable the submitted Runnable
340 * @param task the task created to execute the runnable
341 * @return a task that can execute the runnable
342 * @since 1.6
343 */
344 protected <V> RunnableScheduledFuture<V> decorateTask(
345 Runnable runnable, RunnableScheduledFuture<V> task) {
346 return task;
347 }
348
349 /**
350 * Modifies or replaces the task used to execute a callable.
351 * This method can be used to override the concrete
352 * class used for managing internal tasks.
353 * The default implementation simply returns the given task.
354 *
355 * @param callable the submitted Callable
356 * @param task the task created to execute the callable
357 * @return a task that can execute the callable
358 * @since 1.6
359 */
360 protected <V> RunnableScheduledFuture<V> decorateTask(
361 Callable<V> callable, RunnableScheduledFuture<V> task) {
362 return task;
363 }
364
/**
 * Constructs a {@code ScheduledThreadPoolExecutor} that keeps the
 * given number of core threads and queues work on a fresh
 * {@code DelayedWorkQueue}.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0, TimeUnit.NANOSECONDS,
          new DelayedWorkQueue());
}
377
/**
 * Constructs a {@code ScheduledThreadPoolExecutor} with the given
 * core size and thread factory, queueing work on a fresh
 * {@code DelayedWorkQueue}.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code threadFactory} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0, TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          threadFactory);
}
394
/**
 * Constructs a {@code ScheduledThreadPoolExecutor} with the given
 * core size and rejection handler, queueing work on a fresh
 * {@code DelayedWorkQueue}.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code handler} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0, TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          handler);
}
411
/**
 * Constructs a {@code ScheduledThreadPoolExecutor} with the given
 * core size, thread factory and rejection handler, queueing work
 * on a fresh {@code DelayedWorkQueue}.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code threadFactory} or
 *         {@code handler} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0, TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          threadFactory,
          handler);
}
432
433 /**
434 * @throws RejectedExecutionException {@inheritDoc}
435 * @throws NullPointerException {@inheritDoc}
436 */
437 public ScheduledFuture<?> schedule(Runnable command,
438 long delay,
439 TimeUnit unit) {
440 if (command == null || unit == null)
441 throw new NullPointerException();
442 if (delay < 0) delay = 0;
443 long triggerTime = now() + unit.toNanos(delay);
444 RunnableScheduledFuture<?> t = decorateTask(command,
445 new ScheduledFutureTask<Boolean>(command, null, triggerTime));
446 delayedExecute(t);
447 return t;
448 }
449
450 /**
451 * @throws RejectedExecutionException {@inheritDoc}
452 * @throws NullPointerException {@inheritDoc}
453 */
454 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
455 long delay,
456 TimeUnit unit) {
457 if (callable == null || unit == null)
458 throw new NullPointerException();
459 if (delay < 0) delay = 0;
460 long triggerTime = now() + unit.toNanos(delay);
461 RunnableScheduledFuture<V> t = decorateTask(callable,
462 new ScheduledFutureTask<V>(callable, triggerTime));
463 delayedExecute(t);
464 return t;
465 }
466
467 /**
468 * @throws RejectedExecutionException {@inheritDoc}
469 * @throws NullPointerException {@inheritDoc}
470 * @throws IllegalArgumentException {@inheritDoc}
471 */
472 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
473 long initialDelay,
474 long period,
475 TimeUnit unit) {
476 if (command == null || unit == null)
477 throw new NullPointerException();
478 if (period <= 0)
479 throw new IllegalArgumentException();
480 if (initialDelay < 0) initialDelay = 0;
481 long triggerTime = now() + unit.toNanos(initialDelay);
482 RunnableScheduledFuture<?> t = decorateTask(command,
483 new ScheduledFutureTask<Object>(command,
484 null,
485 triggerTime,
486 unit.toNanos(period)));
487 delayedExecute(t);
488 return t;
489 }
490
491 /**
492 * @throws RejectedExecutionException {@inheritDoc}
493 * @throws NullPointerException {@inheritDoc}
494 * @throws IllegalArgumentException {@inheritDoc}
495 */
496 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
497 long initialDelay,
498 long delay,
499 TimeUnit unit) {
500 if (command == null || unit == null)
501 throw new NullPointerException();
502 if (delay <= 0)
503 throw new IllegalArgumentException();
504 if (initialDelay < 0) initialDelay = 0;
505 long triggerTime = now() + unit.toNanos(initialDelay);
506 RunnableScheduledFuture<?> t = decorateTask(command,
507 new ScheduledFutureTask<Boolean>(command,
508 null,
509 triggerTime,
510 unit.toNanos(-delay)));
511 delayedExecute(t);
512 return t;
513 }
514
515 /**
516 * Executes {@code command} with zero required delay.
517 * This has effect equivalent to
518 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
519 * Note that inspections of the queue and of the list returned by
520 * {@code shutdownNow} will access the zero-delayed
521 * {@link ScheduledFuture}, not the {@code command} itself.
522 *
523 * <p>A consequence of the use of {@code ScheduledFuture} objects is
524 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
525 * called with a null second {@code Throwable} argument, even if the
526 * {@code command} terminated abruptly. Instead, the {@code Throwable}
527 * thrown by such a task can be obtained via {@link Future#get}.
528 *
529 * @throws RejectedExecutionException at discretion of
530 * {@code RejectedExecutionHandler}, if the task
531 * cannot be accepted for execution because the
532 * executor has been shut down
533 * @throws NullPointerException {@inheritDoc}
534 */
535 public void execute(Runnable command) {
536 schedule(command, 0, TimeUnit.NANOSECONDS);
537 }
538
539 // Override AbstractExecutorService methods
540
541 /**
542 * @throws RejectedExecutionException {@inheritDoc}
543 * @throws NullPointerException {@inheritDoc}
544 */
545 public Future<?> submit(Runnable task) {
546 return schedule(task, 0, TimeUnit.NANOSECONDS);
547 }
548
549 /**
550 * @throws RejectedExecutionException {@inheritDoc}
551 * @throws NullPointerException {@inheritDoc}
552 */
553 public <T> Future<T> submit(Runnable task, T result) {
554 return schedule(Executors.callable(task, result),
555 0, TimeUnit.NANOSECONDS);
556 }
557
558 /**
559 * @throws RejectedExecutionException {@inheritDoc}
560 * @throws NullPointerException {@inheritDoc}
561 */
562 public <T> Future<T> submit(Callable<T> task) {
563 return schedule(task, 0, TimeUnit.NANOSECONDS);
564 }
565
566 /**
567 * Sets the policy on whether to continue executing existing
568 * periodic tasks even when this executor has been {@code shutdown}.
569 * In this case, these tasks will only terminate upon
570 * {@code shutdownNow} or after setting the policy to
571 * {@code false} when already shutdown.
572 * This value is by default {@code false}.
573 *
574 * @param value if {@code true}, continue after shutdown, else don't.
575 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
576 */
577 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
578 continueExistingPeriodicTasksAfterShutdown = value;
579 if (!value && isShutdown())
580 onShutdown();
581 }
582
583 /**
584 * Gets the policy on whether to continue executing existing
585 * periodic tasks even when this executor has been {@code shutdown}.
586 * In this case, these tasks will only terminate upon
587 * {@code shutdownNow} or after setting the policy to
588 * {@code false} when already shutdown.
589 * This value is by default {@code false}.
590 *
591 * @return {@code true} if will continue after shutdown
592 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
593 */
594 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
595 return continueExistingPeriodicTasksAfterShutdown;
596 }
597
598 /**
599 * Sets the policy on whether to execute existing delayed
600 * tasks even when this executor has been {@code shutdown}.
601 * In this case, these tasks will only terminate upon
602 * {@code shutdownNow}, or after setting the policy to
603 * {@code false} when already shutdown.
604 * This value is by default {@code true}.
605 *
606 * @param value if {@code true}, execute after shutdown, else don't.
607 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
608 */
609 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
610 executeExistingDelayedTasksAfterShutdown = value;
611 if (!value && isShutdown())
612 onShutdown();
613 }
614
615 /**
616 * Gets the policy on whether to execute existing delayed
617 * tasks even when this executor has been {@code shutdown}.
618 * In this case, these tasks will only terminate upon
619 * {@code shutdownNow}, or after setting the policy to
620 * {@code false} when already shutdown.
621 * This value is by default {@code true}.
622 *
623 * @return {@code true} if will execute after shutdown
624 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
625 */
626 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
627 return executeExistingDelayedTasksAfterShutdown;
628 }
629
630 /**
631 * Initiates an orderly shutdown in which previously submitted
632 * tasks are executed, but no new tasks will be accepted. If the
633 * {@code ExecuteExistingDelayedTasksAfterShutdownPolicy} has
634 * been set {@code false}, existing delayed tasks whose delays
635 * have not yet elapsed are cancelled. And unless the
636 * {@code ContinueExistingPeriodicTasksAfterShutdownPolicy} has
637 * been set {@code true}, future executions of existing periodic
638 * tasks will be cancelled.
639 *
640 * @throws SecurityException {@inheritDoc}
641 */
642 public void shutdown() {
643 super.shutdown();
644 }
645
646 /**
647 * Attempts to stop all actively executing tasks, halts the
648 * processing of waiting tasks, and returns a list of the tasks
649 * that were awaiting execution.
650 *
651 * <p>There are no guarantees beyond best-effort attempts to stop
652 * processing actively executing tasks. This implementation
653 * cancels tasks via {@link Thread#interrupt}, so any task that
654 * fails to respond to interrupts may never terminate.
655 *
656 * @return list of tasks that never commenced execution.
657 * Each element of this list is a {@link ScheduledFuture},
658 * including those tasks submitted using {@code execute},
659 * which are for scheduling purposes used as the basis of a
660 * zero-delay {@code ScheduledFuture}.
661 * @throws SecurityException {@inheritDoc}
662 */
663 public List<Runnable> shutdownNow() {
664 return super.shutdownNow();
665 }
666
667 /**
668 * Returns the task queue used by this executor. Each element of
669 * this queue is a {@link ScheduledFuture}, including those
670 * tasks submitted using {@code execute} which are for scheduling
671 * purposes used as the basis of a zero-delay
672 * {@code ScheduledFuture}. Iteration over this queue is
673 * <em>not</em> guaranteed to traverse tasks in the order in
674 * which they will execute.
675 *
676 * @return the task queue
677 */
678 public BlockingQueue<Runnable> getQueue() {
679 return super.getQueue();
680 }
681
682 /**
683 * Specialized delay queue. To mesh with TPE declarations, this
684 * class must be declared as a BlockingQueue<Runnable> even though
685 * it can only hold RunnableScheduledFutures
686 */
687 static class DelayedWorkQueue extends AbstractQueue<Runnable>
688 implements BlockingQueue<Runnable> {
689
690 /*
691 * A DelayedWorkQueue is based on a heap-based data structure
692 * like those in DelayQueue and PriorityQueue, except that
693 * every ScheduledFutureTask also records its index into the
694 * heap array. This eliminates the need to find a task upon
695 * cancellation, greatly speeding up removal (down from O(n)
696 * to O(log n)), and reducing garbage retention that would
697 * otherwise occur by waiting for the element to rise to top
698 * before clearing. But because the queue may also hold
699 * RunnableScheduledFutures that are not ScheduledFutureTasks,
700 * we are not guaranteed to have such indices available, in
701 * which case we fall back to linear search. (We expect that
702 * most tasks will not be decorated, and that the faster cases
703 * will be much more common.)
704 *
705 * All heap operations must record index changes -- mainly
706 * within siftUp and siftDown. Upon removal, a task's
707 * heapIndex is set to -1. Note that ScheduledFutureTasks can
708 * appear at most once in the queue (this need not be true for
709 * other kinds of tasks or work queues), so are uniquely
710 * identified by heapIndex.
711 */
712
/** Length of the heap array allocated when the queue is created. */
private static final int INITIAL_CAPACITY = 64;
/** Heap array; slots at index {@code size} and above are kept null. */
private transient RunnableScheduledFuture[] queue =
    new RunnableScheduledFuture[INITIAL_CAPACITY];
/** Lock held by every public operation that reads or changes the heap. */
private transient final ReentrantLock lock = new ReentrantLock();
/** Signalled by offer, finishPoll and drainTo; awaited by take and timed poll. */
private transient final Condition available = lock.newCondition();
/** Number of tasks currently stored in {@code queue}. */
private int size = 0;
719
720
721 /**
722 * Set f's heapIndex if it is a ScheduledFutureTask
723 */
724 private void setIndex(Object f, int idx) {
725 if (f instanceof ScheduledFutureTask)
726 ((ScheduledFutureTask)f).heapIndex = idx;
727 }
728
729 /**
730 * Sift element added at bottom up to its heap-ordered spot
731 * Call only when holding lock.
732 */
733 private void siftUp(int k, RunnableScheduledFuture key) {
734 while (k > 0) {
735 int parent = (k - 1) >>> 1;
736 RunnableScheduledFuture e = queue[parent];
737 if (key.compareTo(e) >= 0)
738 break;
739 queue[k] = e;
740 setIndex(e, k);
741 k = parent;
742 }
743 queue[k] = key;
744 setIndex(key, k);
745 }
746
/**
 * Sift element added at top down to its heap-ordered spot.
 * Call only when holding lock.
 *
 * @param k the slot where {@code key} starts out
 * @param key the element to position
 */
private void siftDown(int k, RunnableScheduledFuture key) {
    // Slots at or beyond size/2 have no children, so the loop stops there.
    int half = size >>> 1;
    while (k < half) {
        int child = (k << 1) + 1;
        RunnableScheduledFuture c = queue[child];
        int right = child + 1;
        // Continue with whichever child compares smaller.
        if (right < size && c.compareTo(queue[right]) > 0)
            c = queue[child = right];
        if (key.compareTo(c) <= 0)
            break;
        // Child is earlier than key: lift it one level and record its slot.
        queue[k] = c;
        setIndex(c, k);
        k = child;
    }
    queue[k] = key;
    setIndex(key, k);
}
768
769 /**
770 * Performs common bookkeeping for poll and take: Replaces
771 * first element with last; sifts it down, and signals any
772 * waiting consumers. Call only when holding lock.
773 * @param f the task to remove and return
774 */
775 private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
776 int s = --size;
777 RunnableScheduledFuture x = queue[s];
778 queue[s] = null;
779 if (s != 0) {
780 siftDown(0, x);
781 available.signalAll();
782 }
783 setIndex(f, -1);
784 return f;
785 }
786
787 /**
788 * Resize the heap array. Call only when holding lock.
789 */
790 private void grow() {
791 int oldCapacity = queue.length;
792 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
793 if (newCapacity < 0) // overflow
794 newCapacity = Integer.MAX_VALUE;
795 queue = Arrays.copyOf(queue, newCapacity);
796 }
797
798 /**
799 * Find index of given object, or -1 if absent
800 */
801 private int indexOf(Object x) {
802 if (x != null) {
803 for (int i = 0; i < size; i++)
804 if (x.equals(queue[i]))
805 return i;
806 }
807 return -1;
808 }
809
/**
 * Removes the given object from the queue if it is present.
 *
 * @param x the object to remove
 * @return true if {@code x} was found in the heap and removed
 */
public boolean remove(Object x) {
    boolean removed;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i;
        // A ScheduledFutureTask carries its own slot; anything else
        // has to be located by linear search.
        if (x instanceof ScheduledFutureTask)
            i = ((ScheduledFutureTask)x).heapIndex;
        else
            i = indexOf(x);
        // The identity check guards against a stale or foreign index.
        if (removed = (i >= 0 && i < size && queue[i] == x)) {
            setIndex(x, -1);
            int s = --size;
            RunnableScheduledFuture replacement = queue[s];
            queue[s] = null;
            if (s != i) {
                // Fill the hole with the former last element. If it
                // did not move down, it may need to move up instead.
                siftDown(i, replacement);
                if (queue[i] == replacement)
                    siftUp(i, replacement);
            }
        }
    } finally {
        lock.unlock();
    }
    return removed;
}
836
837 public int size() {
838 int s;
839 final ReentrantLock lock = this.lock;
840 lock.lock();
841 try {
842 s = size;
843 } finally {
844 lock.unlock();
845 }
846 return s;
847 }
848
849 public boolean isEmpty() {
850 return size() == 0;
851 }
852
853 public int remainingCapacity() {
854 return Integer.MAX_VALUE;
855 }
856
857 public RunnableScheduledFuture peek() {
858 final ReentrantLock lock = this.lock;
859 lock.lock();
860 try {
861 return queue[0];
862 } finally {
863 lock.unlock();
864 }
865 }
866
867 public boolean offer(Runnable x) {
868 if (x == null)
869 throw new NullPointerException();
870 RunnableScheduledFuture e = (RunnableScheduledFuture)x;
871 final ReentrantLock lock = this.lock;
872 lock.lock();
873 try {
874 int i = size;
875 if (i >= queue.length)
876 grow();
877 size = i + 1;
878 boolean notify;
879 if (i == 0) {
880 notify = true;
881 queue[0] = e;
882 setIndex(e, 0);
883 }
884 else {
885 notify = e.compareTo(queue[0]) < 0;
886 siftUp(i, e);
887 }
888 if (notify)
889 available.signalAll();
890 } finally {
891 lock.unlock();
892 }
893 return true;
894 }
895
896 public void put(Runnable e) {
897 offer(e);
898 }
899
900 public boolean add(Runnable e) {
901 return offer(e);
902 }
903
904 public boolean offer(Runnable e, long timeout, TimeUnit unit) {
905 return offer(e);
906 }
907
908 public RunnableScheduledFuture poll() {
909 final ReentrantLock lock = this.lock;
910 lock.lock();
911 try {
912 RunnableScheduledFuture first = queue[0];
913 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
914 return null;
915 else
916 return finishPoll(first);
917 } finally {
918 lock.unlock();
919 }
920 }
921
/**
 * Removes and returns the head task, waiting as long as necessary
 * for a task to be present and for its delay to elapse.
 *
 * @return the head task
 * @throws InterruptedException if interrupted while acquiring the
 *         lock or while waiting
 */
public RunnableScheduledFuture take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            // Re-read the head on every pass; it may have changed
            // while this thread was waiting.
            RunnableScheduledFuture first = queue[0];
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(TimeUnit.NANOSECONDS);
                if (delay > 0)
                    // Wait no longer than the head's remaining delay;
                    // a signal wakes this thread early.
                    available.awaitNanos(delay);
                else
                    return finishPoll(first);
            }
        }
    } finally {
        lock.unlock();
    }
}
942
/**
 * Removes and returns the head task once its delay has elapsed,
 * waiting at most the given time for that to happen.
 *
 * @param timeout how long to wait, in units of {@code unit}
 * @param unit the unit of {@code timeout}
 * @return the head task, or null if the wait budget is used up first
 * @throws InterruptedException if interrupted while acquiring the
 *         lock or while waiting
 */
public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
    throws InterruptedException {
    // Remaining wait budget in nanoseconds.
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture first = queue[0];
            if (first == null) {
                if (nanos <= 0)
                    return null;
                else
                    nanos = available.awaitNanos(nanos);
            } else {
                long delay = first.getDelay(TimeUnit.NANOSECONDS);
                if (delay > 0) {
                    if (nanos <= 0)
                        return null;
                    // Wait for the shorter of the head's delay and
                    // the remaining budget.
                    if (delay > nanos)
                        delay = nanos;
                    long timeLeft = available.awaitNanos(delay);
                    // Charge the budget only for the time actually waited.
                    nanos -= delay - timeLeft;
                } else
                    return finishPoll(first);
            }
        }
    } finally {
        lock.unlock();
    }
}
973
974 public void clear() {
975 final ReentrantLock lock = this.lock;
976 lock.lock();
977 try {
978 for (int i = 0; i < size; i++) {
979 RunnableScheduledFuture t = queue[i];
980 if (t != null) {
981 queue[i] = null;
982 setIndex(t, -1);
983 }
984 }
985 size = 0;
986 } finally {
987 lock.unlock();
988 }
989 }
990
991 /**
992 * Return and remove first element only if it is expired.
993 * Used only by drainTo. Call only when holding lock.
994 */
995 private RunnableScheduledFuture pollExpired() {
996 RunnableScheduledFuture first = queue[0];
997 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
998 return null;
999 setIndex(first, -1);
1000 int s = --size;
1001 RunnableScheduledFuture x = queue[s];
1002 queue[s] = null;
1003 if (s != 0)
1004 siftDown(0, x);
1005 return first;
1006 }
1007
1008 public int drainTo(Collection<? super Runnable> c) {
1009 if (c == null)
1010 throw new NullPointerException();
1011 if (c == this)
1012 throw new IllegalArgumentException();
1013 final ReentrantLock lock = this.lock;
1014 lock.lock();
1015 try {
1016 int n = 0;
1017 for (;;) {
1018 RunnableScheduledFuture first = pollExpired();
1019 if (first != null) {
1020 c.add(first);
1021 ++n;
1022 }
1023 else
1024 break;
1025 }
1026 if (n > 0)
1027 available.signalAll();
1028 return n;
1029 } finally {
1030 lock.unlock();
1031 }
1032 }
1033
1034 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1035 if (c == null)
1036 throw new NullPointerException();
1037 if (c == this)
1038 throw new IllegalArgumentException();
1039 if (maxElements <= 0)
1040 return 0;
1041 final ReentrantLock lock = this.lock;
1042 lock.lock();
1043 try {
1044 int n = 0;
1045 while (n < maxElements) {
1046 RunnableScheduledFuture first = pollExpired();
1047 if (first != null) {
1048 c.add(first);
1049 ++n;
1050 }
1051 else
1052 break;
1053 }
1054 if (n > 0)
1055 available.signalAll();
1056 return n;
1057 } finally {
1058 lock.unlock();
1059 }
1060 }
1061
1062 public Object[] toArray() {
1063 final ReentrantLock lock = this.lock;
1064 lock.lock();
1065 try {
1066 return Arrays.copyOf(queue, size);
1067 } finally {
1068 lock.unlock();
1069 }
1070 }
1071
1072 public <T> T[] toArray(T[] a) {
1073 final ReentrantLock lock = this.lock;
1074 lock.lock();
1075 try {
1076 if (a.length < size)
1077 return (T[]) Arrays.copyOf(queue, size, a.getClass());
1078 System.arraycopy(queue, 0, a, 0, size);
1079 if (a.length > size)
1080 a[size] = null;
1081 return a;
1082 } finally {
1083 lock.unlock();
1084 }
1085 }
1086
1087 public Iterator<Runnable> iterator() {
1088 return new Itr(toArray());
1089 }
1090
1091 /**
1092 * Snapshot iterator that works off copy of underlying q array.
1093 */
1094 private class Itr implements Iterator<Runnable> {
1095 final Object[] array; // Array of all elements
1096 int cursor; // index of next element to return;
1097 int lastRet; // index of last element, or -1 if no such
1098
1099 Itr(Object[] array) {
1100 lastRet = -1;
1101 this.array = array;
1102 }
1103
1104 public boolean hasNext() {
1105 return cursor < array.length;
1106 }
1107
1108 public Runnable next() {
1109 if (cursor >= array.length)
1110 throw new NoSuchElementException();
1111 lastRet = cursor;
1112 return (Runnable)array[cursor++];
1113 }
1114
1115 public void remove() {
1116 if (lastRet < 0)
1117 throw new IllegalStateException();
1118 DelayedWorkQueue.this.remove(array[lastRet]);
1119 lastRet = -1;
1120 }
1121 }
1122 }
1123 }