ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.72
Committed: Thu Jul 18 17:13:42 2013 UTC (10 years, 10 months ago) by jsr166
Branch: MAIN
Changes since 1.71: +2 -0 lines
Log Message:
doclint warning fixes

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import static java.util.concurrent.TimeUnit.NANOSECONDS;
9 import java.util.concurrent.atomic.AtomicLong;
10 import java.util.concurrent.locks.Condition;
11 import java.util.concurrent.locks.ReentrantLock;
12 import java.util.*;
13
14 /**
15 * A {@link ThreadPoolExecutor} that can additionally schedule
16 * commands to run after a given delay, or to execute
17 * periodically. This class is preferable to {@link java.util.Timer}
18 * when multiple worker threads are needed, or when the additional
19 * flexibility or capabilities of {@link ThreadPoolExecutor} (which
20 * this class extends) are required.
21 *
22 * <p>Delayed tasks execute no sooner than they are enabled, but
23 * without any real-time guarantees about when, after they are
24 * enabled, they will commence. Tasks scheduled for exactly the same
25 * execution time are enabled in first-in-first-out (FIFO) order of
26 * submission.
27 *
28 * <p>When a submitted task is cancelled before it is run, execution
29 * is suppressed. By default, such a cancelled task is not
30 * automatically removed from the work queue until its delay
31 * elapses. While this enables further inspection and monitoring, it
32 * may also cause unbounded retention of cancelled tasks. To avoid
33 * this, set {@link #setRemoveOnCancelPolicy} to {@code true}, which
34 * causes tasks to be immediately removed from the work queue at
35 * time of cancellation.
36 *
37 * <p>Successive executions of a task scheduled via
38 * {@code scheduleAtFixedRate} or
39 * {@code scheduleWithFixedDelay} do not overlap. While different
40 * executions may be performed by different threads, the effects of
41 * prior executions <a
42 * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
43 * those of subsequent ones.
44 *
45 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
46 * of the inherited tuning methods are not useful for it. In
47 * particular, because it acts as a fixed-sized pool using
48 * {@code corePoolSize} threads and an unbounded queue, adjustments
49 * to {@code maximumPoolSize} have no useful effect. Additionally, it
50 * is almost never a good idea to set {@code corePoolSize} to zero or
51 * use {@code allowCoreThreadTimeOut} because this may leave the pool
52 * without threads to handle tasks once they become eligible to run.
53 *
54 * <p><b>Extension notes:</b> This class overrides the
55 * {@link ThreadPoolExecutor#execute(Runnable) execute} and
56 * {@link AbstractExecutorService#submit(Runnable) submit}
57 * methods to generate internal {@link ScheduledFuture} objects to
58 * control per-task delays and scheduling. To preserve
59 * functionality, any further overrides of these methods in
60 * subclasses must invoke superclass versions, which effectively
61 * disables additional task customization. However, this class
62 * provides alternative protected extension method
63 * {@code decorateTask} (one version each for {@code Runnable} and
64 * {@code Callable}) that can be used to customize the concrete task
65 * types used to execute commands entered via {@code execute},
66 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
67 * and {@code scheduleWithFixedDelay}. By default, a
68 * {@code ScheduledThreadPoolExecutor} uses a task type extending
69 * {@link FutureTask}. However, this may be modified or replaced using
70 * subclasses of the form:
71 *
72 * <pre> {@code
73 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
74 *
75 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
76 *
77 * protected <V> RunnableScheduledFuture<V> decorateTask(
78 * Runnable r, RunnableScheduledFuture<V> task) {
79 * return new CustomTask<V>(r, task);
80 * }
81 *
82 * protected <V> RunnableScheduledFuture<V> decorateTask(
83 * Callable<V> c, RunnableScheduledFuture<V> task) {
84 * return new CustomTask<V>(c, task);
85 * }
86 * // ... add constructors, etc.
87 * }}</pre>
88 *
89 * @since 1.5
90 * @author Doug Lea
91 */
92 public class ScheduledThreadPoolExecutor
93 extends ThreadPoolExecutor
94 implements ScheduledExecutorService {
95
96 /*
97 * This class specializes ThreadPoolExecutor implementation by
98 *
99 * 1. Using a custom task type, ScheduledFutureTask for
100 * tasks, even those that don't require scheduling (i.e.,
101 * those submitted using ExecutorService execute, not
102 * ScheduledExecutorService methods) which are treated as
103 * delayed tasks with a delay of zero.
104 *
105 * 2. Using a custom queue (DelayedWorkQueue), a variant of
106 * unbounded DelayQueue. The lack of capacity constraint and
107 * the fact that corePoolSize and maximumPoolSize are
108 * effectively identical simplifies some execution mechanics
109 * (see delayedExecute) compared to ThreadPoolExecutor.
110 *
111 * 3. Supporting optional run-after-shutdown parameters, which
112 * leads to overrides of shutdown methods to remove and cancel
113 * tasks that should NOT be run after shutdown, as well as
114 * different recheck logic when task (re)submission overlaps
115 * with a shutdown.
116 *
117 * 4. Task decoration methods to allow interception and
118 * instrumentation, which are needed because subclasses cannot
119 * otherwise override submit methods to get this effect. These
120 * don't have any impact on pool control logic though.
121 */
122
123 /**
124 * False if should cancel/suppress periodic tasks on shutdown.
125 */
126 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
127
128 /**
129 * False if should cancel non-periodic tasks on shutdown.
130 */
131 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
132
133 /**
134 * True if ScheduledFutureTask.cancel should remove from queue
135 */
136 private volatile boolean removeOnCancel = false;
137
138 /**
139 * Sequence number to break scheduling ties, and in turn to
140 * guarantee FIFO order among tied entries.
141 */
142 private static final AtomicLong sequencer = new AtomicLong();
143
144 /**
145 * Returns current nanosecond time.
146 */
147 final long now() {
148 return System.nanoTime();
149 }
150
151 private class ScheduledFutureTask<V>
152 extends FutureTask<V> implements RunnableScheduledFuture<V> {
153
154 /** Sequence number to break ties FIFO */
155 private final long sequenceNumber;
156
157 /** The time the task is enabled to execute in nanoTime units */
158 private long time;
159
160 /**
161 * Period in nanoseconds for repeating tasks. A positive
162 * value indicates fixed-rate execution. A negative value
163 * indicates fixed-delay execution. A value of 0 indicates a
164 * non-repeating task.
165 */
166 private final long period;
167
168 /** The actual task to be re-enqueued by reExecutePeriodic */
169 RunnableScheduledFuture<V> outerTask = this;
170
171 /**
172 * Index into delay queue, to support faster cancellation.
173 */
174 int heapIndex;
175
176 /**
177 * Creates a one-shot action with given nanoTime-based trigger time.
178 */
179 ScheduledFutureTask(Runnable r, V result, long ns) {
180 super(r, result);
181 this.time = ns;
182 this.period = 0;
183 this.sequenceNumber = sequencer.getAndIncrement();
184 }
185
186 /**
187 * Creates a periodic action with given nano time and period.
188 */
189 ScheduledFutureTask(Runnable r, V result, long ns, long period) {
190 super(r, result);
191 this.time = ns;
192 this.period = period;
193 this.sequenceNumber = sequencer.getAndIncrement();
194 }
195
196 /**
197 * Creates a one-shot action with given nanoTime-based trigger time.
198 */
199 ScheduledFutureTask(Callable<V> callable, long ns) {
200 super(callable);
201 this.time = ns;
202 this.period = 0;
203 this.sequenceNumber = sequencer.getAndIncrement();
204 }
205
206 public long getDelay(TimeUnit unit) {
207 return unit.convert(time - now(), NANOSECONDS);
208 }
209
210 public int compareTo(Delayed other) {
211 if (other == this) // compare zero if same object
212 return 0;
213 if (other instanceof ScheduledFutureTask) {
214 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
215 long diff = time - x.time;
216 if (diff < 0)
217 return -1;
218 else if (diff > 0)
219 return 1;
220 else if (sequenceNumber < x.sequenceNumber)
221 return -1;
222 else
223 return 1;
224 }
225 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
226 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
227 }
228
229 /**
230 * Returns {@code true} if this is a periodic (not a one-shot) action.
231 *
232 * @return {@code true} if periodic
233 */
234 public boolean isPeriodic() {
235 return period != 0;
236 }
237
238 /**
239 * Sets the next time to run for a periodic task.
240 */
241 private void setNextRunTime() {
242 long p = period;
243 if (p > 0)
244 time += p;
245 else
246 time = triggerTime(-p);
247 }
248
249 public boolean cancel(boolean mayInterruptIfRunning) {
250 boolean cancelled = super.cancel(mayInterruptIfRunning);
251 if (cancelled && removeOnCancel && heapIndex >= 0)
252 remove(this);
253 return cancelled;
254 }
255
256 /**
257 * Overrides FutureTask version so as to reset/requeue if periodic.
258 */
259 public void run() {
260 boolean periodic = isPeriodic();
261 if (!canRunInCurrentRunState(periodic))
262 cancel(false);
263 else if (!periodic)
264 ScheduledFutureTask.super.run();
265 else if (ScheduledFutureTask.super.runAndReset()) {
266 setNextRunTime();
267 reExecutePeriodic(outerTask);
268 }
269 }
270 }
271
272 /**
273 * Returns true if can run a task given current run state
274 * and run-after-shutdown parameters.
275 *
276 * @param periodic true if this task periodic, false if delayed
277 */
278 boolean canRunInCurrentRunState(boolean periodic) {
279 return isRunningOrShutdown(periodic ?
280 continueExistingPeriodicTasksAfterShutdown :
281 executeExistingDelayedTasksAfterShutdown);
282 }
283
284 /**
285 * Main execution method for delayed or periodic tasks. If pool
286 * is shut down, rejects the task. Otherwise adds task to queue
287 * and starts a thread, if necessary, to run it. (We cannot
288 * prestart the thread to run the task because the task (probably)
289 * shouldn't be run yet.) If the pool is shut down while the task
290 * is being added, cancel and remove it if required by state and
291 * run-after-shutdown parameters.
292 *
293 * @param task the task
294 */
295 private void delayedExecute(RunnableScheduledFuture<?> task) {
296 if (isShutdown())
297 reject(task);
298 else {
299 super.getQueue().add(task);
300 if (isShutdown() &&
301 !canRunInCurrentRunState(task.isPeriodic()) &&
302 remove(task))
303 task.cancel(false);
304 else
305 ensurePrestart();
306 }
307 }
308
309 /**
310 * Requeues a periodic task unless current run state precludes it.
311 * Same idea as delayedExecute except drops task rather than rejecting.
312 *
313 * @param task the task
314 */
315 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
316 if (canRunInCurrentRunState(true)) {
317 super.getQueue().add(task);
318 if (!canRunInCurrentRunState(true) && remove(task))
319 task.cancel(false);
320 else
321 ensurePrestart();
322 }
323 }
324
325 /**
326 * Cancels and clears the queue of all tasks that should not be run
327 * due to shutdown policy. Invoked within super.shutdown.
328 */
329 @Override void onShutdown() {
330 BlockingQueue<Runnable> q = super.getQueue();
331 boolean keepDelayed =
332 getExecuteExistingDelayedTasksAfterShutdownPolicy();
333 boolean keepPeriodic =
334 getContinueExistingPeriodicTasksAfterShutdownPolicy();
335 if (!keepDelayed && !keepPeriodic) {
336 for (Object e : q.toArray())
337 if (e instanceof RunnableScheduledFuture<?>)
338 ((RunnableScheduledFuture<?>) e).cancel(false);
339 q.clear();
340 }
341 else {
342 // Traverse snapshot to avoid iterator exceptions
343 for (Object e : q.toArray()) {
344 if (e instanceof RunnableScheduledFuture) {
345 RunnableScheduledFuture<?> t =
346 (RunnableScheduledFuture<?>)e;
347 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
348 t.isCancelled()) { // also remove if already cancelled
349 if (q.remove(t))
350 t.cancel(false);
351 }
352 }
353 }
354 }
355 tryTerminate();
356 }
357
358 /**
359 * Modifies or replaces the task used to execute a runnable.
360 * This method can be used to override the concrete
361 * class used for managing internal tasks.
362 * The default implementation simply returns the given task.
363 *
364 * @param runnable the submitted Runnable
365 * @param task the task created to execute the runnable
366 * @param <V> the type of the task's result
367 * @return a task that can execute the runnable
368 * @since 1.6
369 */
370 protected <V> RunnableScheduledFuture<V> decorateTask(
371 Runnable runnable, RunnableScheduledFuture<V> task) {
372 return task;
373 }
374
375 /**
376 * Modifies or replaces the task used to execute a callable.
377 * This method can be used to override the concrete
378 * class used for managing internal tasks.
379 * The default implementation simply returns the given task.
380 *
381 * @param callable the submitted Callable
382 * @param task the task created to execute the callable
383 * @param <V> the type of the task's result
384 * @return a task that can execute the callable
385 * @since 1.6
386 */
387 protected <V> RunnableScheduledFuture<V> decorateTask(
388 Callable<V> callable, RunnableScheduledFuture<V> task) {
389 return task;
390 }
391
392 /**
393 * Creates a new {@code ScheduledThreadPoolExecutor} with the
394 * given core pool size.
395 *
396 * @param corePoolSize the number of threads to keep in the pool, even
397 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
398 * @throws IllegalArgumentException if {@code corePoolSize < 0}
399 */
400 public ScheduledThreadPoolExecutor(int corePoolSize) {
401 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
402 new DelayedWorkQueue());
403 }
404
405 /**
406 * Creates a new {@code ScheduledThreadPoolExecutor} with the
407 * given initial parameters.
408 *
409 * @param corePoolSize the number of threads to keep in the pool, even
410 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
411 * @param threadFactory the factory to use when the executor
412 * creates a new thread
413 * @throws IllegalArgumentException if {@code corePoolSize < 0}
414 * @throws NullPointerException if {@code threadFactory} is null
415 */
416 public ScheduledThreadPoolExecutor(int corePoolSize,
417 ThreadFactory threadFactory) {
418 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
419 new DelayedWorkQueue(), threadFactory);
420 }
421
422 /**
423 * Creates a new ScheduledThreadPoolExecutor with the given
424 * initial parameters.
425 *
426 * @param corePoolSize the number of threads to keep in the pool, even
427 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
428 * @param handler the handler to use when execution is blocked
429 * because the thread bounds and queue capacities are reached
430 * @throws IllegalArgumentException if {@code corePoolSize < 0}
431 * @throws NullPointerException if {@code handler} is null
432 */
433 public ScheduledThreadPoolExecutor(int corePoolSize,
434 RejectedExecutionHandler handler) {
435 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
436 new DelayedWorkQueue(), handler);
437 }
438
439 /**
440 * Creates a new ScheduledThreadPoolExecutor with the given
441 * initial parameters.
442 *
443 * @param corePoolSize the number of threads to keep in the pool, even
444 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
445 * @param threadFactory the factory to use when the executor
446 * creates a new thread
447 * @param handler the handler to use when execution is blocked
448 * because the thread bounds and queue capacities are reached
449 * @throws IllegalArgumentException if {@code corePoolSize < 0}
450 * @throws NullPointerException if {@code threadFactory} or
451 * {@code handler} is null
452 */
453 public ScheduledThreadPoolExecutor(int corePoolSize,
454 ThreadFactory threadFactory,
455 RejectedExecutionHandler handler) {
456 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
457 new DelayedWorkQueue(), threadFactory, handler);
458 }
459
460 /**
461 * Returns the trigger time of a delayed action.
462 */
463 private long triggerTime(long delay, TimeUnit unit) {
464 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
465 }
466
467 /**
468 * Returns the trigger time of a delayed action.
469 */
470 long triggerTime(long delay) {
471 return now() +
472 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
473 }
474
475 /**
476 * Constrains the values of all delays in the queue to be within
477 * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
478 * This may occur if a task is eligible to be dequeued, but has
479 * not yet been, while some other task is added with a delay of
480 * Long.MAX_VALUE.
481 */
482 private long overflowFree(long delay) {
483 Delayed head = (Delayed) super.getQueue().peek();
484 if (head != null) {
485 long headDelay = head.getDelay(NANOSECONDS);
486 if (headDelay < 0 && (delay - headDelay < 0))
487 delay = Long.MAX_VALUE + headDelay;
488 }
489 return delay;
490 }
491
492 /**
493 * @throws RejectedExecutionException {@inheritDoc}
494 * @throws NullPointerException {@inheritDoc}
495 */
496 public ScheduledFuture<?> schedule(Runnable command,
497 long delay,
498 TimeUnit unit) {
499 if (command == null || unit == null)
500 throw new NullPointerException();
501 RunnableScheduledFuture<?> t = decorateTask(command,
502 new ScheduledFutureTask<Void>(command, null,
503 triggerTime(delay, unit)));
504 delayedExecute(t);
505 return t;
506 }
507
508 /**
509 * @throws RejectedExecutionException {@inheritDoc}
510 * @throws NullPointerException {@inheritDoc}
511 */
512 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
513 long delay,
514 TimeUnit unit) {
515 if (callable == null || unit == null)
516 throw new NullPointerException();
517 RunnableScheduledFuture<V> t = decorateTask(callable,
518 new ScheduledFutureTask<V>(callable,
519 triggerTime(delay, unit)));
520 delayedExecute(t);
521 return t;
522 }
523
524 /**
525 * @throws RejectedExecutionException {@inheritDoc}
526 * @throws NullPointerException {@inheritDoc}
527 * @throws IllegalArgumentException {@inheritDoc}
528 */
529 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
530 long initialDelay,
531 long period,
532 TimeUnit unit) {
533 if (command == null || unit == null)
534 throw new NullPointerException();
535 if (period <= 0)
536 throw new IllegalArgumentException();
537 ScheduledFutureTask<Void> sft =
538 new ScheduledFutureTask<Void>(command,
539 null,
540 triggerTime(initialDelay, unit),
541 unit.toNanos(period));
542 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
543 sft.outerTask = t;
544 delayedExecute(t);
545 return t;
546 }
547
548 /**
549 * @throws RejectedExecutionException {@inheritDoc}
550 * @throws NullPointerException {@inheritDoc}
551 * @throws IllegalArgumentException {@inheritDoc}
552 */
553 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
554 long initialDelay,
555 long delay,
556 TimeUnit unit) {
557 if (command == null || unit == null)
558 throw new NullPointerException();
559 if (delay <= 0)
560 throw new IllegalArgumentException();
561 ScheduledFutureTask<Void> sft =
562 new ScheduledFutureTask<Void>(command,
563 null,
564 triggerTime(initialDelay, unit),
565 unit.toNanos(-delay));
566 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
567 sft.outerTask = t;
568 delayedExecute(t);
569 return t;
570 }
571
572 /**
573 * Executes {@code command} with zero required delay.
574 * This has effect equivalent to
575 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
576 * Note that inspections of the queue and of the list returned by
577 * {@code shutdownNow} will access the zero-delayed
578 * {@link ScheduledFuture}, not the {@code command} itself.
579 *
580 * <p>A consequence of the use of {@code ScheduledFuture} objects is
581 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
582 * called with a null second {@code Throwable} argument, even if the
583 * {@code command} terminated abruptly. Instead, the {@code Throwable}
584 * thrown by such a task can be obtained via {@link Future#get}.
585 *
586 * @throws RejectedExecutionException at discretion of
587 * {@code RejectedExecutionHandler}, if the task
588 * cannot be accepted for execution because the
589 * executor has been shut down
590 * @throws NullPointerException {@inheritDoc}
591 */
592 public void execute(Runnable command) {
593 schedule(command, 0, NANOSECONDS);
594 }
595
596 // Override AbstractExecutorService methods
597
598 /**
599 * @throws RejectedExecutionException {@inheritDoc}
600 * @throws NullPointerException {@inheritDoc}
601 */
602 public Future<?> submit(Runnable task) {
603 return schedule(task, 0, NANOSECONDS);
604 }
605
606 /**
607 * @throws RejectedExecutionException {@inheritDoc}
608 * @throws NullPointerException {@inheritDoc}
609 */
610 public <T> Future<T> submit(Runnable task, T result) {
611 return schedule(Executors.callable(task, result), 0, NANOSECONDS);
612 }
613
614 /**
615 * @throws RejectedExecutionException {@inheritDoc}
616 * @throws NullPointerException {@inheritDoc}
617 */
618 public <T> Future<T> submit(Callable<T> task) {
619 return schedule(task, 0, NANOSECONDS);
620 }
621
622 /**
623 * Sets the policy on whether to continue executing existing
624 * periodic tasks even when this executor has been {@code shutdown}.
625 * In this case, these tasks will only terminate upon
626 * {@code shutdownNow} or after setting the policy to
627 * {@code false} when already shutdown.
628 * This value is by default {@code false}.
629 *
630 * @param value if {@code true}, continue after shutdown, else don't
631 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
632 */
633 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
634 continueExistingPeriodicTasksAfterShutdown = value;
635 if (!value && isShutdown())
636 onShutdown();
637 }
638
639 /**
640 * Gets the policy on whether to continue executing existing
641 * periodic tasks even when this executor has been {@code shutdown}.
642 * In this case, these tasks will only terminate upon
643 * {@code shutdownNow} or after setting the policy to
644 * {@code false} when already shutdown.
645 * This value is by default {@code false}.
646 *
647 * @return {@code true} if will continue after shutdown
648 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
649 */
650 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
651 return continueExistingPeriodicTasksAfterShutdown;
652 }
653
654 /**
655 * Sets the policy on whether to execute existing delayed
656 * tasks even when this executor has been {@code shutdown}.
657 * In this case, these tasks will only terminate upon
658 * {@code shutdownNow}, or after setting the policy to
659 * {@code false} when already shutdown.
660 * This value is by default {@code true}.
661 *
662 * @param value if {@code true}, execute after shutdown, else don't
663 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
664 */
665 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
666 executeExistingDelayedTasksAfterShutdown = value;
667 if (!value && isShutdown())
668 onShutdown();
669 }
670
671 /**
672 * Gets the policy on whether to execute existing delayed
673 * tasks even when this executor has been {@code shutdown}.
674 * In this case, these tasks will only terminate upon
675 * {@code shutdownNow}, or after setting the policy to
676 * {@code false} when already shutdown.
677 * This value is by default {@code true}.
678 *
679 * @return {@code true} if will execute after shutdown
680 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
681 */
682 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
683 return executeExistingDelayedTasksAfterShutdown;
684 }
685
686 /**
687 * Sets the policy on whether cancelled tasks should be immediately
688 * removed from the work queue at time of cancellation. This value is
689 * by default {@code false}.
690 *
691 * @param value if {@code true}, remove on cancellation, else don't
692 * @see #getRemoveOnCancelPolicy
693 * @since 1.7
694 */
695 public void setRemoveOnCancelPolicy(boolean value) {
696 removeOnCancel = value;
697 }
698
699 /**
700 * Gets the policy on whether cancelled tasks should be immediately
701 * removed from the work queue at time of cancellation. This value is
702 * by default {@code false}.
703 *
704 * @return {@code true} if cancelled tasks are immediately removed
705 * from the queue
706 * @see #setRemoveOnCancelPolicy
707 * @since 1.7
708 */
709 public boolean getRemoveOnCancelPolicy() {
710 return removeOnCancel;
711 }
712
713 /**
714 * Initiates an orderly shutdown in which previously submitted
715 * tasks are executed, but no new tasks will be accepted.
716 * Invocation has no additional effect if already shut down.
717 *
718 * <p>This method does not wait for previously submitted tasks to
719 * complete execution. Use {@link #awaitTermination awaitTermination}
720 * to do that.
721 *
722 * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
723 * has been set {@code false}, existing delayed tasks whose delays
724 * have not yet elapsed are cancelled. And unless the {@code
725 * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
726 * {@code true}, future executions of existing periodic tasks will
727 * be cancelled.
728 *
729 * @throws SecurityException {@inheritDoc}
730 */
731 public void shutdown() {
732 super.shutdown();
733 }
734
735 /**
736 * Attempts to stop all actively executing tasks, halts the
737 * processing of waiting tasks, and returns a list of the tasks
738 * that were awaiting execution.
739 *
740 * <p>This method does not wait for actively executing tasks to
741 * terminate. Use {@link #awaitTermination awaitTermination} to
742 * do that.
743 *
744 * <p>There are no guarantees beyond best-effort attempts to stop
745 * processing actively executing tasks. This implementation
746 * cancels tasks via {@link Thread#interrupt}, so any task that
747 * fails to respond to interrupts may never terminate.
748 *
749 * @return list of tasks that never commenced execution.
750 * Each element of this list is a {@link ScheduledFuture},
751 * including those tasks submitted using {@code execute},
752 * which are for scheduling purposes used as the basis of a
753 * zero-delay {@code ScheduledFuture}.
754 * @throws SecurityException {@inheritDoc}
755 */
756 public List<Runnable> shutdownNow() {
757 return super.shutdownNow();
758 }
759
760 /**
761 * Returns the task queue used by this executor. Each element of
762 * this queue is a {@link ScheduledFuture}, including those
763 * tasks submitted using {@code execute} which are for scheduling
764 * purposes used as the basis of a zero-delay
765 * {@code ScheduledFuture}. Iteration over this queue is
766 * <em>not</em> guaranteed to traverse tasks in the order in
767 * which they will execute.
768 *
769 * @return the task queue
770 */
771 public BlockingQueue<Runnable> getQueue() {
772 return super.getQueue();
773 }
774
775 /**
776 * Specialized delay queue. To mesh with TPE declarations, this
777 * class must be declared as a BlockingQueue<Runnable> even though
778 * it can only hold RunnableScheduledFutures.
779 */
780 static class DelayedWorkQueue extends AbstractQueue<Runnable>
781 implements BlockingQueue<Runnable> {
782
783 /*
784 * A DelayedWorkQueue is based on a heap-based data structure
785 * like those in DelayQueue and PriorityQueue, except that
786 * every ScheduledFutureTask also records its index into the
787 * heap array. This eliminates the need to find a task upon
788 * cancellation, greatly speeding up removal (down from O(n)
789 * to O(log n)), and reducing garbage retention that would
790 * otherwise occur by waiting for the element to rise to top
791 * before clearing. But because the queue may also hold
792 * RunnableScheduledFutures that are not ScheduledFutureTasks,
793 * we are not guaranteed to have such indices available, in
794 * which case we fall back to linear search. (We expect that
795 * most tasks will not be decorated, and that the faster cases
796 * will be much more common.)
797 *
798 * All heap operations must record index changes -- mainly
799 * within siftUp and siftDown. Upon removal, a task's
800 * heapIndex is set to -1. Note that ScheduledFutureTasks can
801 * appear at most once in the queue (this need not be true for
802 * other kinds of tasks or work queues), so are uniquely
803 * identified by heapIndex.
804 */
805
806 private static final int INITIAL_CAPACITY = 16;
807 private RunnableScheduledFuture<?>[] queue =
808 new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
809 private final ReentrantLock lock = new ReentrantLock();
810 private int size = 0;
811
812 /**
813 * Thread designated to wait for the task at the head of the
814 * queue. This variant of the Leader-Follower pattern
815 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
816 * minimize unnecessary timed waiting. When a thread becomes
817 * the leader, it waits only for the next delay to elapse, but
818 * other threads await indefinitely. The leader thread must
819 * signal some other thread before returning from take() or
820 * poll(...), unless some other thread becomes leader in the
821 * interim. Whenever the head of the queue is replaced with a
822 * task with an earlier expiration time, the leader field is
823 * invalidated by being reset to null, and some waiting
824 * thread, but not necessarily the current leader, is
825 * signalled. So waiting threads must be prepared to acquire
826 * and lose leadership while waiting.
827 */
828 private Thread leader = null;
829
830 /**
831 * Condition signalled when a newer task becomes available at the
832 * head of the queue or a new thread may need to become leader.
833 */
834 private final Condition available = lock.newCondition();
835
836 /**
837 * Sets f's heapIndex if it is a ScheduledFutureTask.
838 */
839 private void setIndex(RunnableScheduledFuture<?> f, int idx) {
840 if (f instanceof ScheduledFutureTask)
841 ((ScheduledFutureTask)f).heapIndex = idx;
842 }
843
844 /**
845 * Sifts element added at bottom up to its heap-ordered spot.
846 * Call only when holding lock.
847 */
848 private void siftUp(int k, RunnableScheduledFuture<?> key) {
849 while (k > 0) {
850 int parent = (k - 1) >>> 1;
851 RunnableScheduledFuture<?> e = queue[parent];
852 if (key.compareTo(e) >= 0)
853 break;
854 queue[k] = e;
855 setIndex(e, k);
856 k = parent;
857 }
858 queue[k] = key;
859 setIndex(key, k);
860 }
861
862 /**
863 * Sifts element added at top down to its heap-ordered spot.
864 * Call only when holding lock.
865 */
866 private void siftDown(int k, RunnableScheduledFuture<?> key) {
867 int half = size >>> 1;
868 while (k < half) {
869 int child = (k << 1) + 1;
870 RunnableScheduledFuture<?> c = queue[child];
871 int right = child + 1;
872 if (right < size && c.compareTo(queue[right]) > 0)
873 c = queue[child = right];
874 if (key.compareTo(c) <= 0)
875 break;
876 queue[k] = c;
877 setIndex(c, k);
878 k = child;
879 }
880 queue[k] = key;
881 setIndex(key, k);
882 }
883
884 /**
885 * Resizes the heap array. Call only when holding lock.
886 */
887 private void grow() {
888 int oldCapacity = queue.length;
889 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
890 if (newCapacity < 0) // overflow
891 newCapacity = Integer.MAX_VALUE;
892 queue = Arrays.copyOf(queue, newCapacity);
893 }
894
895 /**
896 * Finds index of given object, or -1 if absent.
897 */
898 private int indexOf(Object x) {
899 if (x != null) {
900 if (x instanceof ScheduledFutureTask) {
901 int i = ((ScheduledFutureTask) x).heapIndex;
902 // Sanity check; x could conceivably be a
903 // ScheduledFutureTask from some other pool.
904 if (i >= 0 && i < size && queue[i] == x)
905 return i;
906 } else {
907 for (int i = 0; i < size; i++)
908 if (x.equals(queue[i]))
909 return i;
910 }
911 }
912 return -1;
913 }
914
915 public boolean contains(Object x) {
916 final ReentrantLock lock = this.lock;
917 lock.lock();
918 try {
919 return indexOf(x) != -1;
920 } finally {
921 lock.unlock();
922 }
923 }
924
925 public boolean remove(Object x) {
926 final ReentrantLock lock = this.lock;
927 lock.lock();
928 try {
929 int i = indexOf(x);
930 if (i < 0)
931 return false;
932
933 setIndex(queue[i], -1);
934 int s = --size;
935 RunnableScheduledFuture<?> replacement = queue[s];
936 queue[s] = null;
937 if (s != i) {
938 siftDown(i, replacement);
939 if (queue[i] == replacement)
940 siftUp(i, replacement);
941 }
942 return true;
943 } finally {
944 lock.unlock();
945 }
946 }
947
948 public int size() {
949 final ReentrantLock lock = this.lock;
950 lock.lock();
951 try {
952 return size;
953 } finally {
954 lock.unlock();
955 }
956 }
957
958 public boolean isEmpty() {
959 return size() == 0;
960 }
961
962 public int remainingCapacity() {
963 return Integer.MAX_VALUE;
964 }
965
966 public RunnableScheduledFuture<?> peek() {
967 final ReentrantLock lock = this.lock;
968 lock.lock();
969 try {
970 return queue[0];
971 } finally {
972 lock.unlock();
973 }
974 }
975
976 public boolean offer(Runnable x) {
977 if (x == null)
978 throw new NullPointerException();
979 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
980 final ReentrantLock lock = this.lock;
981 lock.lock();
982 try {
983 int i = size;
984 if (i >= queue.length)
985 grow();
986 size = i + 1;
987 if (i == 0) {
988 queue[0] = e;
989 setIndex(e, 0);
990 } else {
991 siftUp(i, e);
992 }
993 if (queue[0] == e) {
994 leader = null;
995 available.signal();
996 }
997 } finally {
998 lock.unlock();
999 }
1000 return true;
1001 }
1002
1003 public void put(Runnable e) {
1004 offer(e);
1005 }
1006
1007 public boolean add(Runnable e) {
1008 return offer(e);
1009 }
1010
1011 public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1012 return offer(e);
1013 }
1014
1015 /**
1016 * Performs common bookkeeping for poll and take: Replaces
1017 * first element with last and sifts it down. Call only when
1018 * holding lock.
1019 * @param f the task to remove and return
1020 */
1021 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1022 int s = --size;
1023 RunnableScheduledFuture<?> x = queue[s];
1024 queue[s] = null;
1025 if (s != 0)
1026 siftDown(0, x);
1027 setIndex(f, -1);
1028 return f;
1029 }
1030
1031 public RunnableScheduledFuture<?> poll() {
1032 final ReentrantLock lock = this.lock;
1033 lock.lock();
1034 try {
1035 RunnableScheduledFuture<?> first = queue[0];
1036 if (first == null || first.getDelay(NANOSECONDS) > 0)
1037 return null;
1038 else
1039 return finishPoll(first);
1040 } finally {
1041 lock.unlock();
1042 }
1043 }
1044
1045 public RunnableScheduledFuture<?> take() throws InterruptedException {
1046 final ReentrantLock lock = this.lock;
1047 lock.lockInterruptibly();
1048 try {
1049 for (;;) {
1050 RunnableScheduledFuture<?> first = queue[0];
1051 if (first == null)
1052 available.await();
1053 else {
1054 long delay = first.getDelay(NANOSECONDS);
1055 if (delay <= 0)
1056 return finishPoll(first);
1057 first = null; // don't retain ref while waiting
1058 if (leader != null)
1059 available.await();
1060 else {
1061 Thread thisThread = Thread.currentThread();
1062 leader = thisThread;
1063 try {
1064 available.awaitNanos(delay);
1065 } finally {
1066 if (leader == thisThread)
1067 leader = null;
1068 }
1069 }
1070 }
1071 }
1072 } finally {
1073 if (leader == null && queue[0] != null)
1074 available.signal();
1075 lock.unlock();
1076 }
1077 }
1078
1079 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1080 throws InterruptedException {
1081 long nanos = unit.toNanos(timeout);
1082 final ReentrantLock lock = this.lock;
1083 lock.lockInterruptibly();
1084 try {
1085 for (;;) {
1086 RunnableScheduledFuture<?> first = queue[0];
1087 if (first == null) {
1088 if (nanos <= 0)
1089 return null;
1090 else
1091 nanos = available.awaitNanos(nanos);
1092 } else {
1093 long delay = first.getDelay(NANOSECONDS);
1094 if (delay <= 0)
1095 return finishPoll(first);
1096 if (nanos <= 0)
1097 return null;
1098 first = null; // don't retain ref while waiting
1099 if (nanos < delay || leader != null)
1100 nanos = available.awaitNanos(nanos);
1101 else {
1102 Thread thisThread = Thread.currentThread();
1103 leader = thisThread;
1104 try {
1105 long timeLeft = available.awaitNanos(delay);
1106 nanos -= delay - timeLeft;
1107 } finally {
1108 if (leader == thisThread)
1109 leader = null;
1110 }
1111 }
1112 }
1113 }
1114 } finally {
1115 if (leader == null && queue[0] != null)
1116 available.signal();
1117 lock.unlock();
1118 }
1119 }
1120
1121 public void clear() {
1122 final ReentrantLock lock = this.lock;
1123 lock.lock();
1124 try {
1125 for (int i = 0; i < size; i++) {
1126 RunnableScheduledFuture<?> t = queue[i];
1127 if (t != null) {
1128 queue[i] = null;
1129 setIndex(t, -1);
1130 }
1131 }
1132 size = 0;
1133 } finally {
1134 lock.unlock();
1135 }
1136 }
1137
1138 /**
1139 * Returns first element only if it is expired.
1140 * Used only by drainTo. Call only when holding lock.
1141 */
1142 private RunnableScheduledFuture<?> peekExpired() {
1143 // assert lock.isHeldByCurrentThread();
1144 RunnableScheduledFuture<?> first = queue[0];
1145 return (first == null || first.getDelay(NANOSECONDS) > 0) ?
1146 null : first;
1147 }
1148
1149 public int drainTo(Collection<? super Runnable> c) {
1150 if (c == null)
1151 throw new NullPointerException();
1152 if (c == this)
1153 throw new IllegalArgumentException();
1154 final ReentrantLock lock = this.lock;
1155 lock.lock();
1156 try {
1157 RunnableScheduledFuture<?> first;
1158 int n = 0;
1159 while ((first = peekExpired()) != null) {
1160 c.add(first); // In this order, in case add() throws.
1161 finishPoll(first);
1162 ++n;
1163 }
1164 return n;
1165 } finally {
1166 lock.unlock();
1167 }
1168 }
1169
1170 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1171 if (c == null)
1172 throw new NullPointerException();
1173 if (c == this)
1174 throw new IllegalArgumentException();
1175 if (maxElements <= 0)
1176 return 0;
1177 final ReentrantLock lock = this.lock;
1178 lock.lock();
1179 try {
1180 RunnableScheduledFuture<?> first;
1181 int n = 0;
1182 while (n < maxElements && (first = peekExpired()) != null) {
1183 c.add(first); // In this order, in case add() throws.
1184 finishPoll(first);
1185 ++n;
1186 }
1187 return n;
1188 } finally {
1189 lock.unlock();
1190 }
1191 }
1192
1193 public Object[] toArray() {
1194 final ReentrantLock lock = this.lock;
1195 lock.lock();
1196 try {
1197 return Arrays.copyOf(queue, size, Object[].class);
1198 } finally {
1199 lock.unlock();
1200 }
1201 }
1202
1203 @SuppressWarnings("unchecked")
1204 public <T> T[] toArray(T[] a) {
1205 final ReentrantLock lock = this.lock;
1206 lock.lock();
1207 try {
1208 if (a.length < size)
1209 return (T[]) Arrays.copyOf(queue, size, a.getClass());
1210 System.arraycopy(queue, 0, a, 0, size);
1211 if (a.length > size)
1212 a[size] = null;
1213 return a;
1214 } finally {
1215 lock.unlock();
1216 }
1217 }
1218
1219 public Iterator<Runnable> iterator() {
1220 return new Itr(Arrays.copyOf(queue, size));
1221 }
1222
1223 /**
1224 * Snapshot iterator that works off copy of underlying q array.
1225 */
1226 private class Itr implements Iterator<Runnable> {
1227 final RunnableScheduledFuture[] array;
1228 int cursor = 0; // index of next element to return
1229 int lastRet = -1; // index of last element, or -1 if no such
1230
1231 Itr(RunnableScheduledFuture[] array) {
1232 this.array = array;
1233 }
1234
1235 public boolean hasNext() {
1236 return cursor < array.length;
1237 }
1238
1239 public Runnable next() {
1240 if (cursor >= array.length)
1241 throw new NoSuchElementException();
1242 lastRet = cursor;
1243 return array[cursor++];
1244 }
1245
1246 public void remove() {
1247 if (lastRet < 0)
1248 throw new IllegalStateException();
1249 DelayedWorkQueue.this.remove(array[lastRet]);
1250 lastRet = -1;
1251 }
1252 }
1253 }
1254 }