ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.49
Committed: Mon May 19 01:13:14 2008 UTC (16 years ago) by jsr166
Branch: MAIN
Changes since 1.48: +17 -7 lines
Log Message:
6620549: ExecutorService#shutdown should clearly state that it does not block

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13 * A {@link ThreadPoolExecutor} that can additionally schedule
14 * commands to run after a given delay, or to execute
15 * periodically. This class is preferable to {@link java.util.Timer}
16 * when multiple worker threads are needed, or when the additional
17 * flexibility or capabilities of {@link ThreadPoolExecutor} (which
18 * this class extends) are required.
19 *
20 * <p>Delayed tasks execute no sooner than they are enabled, but
21 * without any real-time guarantees about when, after they are
22 * enabled, they will commence. Tasks scheduled for exactly the same
23 * execution time are enabled in first-in-first-out (FIFO) order of
24 * submission.
25 *
26 * <p>When a submitted task is cancelled before it is run, execution
27 * is suppressed. By default, such a cancelled task is not
28 * automatically removed from the work queue until its delay
29 * elapses. While this enables further inspection and monitoring, it
30 * may also cause unbounded retention of cancelled tasks. To avoid
31 * this, set {@link #setRemoveOnCancelPolicy} to {@code true}, which
32 * causes tasks to be immediately removed from the work queue at
33 * time of cancellation.
34 *
35 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
36 * of the inherited tuning methods are not useful for it. In
37 * particular, because it acts as a fixed-sized pool using
38 * {@code corePoolSize} threads and an unbounded queue, adjustments
39 * to {@code maximumPoolSize} have no useful effect. Additionally, it
40 * is almost never a good idea to set {@code corePoolSize} to zero or
41 * use {@code allowCoreThreadTimeOut} because this may leave the pool
42 * without threads to handle tasks once they become eligible to run.
43 *
44 * <p><b>Extension notes:</b> This class overrides the
45 * {@link ThreadPoolExecutor#execute execute} and
46 * {@link AbstractExecutorService#submit(Runnable) submit}
47 * methods to generate internal {@link ScheduledFuture} objects to
48 * control per-task delays and scheduling. To preserve
49 * functionality, any further overrides of these methods in
50 * subclasses must invoke superclass versions, which effectively
51 * disables additional task customization. However, this class
52 * provides alternative protected extension method
53 * {@code decorateTask} (one version each for {@code Runnable} and
54 * {@code Callable}) that can be used to customize the concrete task
55 * types used to execute commands entered via {@code execute},
56 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
57 * and {@code scheduleWithFixedDelay}. By default, a
58 * {@code ScheduledThreadPoolExecutor} uses a task type extending
59 * {@link FutureTask}. However, this may be modified or replaced using
60 * subclasses of the form:
61 *
62 * <pre> {@code
63 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
64 *
65 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
66 *
67 * protected <V> RunnableScheduledFuture<V> decorateTask(
68 * Runnable r, RunnableScheduledFuture<V> task) {
69 * return new CustomTask<V>(r, task);
70 * }
71 *
72 * protected <V> RunnableScheduledFuture<V> decorateTask(
73 * Callable<V> c, RunnableScheduledFuture<V> task) {
74 * return new CustomTask<V>(c, task);
75 * }
76 * // ... add constructors, etc.
77 * }}</pre>
78 *
79 * @since 1.5
80 * @author Doug Lea
81 */
82 public class ScheduledThreadPoolExecutor
83 extends ThreadPoolExecutor
84 implements ScheduledExecutorService {
85
86 /*
87 * This class specializes ThreadPoolExecutor implementation by
88 *
89 * 1. Using a custom task type, ScheduledFutureTask for
90 * tasks, even those that don't require scheduling (i.e.,
91 * those submitted using ExecutorService execute, not
92 * ScheduledExecutorService methods) which are treated as
93 * delayed tasks with a delay of zero.
94 *
95 * 2. Using a custom queue (DelayedWorkQueue), a variant of
96 * unbounded DelayQueue. The lack of capacity constraint and
97 * the fact that corePoolSize and maximumPoolSize are
98 * effectively identical simplifies some execution mechanics
99 * (see delayedExecute) compared to ThreadPoolExecutor.
100 *
101 * 3. Supporting optional run-after-shutdown parameters, which
102 * leads to overrides of shutdown methods to remove and cancel
103 * tasks that should NOT be run after shutdown, as well as
104 * different recheck logic when task (re)submission overlaps
105 * with a shutdown.
106 *
107 * 4. Task decoration methods to allow interception and
108 * instrumentation, which are needed because subclasses cannot
109 * otherwise override submit methods to get this effect. These
110 * don't have any impact on pool control logic though.
111 */
112
113 /**
114 * False if should cancel/suppress periodic tasks on shutdown.
115 */
116 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
117
118 /**
119 * False if should cancel non-periodic tasks on shutdown.
120 */
121 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
122
123 /**
124 * True if ScheduledFutureTask.cancel should remove from queue
125 */
126 private volatile boolean removeOnCancel = false;
127
128 /**
129 * Sequence number to break scheduling ties, and in turn to
130 * guarantee FIFO order among tied entries.
131 */
132 private static final AtomicLong sequencer = new AtomicLong(0);
133
134 /**
135 * Returns current nanosecond time.
136 */
137 final long now() {
138 return System.nanoTime();
139 }
140
141 private class ScheduledFutureTask<V>
142 extends FutureTask<V> implements RunnableScheduledFuture<V> {
143
144 /** Sequence number to break ties FIFO */
145 private final long sequenceNumber;
146
147 /** The time the task is enabled to execute in nanoTime units */
148 private long time;
149
150 /**
151 * Period in nanoseconds for repeating tasks. A positive
152 * value indicates fixed-rate execution. A negative value
153 * indicates fixed-delay execution. A value of 0 indicates a
154 * non-repeating task.
155 */
156 private final long period;
157
158 /** The actual task to be re-enqueued by reExecutePeriodic */
159 RunnableScheduledFuture<V> outerTask = this;
160
161 /**
162 * Index into delay queue, to support faster cancellation.
163 */
164 int heapIndex;
165
166 /**
167 * Creates a one-shot action with given nanoTime-based trigger time.
168 */
169 ScheduledFutureTask(Runnable r, V result, long ns) {
170 super(r, result);
171 this.time = ns;
172 this.period = 0;
173 this.sequenceNumber = sequencer.getAndIncrement();
174 }
175
176 /**
177 * Creates a periodic action with given nano time and period.
178 */
179 ScheduledFutureTask(Runnable r, V result, long ns, long period) {
180 super(r, result);
181 this.time = ns;
182 this.period = period;
183 this.sequenceNumber = sequencer.getAndIncrement();
184 }
185
186 /**
187 * Creates a one-shot action with given nanoTime-based trigger.
188 */
189 ScheduledFutureTask(Callable<V> callable, long ns) {
190 super(callable);
191 this.time = ns;
192 this.period = 0;
193 this.sequenceNumber = sequencer.getAndIncrement();
194 }
195
196 public long getDelay(TimeUnit unit) {
197 long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);
198 return d;
199 }
200
201 public int compareTo(Delayed other) {
202 if (other == this) // compare zero ONLY if same object
203 return 0;
204 if (other instanceof ScheduledFutureTask) {
205 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
206 long diff = time - x.time;
207 if (diff < 0)
208 return -1;
209 else if (diff > 0)
210 return 1;
211 else if (sequenceNumber < x.sequenceNumber)
212 return -1;
213 else
214 return 1;
215 }
216 long d = (getDelay(TimeUnit.NANOSECONDS) -
217 other.getDelay(TimeUnit.NANOSECONDS));
218 return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
219 }
220
221 /**
222 * Returns true if this is a periodic (not a one-shot) action.
223 *
224 * @return true if periodic
225 */
226 public boolean isPeriodic() {
227 return period != 0;
228 }
229
230 /**
231 * Sets the next time to run for a periodic task.
232 */
233 private void setNextRunTime() {
234 long p = period;
235 if (p > 0)
236 time += p;
237 else
238 time = now() - p;
239 }
240
241 public boolean cancel(boolean mayInterruptIfRunning) {
242 boolean cancelled = super.cancel(mayInterruptIfRunning);
243 if (cancelled && removeOnCancel && heapIndex >= 0)
244 remove(this);
245 return cancelled;
246 }
247
248 /**
249 * Overrides FutureTask version so as to reset/requeue if periodic.
250 */
251 public void run() {
252 boolean periodic = isPeriodic();
253 if (!canRunInCurrentRunState(periodic))
254 cancel(false);
255 else if (!periodic)
256 ScheduledFutureTask.super.run();
257 else if (ScheduledFutureTask.super.runAndReset()) {
258 setNextRunTime();
259 reExecutePeriodic(outerTask);
260 }
261 }
262 }
263
264 /**
265 * Returns true if can run a task given current run state
266 * and run-after-shutdown parameters.
267 *
268 * @param periodic true if this task periodic, false if delayed
269 */
270 boolean canRunInCurrentRunState(boolean periodic) {
271 return isRunningOrShutdown(periodic ?
272 continueExistingPeriodicTasksAfterShutdown :
273 executeExistingDelayedTasksAfterShutdown);
274 }
275
276 /**
277 * Main execution method for delayed or periodic tasks. If pool
278 * is shut down, rejects the task. Otherwise adds task to queue
279 * and starts a thread, if necessary, to run it. (We cannot
280 * prestart the thread to run the task because the task (probably)
281 * shouldn't be run yet,) If the pool is shut down while the task
282 * is being added, cancel and remove it if required by state and
283 * run-after-shutdown parameters.
284 *
285 * @param task the task
286 */
287 private void delayedExecute(RunnableScheduledFuture<?> task) {
288 if (isShutdown())
289 reject(task);
290 else {
291 super.getQueue().add(task);
292 if (isShutdown() &&
293 !canRunInCurrentRunState(task.isPeriodic()) &&
294 remove(task))
295 task.cancel(false);
296 else
297 prestartCoreThread();
298 }
299 }
300
301 /**
302 * Requeues a periodic task unless current run state precludes it.
303 * Same idea as delayedExecute except drops task rather than rejecting.
304 *
305 * @param task the task
306 */
307 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
308 if (canRunInCurrentRunState(true)) {
309 super.getQueue().add(task);
310 if (!canRunInCurrentRunState(true) && remove(task))
311 task.cancel(false);
312 else
313 prestartCoreThread();
314 }
315 }
316
317 /**
318 * Cancels and clears the queue of all tasks that should not be run
319 * due to shutdown policy. Invoked within super.shutdown.
320 */
321 @Override void onShutdown() {
322 BlockingQueue<Runnable> q = super.getQueue();
323 boolean keepDelayed =
324 getExecuteExistingDelayedTasksAfterShutdownPolicy();
325 boolean keepPeriodic =
326 getContinueExistingPeriodicTasksAfterShutdownPolicy();
327 if (!keepDelayed && !keepPeriodic)
328 q.clear();
329 else {
330 // Traverse snapshot to avoid iterator exceptions
331 for (Object e : q.toArray()) {
332 if (e instanceof RunnableScheduledFuture) {
333 RunnableScheduledFuture<?> t =
334 (RunnableScheduledFuture<?>)e;
335 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
336 t.isCancelled()) { // also remove if already cancelled
337 if (q.remove(t))
338 t.cancel(false);
339 }
340 }
341 }
342 }
343 tryTerminate();
344 }
345
346 /**
347 * Modifies or replaces the task used to execute a runnable.
348 * This method can be used to override the concrete
349 * class used for managing internal tasks.
350 * The default implementation simply returns the given task.
351 *
352 * @param runnable the submitted Runnable
353 * @param task the task created to execute the runnable
354 * @return a task that can execute the runnable
355 * @since 1.6
356 */
357 protected <V> RunnableScheduledFuture<V> decorateTask(
358 Runnable runnable, RunnableScheduledFuture<V> task) {
359 return task;
360 }
361
362 /**
363 * Modifies or replaces the task used to execute a callable.
364 * This method can be used to override the concrete
365 * class used for managing internal tasks.
366 * The default implementation simply returns the given task.
367 *
368 * @param callable the submitted Callable
369 * @param task the task created to execute the callable
370 * @return a task that can execute the callable
371 * @since 1.6
372 */
373 protected <V> RunnableScheduledFuture<V> decorateTask(
374 Callable<V> callable, RunnableScheduledFuture<V> task) {
375 return task;
376 }
377
378 /**
379 * Creates a new {@code ScheduledThreadPoolExecutor} with the
380 * given core pool size.
381 *
382 * @param corePoolSize the number of threads to keep in the pool, even
383 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
384 * @throws IllegalArgumentException if {@code corePoolSize < 0}
385 */
386 public ScheduledThreadPoolExecutor(int corePoolSize) {
387 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
388 new DelayedWorkQueue());
389 }
390
391 /**
392 * Creates a new {@code ScheduledThreadPoolExecutor} with the
393 * given initial parameters.
394 *
395 * @param corePoolSize the number of threads to keep in the pool, even
396 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
397 * @param threadFactory the factory to use when the executor
398 * creates a new thread
399 * @throws IllegalArgumentException if {@code corePoolSize < 0}
400 * @throws NullPointerException if {@code threadFactory} is null
401 */
402 public ScheduledThreadPoolExecutor(int corePoolSize,
403 ThreadFactory threadFactory) {
404 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
405 new DelayedWorkQueue(), threadFactory);
406 }
407
408 /**
409 * Creates a new ScheduledThreadPoolExecutor with the given
410 * initial parameters.
411 *
412 * @param corePoolSize the number of threads to keep in the pool, even
413 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
414 * @param handler the handler to use when execution is blocked
415 * because the thread bounds and queue capacities are reached
416 * @throws IllegalArgumentException if {@code corePoolSize < 0}
417 * @throws NullPointerException if {@code handler} is null
418 */
419 public ScheduledThreadPoolExecutor(int corePoolSize,
420 RejectedExecutionHandler handler) {
421 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
422 new DelayedWorkQueue(), handler);
423 }
424
425 /**
426 * Creates a new ScheduledThreadPoolExecutor with the given
427 * initial parameters.
428 *
429 * @param corePoolSize the number of threads to keep in the pool, even
430 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
431 * @param threadFactory the factory to use when the executor
432 * creates a new thread
433 * @param handler the handler to use when execution is blocked
434 * because the thread bounds and queue capacities are reached
435 * @throws IllegalArgumentException if {@code corePoolSize < 0}
436 * @throws NullPointerException if {@code threadFactory} or
437 * {@code handler} is null
438 */
439 public ScheduledThreadPoolExecutor(int corePoolSize,
440 ThreadFactory threadFactory,
441 RejectedExecutionHandler handler) {
442 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
443 new DelayedWorkQueue(), threadFactory, handler);
444 }
445
446 /**
447 * @throws RejectedExecutionException {@inheritDoc}
448 * @throws NullPointerException {@inheritDoc}
449 */
450 public ScheduledFuture<?> schedule(Runnable command,
451 long delay,
452 TimeUnit unit) {
453 if (command == null || unit == null)
454 throw new NullPointerException();
455 if (delay < 0) delay = 0;
456 long triggerTime = now() + unit.toNanos(delay);
457 RunnableScheduledFuture<?> t = decorateTask(command,
458 new ScheduledFutureTask<Void>(command, null, triggerTime));
459 delayedExecute(t);
460 return t;
461 }
462
463 /**
464 * @throws RejectedExecutionException {@inheritDoc}
465 * @throws NullPointerException {@inheritDoc}
466 */
467 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
468 long delay,
469 TimeUnit unit) {
470 if (callable == null || unit == null)
471 throw new NullPointerException();
472 if (delay < 0) delay = 0;
473 long triggerTime = now() + unit.toNanos(delay);
474 RunnableScheduledFuture<V> t = decorateTask(callable,
475 new ScheduledFutureTask<V>(callable, triggerTime));
476 delayedExecute(t);
477 return t;
478 }
479
480 /**
481 * @throws RejectedExecutionException {@inheritDoc}
482 * @throws NullPointerException {@inheritDoc}
483 * @throws IllegalArgumentException {@inheritDoc}
484 */
485 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
486 long initialDelay,
487 long period,
488 TimeUnit unit) {
489 if (command == null || unit == null)
490 throw new NullPointerException();
491 if (period <= 0)
492 throw new IllegalArgumentException();
493 if (initialDelay < 0) initialDelay = 0;
494 long triggerTime = now() + unit.toNanos(initialDelay);
495 ScheduledFutureTask<Void> sft =
496 new ScheduledFutureTask<Void>(command,
497 null,
498 triggerTime,
499 unit.toNanos(period));
500 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
501 sft.outerTask = t;
502 delayedExecute(t);
503 return t;
504 }
505
506 /**
507 * @throws RejectedExecutionException {@inheritDoc}
508 * @throws NullPointerException {@inheritDoc}
509 * @throws IllegalArgumentException {@inheritDoc}
510 */
511 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
512 long initialDelay,
513 long delay,
514 TimeUnit unit) {
515 if (command == null || unit == null)
516 throw new NullPointerException();
517 if (delay <= 0)
518 throw new IllegalArgumentException();
519 if (initialDelay < 0) initialDelay = 0;
520 long triggerTime = now() + unit.toNanos(initialDelay);
521 ScheduledFutureTask<Void> sft =
522 new ScheduledFutureTask<Void>(command,
523 null,
524 triggerTime,
525 unit.toNanos(-delay));
526 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
527 sft.outerTask = t;
528 delayedExecute(t);
529 return t;
530 }
531
532 /**
533 * Executes {@code command} with zero required delay.
534 * This has effect equivalent to
535 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
536 * Note that inspections of the queue and of the list returned by
537 * {@code shutdownNow} will access the zero-delayed
538 * {@link ScheduledFuture}, not the {@code command} itself.
539 *
540 * <p>A consequence of the use of {@code ScheduledFuture} objects is
541 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
542 * called with a null second {@code Throwable} argument, even if the
543 * {@code command} terminated abruptly. Instead, the {@code Throwable}
544 * thrown by such a task can be obtained via {@link Future#get}.
545 *
546 * @throws RejectedExecutionException at discretion of
547 * {@code RejectedExecutionHandler}, if the task
548 * cannot be accepted for execution because the
549 * executor has been shut down
550 * @throws NullPointerException {@inheritDoc}
551 */
552 public void execute(Runnable command) {
553 schedule(command, 0, TimeUnit.NANOSECONDS);
554 }
555
556 // Override AbstractExecutorService methods
557
558 /**
559 * @throws RejectedExecutionException {@inheritDoc}
560 * @throws NullPointerException {@inheritDoc}
561 */
562 public Future<?> submit(Runnable task) {
563 return schedule(task, 0, TimeUnit.NANOSECONDS);
564 }
565
566 /**
567 * @throws RejectedExecutionException {@inheritDoc}
568 * @throws NullPointerException {@inheritDoc}
569 */
570 public <T> Future<T> submit(Runnable task, T result) {
571 return schedule(Executors.callable(task, result),
572 0, TimeUnit.NANOSECONDS);
573 }
574
575 /**
576 * @throws RejectedExecutionException {@inheritDoc}
577 * @throws NullPointerException {@inheritDoc}
578 */
579 public <T> Future<T> submit(Callable<T> task) {
580 return schedule(task, 0, TimeUnit.NANOSECONDS);
581 }
582
583 /**
584 * Sets the policy on whether to continue executing existing
585 * periodic tasks even when this executor has been {@code shutdown}.
586 * In this case, these tasks will only terminate upon
587 * {@code shutdownNow} or after setting the policy to
588 * {@code false} when already shutdown.
589 * This value is by default {@code false}.
590 *
591 * @param value if {@code true}, continue after shutdown, else don't.
592 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
593 */
594 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
595 continueExistingPeriodicTasksAfterShutdown = value;
596 if (!value && isShutdown())
597 onShutdown();
598 }
599
600 /**
601 * Gets the policy on whether to continue executing existing
602 * periodic tasks even when this executor has been {@code shutdown}.
603 * In this case, these tasks will only terminate upon
604 * {@code shutdownNow} or after setting the policy to
605 * {@code false} when already shutdown.
606 * This value is by default {@code false}.
607 *
608 * @return {@code true} if will continue after shutdown
609 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
610 */
611 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
612 return continueExistingPeriodicTasksAfterShutdown;
613 }
614
615 /**
616 * Sets the policy on whether to execute existing delayed
617 * tasks even when this executor has been {@code shutdown}.
618 * In this case, these tasks will only terminate upon
619 * {@code shutdownNow}, or after setting the policy to
620 * {@code false} when already shutdown.
621 * This value is by default {@code true}.
622 *
623 * @param value if {@code true}, execute after shutdown, else don't.
624 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
625 */
626 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
627 executeExistingDelayedTasksAfterShutdown = value;
628 if (!value && isShutdown())
629 onShutdown();
630 }
631
632 /**
633 * Gets the policy on whether to execute existing delayed
634 * tasks even when this executor has been {@code shutdown}.
635 * In this case, these tasks will only terminate upon
636 * {@code shutdownNow}, or after setting the policy to
637 * {@code false} when already shutdown.
638 * This value is by default {@code true}.
639 *
640 * @return {@code true} if will execute after shutdown
641 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
642 */
643 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
644 return executeExistingDelayedTasksAfterShutdown;
645 }
646
647 /**
648 * Sets the policy on whether cancelled tasks should be immediately
649 * removed from the work queue at time of cancellation. This value is
650 * by default {@code false}.
651 *
652 * @param value if {@code true}, remove on cancellation, else don't
653 * @see #getRemoveOnCancelPolicy
654 * @since 1.7
655 */
656 public void setRemoveOnCancelPolicy(boolean value) {
657 removeOnCancel = value;
658 }
659
660 /**
661 * Gets the policy on whether cancelled tasks should be immediately
662 * removed from the work queue at time of cancellation. This value is
663 * by default {@code false}.
664 *
665 * @return {@code true} if cancelled tasks are immediately removed
666 * from the queue
667 * @see #setRemoveOnCancelPolicy
668 * @since 1.7
669 */
670 public boolean getRemoveOnCancelPolicy() {
671 return removeOnCancel;
672 }
673
674 /**
675 * Initiates an orderly shutdown in which previously submitted
676 * tasks are executed, but no new tasks will be accepted.
677 * Invocation has no additional effect if already shut down.
678 *
679 * <p>This method does not wait for previously submitted tasks to
680 * complete execution. Use {@link #awaitTermination awaitTermination}
681 * to do that.
682 *
683 * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
684 * has been set {@code false}, existing delayed tasks whose delays
685 * have not yet elapsed are cancelled. And unless the {@code
686 * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
687 * {@code true}, future executions of existing periodic tasks will
688 * be cancelled.
689 *
690 * @throws SecurityException {@inheritDoc}
691 */
692 public void shutdown() {
693 super.shutdown();
694 }
695
696 /**
697 * Attempts to stop all actively executing tasks, halts the
698 * processing of waiting tasks, and returns a list of the tasks
699 * that were awaiting execution.
700 *
701 * <p>This method does not wait for actively executing tasks to
702 * terminate. Use {@link #awaitTermination awaitTermination} to
703 * do that.
704 *
705 * <p>There are no guarantees beyond best-effort attempts to stop
706 * processing actively executing tasks. This implementation
707 * cancels tasks via {@link Thread#interrupt}, so any task that
708 * fails to respond to interrupts may never terminate.
709 *
710 * @return list of tasks that never commenced execution.
711 * Each element of this list is a {@link ScheduledFuture},
712 * including those tasks submitted using {@code execute},
713 * which are for scheduling purposes used as the basis of a
714 * zero-delay {@code ScheduledFuture}.
715 * @throws SecurityException {@inheritDoc}
716 */
717 public List<Runnable> shutdownNow() {
718 return super.shutdownNow();
719 }
720
721 /**
722 * Returns the task queue used by this executor. Each element of
723 * this queue is a {@link ScheduledFuture}, including those
724 * tasks submitted using {@code execute} which are for scheduling
725 * purposes used as the basis of a zero-delay
726 * {@code ScheduledFuture}. Iteration over this queue is
727 * <em>not</em> guaranteed to traverse tasks in the order in
728 * which they will execute.
729 *
730 * @return the task queue
731 */
732 public BlockingQueue<Runnable> getQueue() {
733 return super.getQueue();
734 }
735
736 /**
737 * Specialized delay queue. To mesh with TPE declarations, this
738 * class must be declared as a BlockingQueue<Runnable> even though
739 * it can only hold RunnableScheduledFutures.
740 */
741 static class DelayedWorkQueue extends AbstractQueue<Runnable>
742 implements BlockingQueue<Runnable> {
743
744 /*
745 * A DelayedWorkQueue is based on a heap-based data structure
746 * like those in DelayQueue and PriorityQueue, except that
747 * every ScheduledFutureTask also records its index into the
748 * heap array. This eliminates the need to find a task upon
749 * cancellation, greatly speeding up removal (down from O(n)
750 * to O(log n)), and reducing garbage retention that would
751 * otherwise occur by waiting for the element to rise to top
752 * before clearing. But because the queue may also hold
753 * RunnableScheduledFutures that are not ScheduledFutureTasks,
754 * we are not guaranteed to have such indices available, in
755 * which case we fall back to linear search. (We expect that
756 * most tasks will not be decorated, and that the faster cases
757 * will be much more common.)
758 *
759 * All heap operations must record index changes -- mainly
760 * within siftUp and siftDown. Upon removal, a task's
761 * heapIndex is set to -1. Note that ScheduledFutureTasks can
762 * appear at most once in the queue (this need not be true for
763 * other kinds of tasks or work queues), so are uniquely
764 * identified by heapIndex.
765 */
766
767 private static final int INITIAL_CAPACITY = 16;
768 private RunnableScheduledFuture[] queue =
769 new RunnableScheduledFuture[INITIAL_CAPACITY];
770 private final ReentrantLock lock = new ReentrantLock();
771 private int size = 0;
772
773 /**
774 * Thread designated to wait for the task at the head of the
775 * queue. This variant of the Leader-Follower pattern
776 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
777 * minimize unnecessary timed waiting. When a thread becomes
778 * the leader, it waits only for the next delay to elapse, but
779 * other threads await indefinitely. The leader thread must
780 * signal some other thread before returning from take() or
781 * poll(...), unless some other thread becomes leader in the
782 * interim. Whenever the head of the queue is replaced with a
783 * task with an earlier expiration time, the leader field is
784 * invalidated by being reset to null, and some waiting
785 * thread, but not necessarily the current leader, is
786 * signalled. So waiting threads must be prepared to acquire
787 * and lose leadership while waiting.
788 */
789 private Thread leader = null;
790
791 /**
792 * Condition signalled when a newer task becomes available at the
793 * head of the queue or a new thread may need to become leader.
794 */
795 private final Condition available = lock.newCondition();
796
797 /**
798 * Set f's heapIndex if it is a ScheduledFutureTask.
799 */
800 private void setIndex(RunnableScheduledFuture f, int idx) {
801 if (f instanceof ScheduledFutureTask)
802 ((ScheduledFutureTask)f).heapIndex = idx;
803 }
804
805 /**
806 * Sift element added at bottom up to its heap-ordered spot.
807 * Call only when holding lock.
808 */
809 private void siftUp(int k, RunnableScheduledFuture key) {
810 while (k > 0) {
811 int parent = (k - 1) >>> 1;
812 RunnableScheduledFuture e = queue[parent];
813 if (key.compareTo(e) >= 0)
814 break;
815 queue[k] = e;
816 setIndex(e, k);
817 k = parent;
818 }
819 queue[k] = key;
820 setIndex(key, k);
821 }
822
823 /**
824 * Sift element added at top down to its heap-ordered spot.
825 * Call only when holding lock.
826 */
827 private void siftDown(int k, RunnableScheduledFuture key) {
828 int half = size >>> 1;
829 while (k < half) {
830 int child = (k << 1) + 1;
831 RunnableScheduledFuture c = queue[child];
832 int right = child + 1;
833 if (right < size && c.compareTo(queue[right]) > 0)
834 c = queue[child = right];
835 if (key.compareTo(c) <= 0)
836 break;
837 queue[k] = c;
838 setIndex(c, k);
839 k = child;
840 }
841 queue[k] = key;
842 setIndex(key, k);
843 }
844
845 /**
846 * Resize the heap array. Call only when holding lock.
847 */
848 private void grow() {
849 int oldCapacity = queue.length;
850 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
851 if (newCapacity < 0) // overflow
852 newCapacity = Integer.MAX_VALUE;
853 queue = Arrays.copyOf(queue, newCapacity);
854 }
855
856 /**
857 * Find index of given object, or -1 if absent
858 */
859 private int indexOf(Object x) {
860 if (x != null) {
861 if (x instanceof ScheduledFutureTask) {
862 int i = ((ScheduledFutureTask) x).heapIndex;
863 // Sanity check; x could conceivably be a
864 // ScheduledFutureTask from some other pool.
865 if (i >= 0 && i < size && queue[i] == x)
866 return i;
867 } else {
868 for (int i = 0; i < size; i++)
869 if (x.equals(queue[i]))
870 return i;
871 }
872 }
873 return -1;
874 }
875
876 public boolean contains(Object x) {
877 final ReentrantLock lock = this.lock;
878 lock.lock();
879 try {
880 return indexOf(x) != -1;
881 } finally {
882 lock.unlock();
883 }
884 }
885
886 public boolean remove(Object x) {
887 final ReentrantLock lock = this.lock;
888 lock.lock();
889 try {
890 int i = indexOf(x);
891 if (i < 0)
892 return false;
893
894 setIndex(queue[i], -1);
895 int s = --size;
896 RunnableScheduledFuture replacement = queue[s];
897 queue[s] = null;
898 if (s != i) {
899 siftDown(i, replacement);
900 if (queue[i] == replacement)
901 siftUp(i, replacement);
902 }
903 return true;
904 } finally {
905 lock.unlock();
906 }
907 }
908
909 public int size() {
910 final ReentrantLock lock = this.lock;
911 lock.lock();
912 try {
913 return size;
914 } finally {
915 lock.unlock();
916 }
917 }
918
919 public boolean isEmpty() {
920 return size() == 0;
921 }
922
923 public int remainingCapacity() {
924 return Integer.MAX_VALUE;
925 }
926
927 public RunnableScheduledFuture peek() {
928 final ReentrantLock lock = this.lock;
929 lock.lock();
930 try {
931 return queue[0];
932 } finally {
933 lock.unlock();
934 }
935 }
936
937 public boolean offer(Runnable x) {
938 if (x == null)
939 throw new NullPointerException();
940 RunnableScheduledFuture e = (RunnableScheduledFuture)x;
941 final ReentrantLock lock = this.lock;
942 lock.lock();
943 try {
944 int i = size;
945 if (i >= queue.length)
946 grow();
947 size = i + 1;
948 if (i == 0) {
949 queue[0] = e;
950 setIndex(e, 0);
951 } else {
952 siftUp(i, e);
953 }
954 if (queue[0] == e) {
955 leader = null;
956 available.signal();
957 }
958 } finally {
959 lock.unlock();
960 }
961 return true;
962 }
963
964 public void put(Runnable e) {
965 offer(e);
966 }
967
968 public boolean add(Runnable e) {
969 return offer(e);
970 }
971
972 public boolean offer(Runnable e, long timeout, TimeUnit unit) {
973 return offer(e);
974 }
975
976 /**
977 * Performs common bookkeeping for poll and take: Replaces
978 * first element with last and sifts it down. Call only when
979 * holding lock.
980 * @param f the task to remove and return
981 */
982 private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
983 int s = --size;
984 RunnableScheduledFuture x = queue[s];
985 queue[s] = null;
986 if (s != 0)
987 siftDown(0, x);
988 setIndex(f, -1);
989 return f;
990 }
991
992 public RunnableScheduledFuture poll() {
993 final ReentrantLock lock = this.lock;
994 lock.lock();
995 try {
996 RunnableScheduledFuture first = queue[0];
997 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
998 return null;
999 else
1000 return finishPoll(first);
1001 } finally {
1002 lock.unlock();
1003 }
1004 }
1005
1006 public RunnableScheduledFuture take() throws InterruptedException {
1007 final ReentrantLock lock = this.lock;
1008 lock.lockInterruptibly();
1009 try {
1010 for (;;) {
1011 RunnableScheduledFuture first = queue[0];
1012 if (first == null)
1013 available.await();
1014 else {
1015 long delay = first.getDelay(TimeUnit.NANOSECONDS);
1016 if (delay <= 0)
1017 return finishPoll(first);
1018 else if (leader != null)
1019 available.await();
1020 else {
1021 Thread thisThread = Thread.currentThread();
1022 leader = thisThread;
1023 try {
1024 available.awaitNanos(delay);
1025 } finally {
1026 if (leader == thisThread)
1027 leader = null;
1028 }
1029 }
1030 }
1031 }
1032 } finally {
1033 if (leader == null && queue[0] != null)
1034 available.signal();
1035 lock.unlock();
1036 }
1037 }
1038
1039 public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
1040 throws InterruptedException {
1041 long nanos = unit.toNanos(timeout);
1042 final ReentrantLock lock = this.lock;
1043 lock.lockInterruptibly();
1044 try {
1045 for (;;) {
1046 RunnableScheduledFuture first = queue[0];
1047 if (first == null) {
1048 if (nanos <= 0)
1049 return null;
1050 else
1051 nanos = available.awaitNanos(nanos);
1052 } else {
1053 long delay = first.getDelay(TimeUnit.NANOSECONDS);
1054 if (delay <= 0)
1055 return finishPoll(first);
1056 if (nanos <= 0)
1057 return null;
1058 if (nanos < delay || leader != null)
1059 nanos = available.awaitNanos(nanos);
1060 else {
1061 Thread thisThread = Thread.currentThread();
1062 leader = thisThread;
1063 try {
1064 long timeLeft = available.awaitNanos(delay);
1065 nanos -= delay - timeLeft;
1066 } finally {
1067 if (leader == thisThread)
1068 leader = null;
1069 }
1070 }
1071 }
1072 }
1073 } finally {
1074 if (leader == null && queue[0] != null)
1075 available.signal();
1076 lock.unlock();
1077 }
1078 }
1079
1080 public void clear() {
1081 final ReentrantLock lock = this.lock;
1082 lock.lock();
1083 try {
1084 for (int i = 0; i < size; i++) {
1085 RunnableScheduledFuture t = queue[i];
1086 if (t != null) {
1087 queue[i] = null;
1088 setIndex(t, -1);
1089 }
1090 }
1091 size = 0;
1092 } finally {
1093 lock.unlock();
1094 }
1095 }
1096
1097 /**
1098 * Return and remove first element only if it is expired.
1099 * Used only by drainTo. Call only when holding lock.
1100 */
1101 private RunnableScheduledFuture pollExpired() {
1102 RunnableScheduledFuture first = queue[0];
1103 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
1104 return null;
1105 return finishPoll(first);
1106 }
1107
1108 public int drainTo(Collection<? super Runnable> c) {
1109 if (c == null)
1110 throw new NullPointerException();
1111 if (c == this)
1112 throw new IllegalArgumentException();
1113 final ReentrantLock lock = this.lock;
1114 lock.lock();
1115 try {
1116 RunnableScheduledFuture first;
1117 int n = 0;
1118 while ((first = pollExpired()) != null) {
1119 c.add(first);
1120 ++n;
1121 }
1122 return n;
1123 } finally {
1124 lock.unlock();
1125 }
1126 }
1127
1128 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1129 if (c == null)
1130 throw new NullPointerException();
1131 if (c == this)
1132 throw new IllegalArgumentException();
1133 if (maxElements <= 0)
1134 return 0;
1135 final ReentrantLock lock = this.lock;
1136 lock.lock();
1137 try {
1138 RunnableScheduledFuture first;
1139 int n = 0;
1140 while (n < maxElements && (first = pollExpired()) != null) {
1141 c.add(first);
1142 ++n;
1143 }
1144 return n;
1145 } finally {
1146 lock.unlock();
1147 }
1148 }
1149
1150 public Object[] toArray() {
1151 final ReentrantLock lock = this.lock;
1152 lock.lock();
1153 try {
1154 return Arrays.copyOf(queue, size, Object[].class);
1155 } finally {
1156 lock.unlock();
1157 }
1158 }
1159
1160 @SuppressWarnings("unchecked")
1161 public <T> T[] toArray(T[] a) {
1162 final ReentrantLock lock = this.lock;
1163 lock.lock();
1164 try {
1165 if (a.length < size)
1166 return (T[]) Arrays.copyOf(queue, size, a.getClass());
1167 System.arraycopy(queue, 0, a, 0, size);
1168 if (a.length > size)
1169 a[size] = null;
1170 return a;
1171 } finally {
1172 lock.unlock();
1173 }
1174 }
1175
1176 public Iterator<Runnable> iterator() {
1177 return new Itr(Arrays.copyOf(queue, size));
1178 }
1179
1180 /**
1181 * Snapshot iterator that works off copy of underlying q array.
1182 */
1183 private class Itr implements Iterator<Runnable> {
1184 final RunnableScheduledFuture[] array;
1185 int cursor = 0; // index of next element to return
1186 int lastRet = -1; // index of last element, or -1 if no such
1187
1188 Itr(RunnableScheduledFuture[] array) {
1189 this.array = array;
1190 }
1191
1192 public boolean hasNext() {
1193 return cursor < array.length;
1194 }
1195
1196 public Runnable next() {
1197 if (cursor >= array.length)
1198 throw new NoSuchElementException();
1199 lastRet = cursor;
1200 return array[cursor++];
1201 }
1202
1203 public void remove() {
1204 if (lastRet < 0)
1205 throw new IllegalStateException();
1206 DelayedWorkQueue.this.remove(array[lastRet]);
1207 lastRet = -1;
1208 }
1209 }
1210 }
1211 }