ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jdk7/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.9
Committed: Thu Nov 5 16:45:06 2015 UTC (8 years, 6 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.8: +60 -51 lines
Log Message:
sync from main to fix tck failure

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 import static java.util.concurrent.TimeUnit.NANOSECONDS;
11
12 import java.util.AbstractQueue;
13 import java.util.Arrays;
14 import java.util.Collection;
15 import java.util.Iterator;
16 import java.util.List;
17 import java.util.NoSuchElementException;
18 import java.util.concurrent.atomic.AtomicLong;
19 import java.util.concurrent.locks.Condition;
20 import java.util.concurrent.locks.ReentrantLock;
21
22 /**
23 * A {@link ThreadPoolExecutor} that can additionally schedule
24 * commands to run after a given delay, or to execute periodically.
25 * This class is preferable to {@link java.util.Timer} when multiple
26 * worker threads are needed, or when the additional flexibility or
27 * capabilities of {@link ThreadPoolExecutor} (which this class
28 * extends) are required.
29 *
30 * <p>Delayed tasks execute no sooner than they are enabled, but
31 * without any real-time guarantees about when, after they are
32 * enabled, they will commence. Tasks scheduled for exactly the same
33 * execution time are enabled in first-in-first-out (FIFO) order of
34 * submission.
35 *
36 * <p>When a submitted task is cancelled before it is run, execution
37 * is suppressed. By default, such a cancelled task is not
38 * automatically removed from the work queue until its delay elapses.
39 * While this enables further inspection and monitoring, it may also
40 * cause unbounded retention of cancelled tasks. To avoid this, use
41 * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately
42 * removed from the work queue at time of cancellation.
43 *
44 * <p>Successive executions of a periodic task scheduled via
45 * {@link #scheduleAtFixedRate scheduleAtFixedRate} or
46 * {@link #scheduleWithFixedDelay scheduleWithFixedDelay}
47 * do not overlap. While different executions may be performed by
48 * different threads, the effects of prior executions
49 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
50 * those of subsequent ones.
51 *
52 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
53 * of the inherited tuning methods are not useful for it. In
54 * particular, because it acts as a fixed-sized pool using
55 * {@code corePoolSize} threads and an unbounded queue, adjustments
56 * to {@code maximumPoolSize} have no useful effect. Additionally, it
57 * is almost never a good idea to set {@code corePoolSize} to zero or
58 * use {@code allowCoreThreadTimeOut} because this may leave the pool
59 * without threads to handle tasks once they become eligible to run.
60 *
61 * <p><b>Extension notes:</b> This class overrides the
62 * {@link ThreadPoolExecutor#execute(Runnable) execute} and
63 * {@link AbstractExecutorService#submit(Runnable) submit}
64 * methods to generate internal {@link ScheduledFuture} objects to
65 * control per-task delays and scheduling. To preserve
66 * functionality, any further overrides of these methods in
67 * subclasses must invoke superclass versions, which effectively
68 * disables additional task customization. However, this class
69 * provides alternative protected extension method
70 * {@code decorateTask} (one version each for {@code Runnable} and
71 * {@code Callable}) that can be used to customize the concrete task
72 * types used to execute commands entered via {@code execute},
73 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
74 * and {@code scheduleWithFixedDelay}. By default, a
75 * {@code ScheduledThreadPoolExecutor} uses a task type extending
76 * {@link FutureTask}. However, this may be modified or replaced using
77 * subclasses of the form:
78 *
79 * <pre> {@code
80 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
81 *
82 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
83 *
84 * protected <V> RunnableScheduledFuture<V> decorateTask(
85 * Runnable r, RunnableScheduledFuture<V> task) {
86 * return new CustomTask<V>(r, task);
87 * }
88 *
89 * protected <V> RunnableScheduledFuture<V> decorateTask(
90 * Callable<V> c, RunnableScheduledFuture<V> task) {
91 * return new CustomTask<V>(c, task);
92 * }
93 * // ... add constructors, etc.
94 * }}</pre>
95 *
96 * @since 1.5
97 * @author Doug Lea
98 */
99 public class ScheduledThreadPoolExecutor
100 extends ThreadPoolExecutor
101 implements ScheduledExecutorService {
102
103 /*
104 * This class specializes ThreadPoolExecutor implementation by
105 *
106 * 1. Using a custom task type, ScheduledFutureTask for
107 * tasks, even those that don't require scheduling (i.e.,
108 * those submitted using ExecutorService execute, not
109 * ScheduledExecutorService methods) which are treated as
110 * delayed tasks with a delay of zero.
111 *
112 * 2. Using a custom queue (DelayedWorkQueue), a variant of
113 * unbounded DelayQueue. The lack of capacity constraint and
114 * the fact that corePoolSize and maximumPoolSize are
115 * effectively identical simplifies some execution mechanics
116 * (see delayedExecute) compared to ThreadPoolExecutor.
117 *
118 * 3. Supporting optional run-after-shutdown parameters, which
119 * leads to overrides of shutdown methods to remove and cancel
120 * tasks that should NOT be run after shutdown, as well as
121 * different recheck logic when task (re)submission overlaps
122 * with a shutdown.
123 *
124 * 4. Task decoration methods to allow interception and
125 * instrumentation, which are needed because subclasses cannot
126 * otherwise override submit methods to get this effect. These
127 * don't have any impact on pool control logic though.
128 */
129
130 /**
131 * False if should cancel/suppress periodic tasks on shutdown.
132 */
133 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
134
135 /**
136 * False if should cancel non-periodic tasks on shutdown.
137 */
138 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
139
140 /**
141 * True if ScheduledFutureTask.cancel should remove from queue.
142 */
143 volatile boolean removeOnCancel;
144
145 /**
146 * Sequence number to break scheduling ties, and in turn to
147 * guarantee FIFO order among tied entries.
148 */
149 private static final AtomicLong sequencer = new AtomicLong();
150
151 private class ScheduledFutureTask<V>
152 extends FutureTask<V> implements RunnableScheduledFuture<V> {
153
154 /** Sequence number to break ties FIFO */
155 private final long sequenceNumber;
156
157 /** The time the task is enabled to execute in nanoTime units */
158 private volatile long time;
159
160 /**
161 * Period in nanoseconds for repeating tasks.
162 * A positive value indicates fixed-rate execution.
163 * A negative value indicates fixed-delay execution.
164 * A value of 0 indicates a non-repeating (one-shot) task.
165 */
166 private final long period;
167
168 /** The actual task to be re-enqueued by reExecutePeriodic */
169 RunnableScheduledFuture<V> outerTask = this;
170
171 /**
172 * Index into delay queue, to support faster cancellation.
173 */
174 int heapIndex;
175
176 /**
177 * Creates a one-shot action with given nanoTime-based trigger time.
178 */
179 ScheduledFutureTask(Runnable r, V result, long triggerTime,
180 long sequenceNumber) {
181 super(r, result);
182 this.time = triggerTime;
183 this.period = 0;
184 this.sequenceNumber = sequenceNumber;
185 }
186
187 /**
188 * Creates a periodic action with given nanoTime-based initial
189 * trigger time and period.
190 */
191 ScheduledFutureTask(Runnable r, V result, long triggerTime,
192 long period, long sequenceNumber) {
193 super(r, result);
194 this.time = triggerTime;
195 this.period = period;
196 this.sequenceNumber = sequenceNumber;
197 }
198
199 /**
200 * Creates a one-shot action with given nanoTime-based trigger time.
201 */
202 ScheduledFutureTask(Callable<V> callable, long triggerTime,
203 long sequenceNumber) {
204 super(callable);
205 this.time = triggerTime;
206 this.period = 0;
207 this.sequenceNumber = sequenceNumber;
208 }
209
210 public long getDelay(TimeUnit unit) {
211 return unit.convert(time - System.nanoTime(), NANOSECONDS);
212 }
213
214 public int compareTo(Delayed other) {
215 if (other == this) // compare zero if same object
216 return 0;
217 if (other instanceof ScheduledFutureTask) {
218 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
219 long diff = time - x.time;
220 if (diff < 0)
221 return -1;
222 else if (diff > 0)
223 return 1;
224 else if (sequenceNumber < x.sequenceNumber)
225 return -1;
226 else
227 return 1;
228 }
229 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
230 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
231 }
232
233 /**
234 * Returns {@code true} if this is a periodic (not a one-shot) action.
235 *
236 * @return {@code true} if periodic
237 */
238 public boolean isPeriodic() {
239 return period != 0;
240 }
241
242 /**
243 * Sets the next time to run for a periodic task.
244 */
245 private void setNextRunTime() {
246 long p = period;
247 if (p > 0)
248 time += p;
249 else
250 time = triggerTime(-p);
251 }
252
253 public boolean cancel(boolean mayInterruptIfRunning) {
254 boolean cancelled = super.cancel(mayInterruptIfRunning);
255 if (cancelled && removeOnCancel && heapIndex >= 0)
256 remove(this);
257 return cancelled;
258 }
259
260 /**
261 * Overrides FutureTask version so as to reset/requeue if periodic.
262 */
263 public void run() {
264 boolean periodic = isPeriodic();
265 if (!canRunInCurrentRunState(periodic))
266 cancel(false);
267 else if (!periodic)
268 super.run();
269 else if (super.runAndReset()) {
270 setNextRunTime();
271 reExecutePeriodic(outerTask);
272 }
273 }
274 }
275
276 /**
277 * Returns true if can run a task given current run state
278 * and run-after-shutdown parameters.
279 *
280 * @param periodic true if this task periodic, false if delayed
281 */
282 boolean canRunInCurrentRunState(boolean periodic) {
283 return isRunningOrShutdown(periodic ?
284 continueExistingPeriodicTasksAfterShutdown :
285 executeExistingDelayedTasksAfterShutdown);
286 }
287
288 /**
289 * Main execution method for delayed or periodic tasks. If pool
290 * is shut down, rejects the task. Otherwise adds task to queue
291 * and starts a thread, if necessary, to run it. (We cannot
292 * prestart the thread to run the task because the task (probably)
293 * shouldn't be run yet.) If the pool is shut down while the task
294 * is being added, cancel and remove it if required by state and
295 * run-after-shutdown parameters.
296 *
297 * @param task the task
298 */
299 private void delayedExecute(RunnableScheduledFuture<?> task) {
300 if (isShutdown())
301 reject(task);
302 else {
303 super.getQueue().add(task);
304 if (isShutdown() &&
305 !canRunInCurrentRunState(task.isPeriodic()) &&
306 remove(task))
307 task.cancel(false);
308 else
309 ensurePrestart();
310 }
311 }
312
313 /**
314 * Requeues a periodic task unless current run state precludes it.
315 * Same idea as delayedExecute except drops task rather than rejecting.
316 *
317 * @param task the task
318 */
319 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
320 if (canRunInCurrentRunState(true)) {
321 super.getQueue().add(task);
322 if (!canRunInCurrentRunState(true) && remove(task))
323 task.cancel(false);
324 else
325 ensurePrestart();
326 }
327 }
328
329 /**
330 * Cancels and clears the queue of all tasks that should not be run
331 * due to shutdown policy. Invoked within super.shutdown.
332 */
333 @Override void onShutdown() {
334 BlockingQueue<Runnable> q = super.getQueue();
335 boolean keepDelayed =
336 getExecuteExistingDelayedTasksAfterShutdownPolicy();
337 boolean keepPeriodic =
338 getContinueExistingPeriodicTasksAfterShutdownPolicy();
339 if (!keepDelayed && !keepPeriodic) {
340 for (Object e : q.toArray())
341 if (e instanceof RunnableScheduledFuture<?>)
342 ((RunnableScheduledFuture<?>) e).cancel(false);
343 q.clear();
344 }
345 else {
346 // Traverse snapshot to avoid iterator exceptions
347 for (Object e : q.toArray()) {
348 if (e instanceof RunnableScheduledFuture) {
349 RunnableScheduledFuture<?> t =
350 (RunnableScheduledFuture<?>)e;
351 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
352 t.isCancelled()) { // also remove if already cancelled
353 if (q.remove(t))
354 t.cancel(false);
355 }
356 }
357 }
358 }
359 tryTerminate();
360 }
361
362 /**
363 * Modifies or replaces the task used to execute a runnable.
364 * This method can be used to override the concrete
365 * class used for managing internal tasks.
366 * The default implementation simply returns the given task.
367 *
368 * @param runnable the submitted Runnable
369 * @param task the task created to execute the runnable
370 * @param <V> the type of the task's result
371 * @return a task that can execute the runnable
372 * @since 1.6
373 */
374 protected <V> RunnableScheduledFuture<V> decorateTask(
375 Runnable runnable, RunnableScheduledFuture<V> task) {
376 return task;
377 }
378
379 /**
380 * Modifies or replaces the task used to execute a callable.
381 * This method can be used to override the concrete
382 * class used for managing internal tasks.
383 * The default implementation simply returns the given task.
384 *
385 * @param callable the submitted Callable
386 * @param task the task created to execute the callable
387 * @param <V> the type of the task's result
388 * @return a task that can execute the callable
389 * @since 1.6
390 */
391 protected <V> RunnableScheduledFuture<V> decorateTask(
392 Callable<V> callable, RunnableScheduledFuture<V> task) {
393 return task;
394 }
395
396 /**
397 * The default keep-alive time for pool threads.
398 *
399 * Normally, this value is unused because all pool threads will be
400 * core threads, but if a user creates a pool with a corePoolSize
401 * of zero (against our advice), we keep a thread alive as long as
402 * there are queued tasks. If the keep alive time is zero (the
403 * historic value), we end up hot-spinning in getTask, wasting a
404 * CPU. But on the other hand, if we set the value too high, and
405 * users create a one-shot pool which they don't cleanly shutdown,
406 * the pool's non-daemon threads will prevent JVM termination. A
407 * small but non-zero value (relative to a JVM's lifetime) seems
408 * best.
409 */
410 private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
411
412 /**
413 * Creates a new {@code ScheduledThreadPoolExecutor} with the
414 * given core pool size.
415 *
416 * @param corePoolSize the number of threads to keep in the pool, even
417 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
418 * @throws IllegalArgumentException if {@code corePoolSize < 0}
419 */
420 public ScheduledThreadPoolExecutor(int corePoolSize) {
421 super(corePoolSize, Integer.MAX_VALUE,
422 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
423 new DelayedWorkQueue());
424 }
425
426 /**
427 * Creates a new {@code ScheduledThreadPoolExecutor} with the
428 * given initial parameters.
429 *
430 * @param corePoolSize the number of threads to keep in the pool, even
431 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
432 * @param threadFactory the factory to use when the executor
433 * creates a new thread
434 * @throws IllegalArgumentException if {@code corePoolSize < 0}
435 * @throws NullPointerException if {@code threadFactory} is null
436 */
437 public ScheduledThreadPoolExecutor(int corePoolSize,
438 ThreadFactory threadFactory) {
439 super(corePoolSize, Integer.MAX_VALUE,
440 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
441 new DelayedWorkQueue(), threadFactory);
442 }
443
444 /**
445 * Creates a new {@code ScheduledThreadPoolExecutor} with the
446 * given initial parameters.
447 *
448 * @param corePoolSize the number of threads to keep in the pool, even
449 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
450 * @param handler the handler to use when execution is blocked
451 * because the thread bounds and queue capacities are reached
452 * @throws IllegalArgumentException if {@code corePoolSize < 0}
453 * @throws NullPointerException if {@code handler} is null
454 */
455 public ScheduledThreadPoolExecutor(int corePoolSize,
456 RejectedExecutionHandler handler) {
457 super(corePoolSize, Integer.MAX_VALUE,
458 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
459 new DelayedWorkQueue(), handler);
460 }
461
462 /**
463 * Creates a new {@code ScheduledThreadPoolExecutor} with the
464 * given initial parameters.
465 *
466 * @param corePoolSize the number of threads to keep in the pool, even
467 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
468 * @param threadFactory the factory to use when the executor
469 * creates a new thread
470 * @param handler the handler to use when execution is blocked
471 * because the thread bounds and queue capacities are reached
472 * @throws IllegalArgumentException if {@code corePoolSize < 0}
473 * @throws NullPointerException if {@code threadFactory} or
474 * {@code handler} is null
475 */
476 public ScheduledThreadPoolExecutor(int corePoolSize,
477 ThreadFactory threadFactory,
478 RejectedExecutionHandler handler) {
479 super(corePoolSize, Integer.MAX_VALUE,
480 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
481 new DelayedWorkQueue(), threadFactory, handler);
482 }
483
484 /**
485 * Returns the nanoTime-based trigger time of a delayed action.
486 */
487 private long triggerTime(long delay, TimeUnit unit) {
488 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
489 }
490
491 /**
492 * Returns the nanoTime-based trigger time of a delayed action.
493 */
494 long triggerTime(long delay) {
495 return System.nanoTime() +
496 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
497 }
498
499 /**
500 * Constrains the values of all delays in the queue to be within
501 * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
502 * This may occur if a task is eligible to be dequeued, but has
503 * not yet been, while some other task is added with a delay of
504 * Long.MAX_VALUE.
505 */
506 private long overflowFree(long delay) {
507 Delayed head = (Delayed) super.getQueue().peek();
508 if (head != null) {
509 long headDelay = head.getDelay(NANOSECONDS);
510 if (headDelay < 0 && (delay - headDelay < 0))
511 delay = Long.MAX_VALUE + headDelay;
512 }
513 return delay;
514 }
515
516 /**
517 * @throws RejectedExecutionException {@inheritDoc}
518 * @throws NullPointerException {@inheritDoc}
519 */
520 public ScheduledFuture<?> schedule(Runnable command,
521 long delay,
522 TimeUnit unit) {
523 if (command == null || unit == null)
524 throw new NullPointerException();
525 RunnableScheduledFuture<Void> t = decorateTask(command,
526 new ScheduledFutureTask<Void>(command, null,
527 triggerTime(delay, unit),
528 sequencer.getAndIncrement()));
529 delayedExecute(t);
530 return t;
531 }
532
533 /**
534 * @throws RejectedExecutionException {@inheritDoc}
535 * @throws NullPointerException {@inheritDoc}
536 */
537 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
538 long delay,
539 TimeUnit unit) {
540 if (callable == null || unit == null)
541 throw new NullPointerException();
542 RunnableScheduledFuture<V> t = decorateTask(callable,
543 new ScheduledFutureTask<V>(callable,
544 triggerTime(delay, unit),
545 sequencer.getAndIncrement()));
546 delayedExecute(t);
547 return t;
548 }
549
550 /**
551 * @throws RejectedExecutionException {@inheritDoc}
552 * @throws NullPointerException {@inheritDoc}
553 * @throws IllegalArgumentException {@inheritDoc}
554 */
555 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
556 long initialDelay,
557 long period,
558 TimeUnit unit) {
559 if (command == null || unit == null)
560 throw new NullPointerException();
561 if (period <= 0L)
562 throw new IllegalArgumentException();
563 ScheduledFutureTask<Void> sft =
564 new ScheduledFutureTask<Void>(command,
565 null,
566 triggerTime(initialDelay, unit),
567 unit.toNanos(period),
568 sequencer.getAndIncrement());
569 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
570 sft.outerTask = t;
571 delayedExecute(t);
572 return t;
573 }
574
575 /**
576 * @throws RejectedExecutionException {@inheritDoc}
577 * @throws NullPointerException {@inheritDoc}
578 * @throws IllegalArgumentException {@inheritDoc}
579 */
580 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
581 long initialDelay,
582 long delay,
583 TimeUnit unit) {
584 if (command == null || unit == null)
585 throw new NullPointerException();
586 if (delay <= 0L)
587 throw new IllegalArgumentException();
588 ScheduledFutureTask<Void> sft =
589 new ScheduledFutureTask<Void>(command,
590 null,
591 triggerTime(initialDelay, unit),
592 -unit.toNanos(delay),
593 sequencer.getAndIncrement());
594 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
595 sft.outerTask = t;
596 delayedExecute(t);
597 return t;
598 }
599
600 /**
601 * Executes {@code command} with zero required delay.
602 * This has effect equivalent to
603 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
604 * Note that inspections of the queue and of the list returned by
605 * {@code shutdownNow} will access the zero-delayed
606 * {@link ScheduledFuture}, not the {@code command} itself.
607 *
608 * <p>A consequence of the use of {@code ScheduledFuture} objects is
609 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
610 * called with a null second {@code Throwable} argument, even if the
611 * {@code command} terminated abruptly. Instead, the {@code Throwable}
612 * thrown by such a task can be obtained via {@link Future#get}.
613 *
614 * @throws RejectedExecutionException at discretion of
615 * {@code RejectedExecutionHandler}, if the task
616 * cannot be accepted for execution because the
617 * executor has been shut down
618 * @throws NullPointerException {@inheritDoc}
619 */
620 public void execute(Runnable command) {
621 schedule(command, 0, NANOSECONDS);
622 }
623
624 // Override AbstractExecutorService methods
625
626 /**
627 * @throws RejectedExecutionException {@inheritDoc}
628 * @throws NullPointerException {@inheritDoc}
629 */
630 public Future<?> submit(Runnable task) {
631 return schedule(task, 0, NANOSECONDS);
632 }
633
634 /**
635 * @throws RejectedExecutionException {@inheritDoc}
636 * @throws NullPointerException {@inheritDoc}
637 */
638 public <T> Future<T> submit(Runnable task, T result) {
639 return schedule(Executors.callable(task, result), 0, NANOSECONDS);
640 }
641
642 /**
643 * @throws RejectedExecutionException {@inheritDoc}
644 * @throws NullPointerException {@inheritDoc}
645 */
646 public <T> Future<T> submit(Callable<T> task) {
647 return schedule(task, 0, NANOSECONDS);
648 }
649
650 /**
651 * Sets the policy on whether to continue executing existing
652 * periodic tasks even when this executor has been {@code shutdown}.
653 * In this case, these tasks will only terminate upon
654 * {@code shutdownNow} or after setting the policy to
655 * {@code false} when already shutdown.
656 * This value is by default {@code false}.
657 *
658 * @param value if {@code true}, continue after shutdown, else don't
659 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
660 */
661 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
662 continueExistingPeriodicTasksAfterShutdown = value;
663 if (!value && isShutdown())
664 onShutdown();
665 }
666
667 /**
668 * Gets the policy on whether to continue executing existing
669 * periodic tasks even when this executor has been {@code shutdown}.
670 * In this case, these tasks will only terminate upon
671 * {@code shutdownNow} or after setting the policy to
672 * {@code false} when already shutdown.
673 * This value is by default {@code false}.
674 *
675 * @return {@code true} if will continue after shutdown
676 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
677 */
678 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
679 return continueExistingPeriodicTasksAfterShutdown;
680 }
681
682 /**
683 * Sets the policy on whether to execute existing delayed
684 * tasks even when this executor has been {@code shutdown}.
685 * In this case, these tasks will only terminate upon
686 * {@code shutdownNow}, or after setting the policy to
687 * {@code false} when already shutdown.
688 * This value is by default {@code true}.
689 *
690 * @param value if {@code true}, execute after shutdown, else don't
691 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
692 */
693 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
694 executeExistingDelayedTasksAfterShutdown = value;
695 if (!value && isShutdown())
696 onShutdown();
697 }
698
699 /**
700 * Gets the policy on whether to execute existing delayed
701 * tasks even when this executor has been {@code shutdown}.
702 * In this case, these tasks will only terminate upon
703 * {@code shutdownNow}, or after setting the policy to
704 * {@code false} when already shutdown.
705 * This value is by default {@code true}.
706 *
707 * @return {@code true} if will execute after shutdown
708 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
709 */
710 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
711 return executeExistingDelayedTasksAfterShutdown;
712 }
713
714 /**
715 * Sets the policy on whether cancelled tasks should be immediately
716 * removed from the work queue at time of cancellation. This value is
717 * by default {@code false}.
718 *
719 * @param value if {@code true}, remove on cancellation, else don't
720 * @see #getRemoveOnCancelPolicy
721 * @since 1.7
722 */
723 public void setRemoveOnCancelPolicy(boolean value) {
724 removeOnCancel = value;
725 }
726
727 /**
728 * Gets the policy on whether cancelled tasks should be immediately
729 * removed from the work queue at time of cancellation. This value is
730 * by default {@code false}.
731 *
732 * @return {@code true} if cancelled tasks are immediately removed
733 * from the queue
734 * @see #setRemoveOnCancelPolicy
735 * @since 1.7
736 */
737 public boolean getRemoveOnCancelPolicy() {
738 return removeOnCancel;
739 }
740
741 /**
742 * Initiates an orderly shutdown in which previously submitted
743 * tasks are executed, but no new tasks will be accepted.
744 * Invocation has no additional effect if already shut down.
745 *
746 * <p>This method does not wait for previously submitted tasks to
747 * complete execution. Use {@link #awaitTermination awaitTermination}
748 * to do that.
749 *
750 * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
751 * has been set {@code false}, existing delayed tasks whose delays
752 * have not yet elapsed are cancelled. And unless the {@code
753 * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
754 * {@code true}, future executions of existing periodic tasks will
755 * be cancelled.
756 *
757 * @throws SecurityException {@inheritDoc}
758 */
759 public void shutdown() {
760 super.shutdown();
761 }
762
763 /**
764 * Attempts to stop all actively executing tasks, halts the
765 * processing of waiting tasks, and returns a list of the tasks
766 * that were awaiting execution. These tasks are drained (removed)
767 * from the task queue upon return from this method.
768 *
769 * <p>This method does not wait for actively executing tasks to
770 * terminate. Use {@link #awaitTermination awaitTermination} to
771 * do that.
772 *
773 * <p>There are no guarantees beyond best-effort attempts to stop
774 * processing actively executing tasks. This implementation
775 * interrupts tasks via {@link Thread#interrupt}; any task that
776 * fails to respond to interrupts may never terminate.
777 *
778 * @return list of tasks that never commenced execution.
779 * Each element of this list is a {@link ScheduledFuture}.
780 * For tasks submitted via one of the {@code schedule}
781 * methods, the element will be identical to the returned
782 * {@code ScheduledFuture}. For tasks submitted using
783 * {@link #execute execute}, the element will be a
784 * zero-delay {@code ScheduledFuture}.
785 * @throws SecurityException {@inheritDoc}
786 */
787 public List<Runnable> shutdownNow() {
788 return super.shutdownNow();
789 }
790
791 /**
792 * Returns the task queue used by this executor. Access to the
793 * task queue is intended primarily for debugging and monitoring.
794 * This queue may be in active use. Retrieving the task queue
795 * does not prevent queued tasks from executing.
796 *
797 * <p>Each element of this queue is a {@link ScheduledFuture}.
798 * For tasks submitted via one of the {@code schedule} methods, the
799 * element will be identical to the returned {@code ScheduledFuture}.
800 * For tasks submitted using {@link #execute execute}, the element
801 * will be a zero-delay {@code ScheduledFuture}.
802 *
803 * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
804 * tasks in the order in which they will execute.
805 *
806 * @return the task queue
807 */
808 public BlockingQueue<Runnable> getQueue() {
809 return super.getQueue();
810 }
811
812 /**
813 * Specialized delay queue. To mesh with TPE declarations, this
814 * class must be declared as a BlockingQueue<Runnable> even though
815 * it can only hold RunnableScheduledFutures.
816 */
817 static class DelayedWorkQueue extends AbstractQueue<Runnable>
818 implements BlockingQueue<Runnable> {
819
820 /*
821 * A DelayedWorkQueue is based on a heap-based data structure
822 * like those in DelayQueue and PriorityQueue, except that
823 * every ScheduledFutureTask also records its index into the
824 * heap array. This eliminates the need to find a task upon
825 * cancellation, greatly speeding up removal (down from O(n)
826 * to O(log n)), and reducing garbage retention that would
827 * otherwise occur by waiting for the element to rise to top
828 * before clearing. But because the queue may also hold
829 * RunnableScheduledFutures that are not ScheduledFutureTasks,
830 * we are not guaranteed to have such indices available, in
831 * which case we fall back to linear search. (We expect that
832 * most tasks will not be decorated, and that the faster cases
833 * will be much more common.)
834 *
835 * All heap operations must record index changes -- mainly
836 * within siftUp and siftDown. Upon removal, a task's
837 * heapIndex is set to -1. Note that ScheduledFutureTasks can
838 * appear at most once in the queue (this need not be true for
839 * other kinds of tasks or work queues), so are uniquely
840 * identified by heapIndex.
841 */
842
843 private static final int INITIAL_CAPACITY = 16;
844 private RunnableScheduledFuture<?>[] queue =
845 new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
846 private final ReentrantLock lock = new ReentrantLock();
847 private int size;
848
849 /**
850 * Thread designated to wait for the task at the head of the
851 * queue. This variant of the Leader-Follower pattern
852 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
853 * minimize unnecessary timed waiting. When a thread becomes
854 * the leader, it waits only for the next delay to elapse, but
855 * other threads await indefinitely. The leader thread must
856 * signal some other thread before returning from take() or
857 * poll(...), unless some other thread becomes leader in the
858 * interim. Whenever the head of the queue is replaced with a
859 * task with an earlier expiration time, the leader field is
860 * invalidated by being reset to null, and some waiting
861 * thread, but not necessarily the current leader, is
862 * signalled. So waiting threads must be prepared to acquire
863 * and lose leadership while waiting.
864 */
865 private Thread leader;
866
867 /**
868 * Condition signalled when a newer task becomes available at the
869 * head of the queue or a new thread may need to become leader.
870 */
871 private final Condition available = lock.newCondition();
872
873 /**
874 * Sets f's heapIndex if it is a ScheduledFutureTask.
875 */
876 private void setIndex(RunnableScheduledFuture<?> f, int idx) {
877 if (f instanceof ScheduledFutureTask)
878 ((ScheduledFutureTask)f).heapIndex = idx;
879 }
880
881 /**
882 * Sifts element added at bottom up to its heap-ordered spot.
883 * Call only when holding lock.
884 */
885 private void siftUp(int k, RunnableScheduledFuture<?> key) {
886 while (k > 0) {
887 int parent = (k - 1) >>> 1;
888 RunnableScheduledFuture<?> e = queue[parent];
889 if (key.compareTo(e) >= 0)
890 break;
891 queue[k] = e;
892 setIndex(e, k);
893 k = parent;
894 }
895 queue[k] = key;
896 setIndex(key, k);
897 }
898
899 /**
900 * Sifts element added at top down to its heap-ordered spot.
901 * Call only when holding lock.
902 */
903 private void siftDown(int k, RunnableScheduledFuture<?> key) {
904 int half = size >>> 1;
905 while (k < half) {
906 int child = (k << 1) + 1;
907 RunnableScheduledFuture<?> c = queue[child];
908 int right = child + 1;
909 if (right < size && c.compareTo(queue[right]) > 0)
910 c = queue[child = right];
911 if (key.compareTo(c) <= 0)
912 break;
913 queue[k] = c;
914 setIndex(c, k);
915 k = child;
916 }
917 queue[k] = key;
918 setIndex(key, k);
919 }
920
921 /**
922 * Resizes the heap array. Call only when holding lock.
923 */
924 private void grow() {
925 int oldCapacity = queue.length;
926 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
927 if (newCapacity < 0) // overflow
928 newCapacity = Integer.MAX_VALUE;
929 queue = Arrays.copyOf(queue, newCapacity);
930 }
931
932 /**
933 * Finds index of given object, or -1 if absent.
934 */
935 private int indexOf(Object x) {
936 if (x != null) {
937 if (x instanceof ScheduledFutureTask) {
938 int i = ((ScheduledFutureTask) x).heapIndex;
939 // Sanity check; x could conceivably be a
940 // ScheduledFutureTask from some other pool.
941 if (i >= 0 && i < size && queue[i] == x)
942 return i;
943 } else {
944 for (int i = 0; i < size; i++)
945 if (x.equals(queue[i]))
946 return i;
947 }
948 }
949 return -1;
950 }
951
952 public boolean contains(Object x) {
953 final ReentrantLock lock = this.lock;
954 lock.lock();
955 try {
956 return indexOf(x) != -1;
957 } finally {
958 lock.unlock();
959 }
960 }
961
962 public boolean remove(Object x) {
963 final ReentrantLock lock = this.lock;
964 lock.lock();
965 try {
966 int i = indexOf(x);
967 if (i < 0)
968 return false;
969
970 setIndex(queue[i], -1);
971 int s = --size;
972 RunnableScheduledFuture<?> replacement = queue[s];
973 queue[s] = null;
974 if (s != i) {
975 siftDown(i, replacement);
976 if (queue[i] == replacement)
977 siftUp(i, replacement);
978 }
979 return true;
980 } finally {
981 lock.unlock();
982 }
983 }
984
985 public int size() {
986 final ReentrantLock lock = this.lock;
987 lock.lock();
988 try {
989 return size;
990 } finally {
991 lock.unlock();
992 }
993 }
994
995 public boolean isEmpty() {
996 return size() == 0;
997 }
998
999 public int remainingCapacity() {
1000 return Integer.MAX_VALUE;
1001 }
1002
1003 public RunnableScheduledFuture<?> peek() {
1004 final ReentrantLock lock = this.lock;
1005 lock.lock();
1006 try {
1007 return queue[0];
1008 } finally {
1009 lock.unlock();
1010 }
1011 }
1012
1013 public boolean offer(Runnable x) {
1014 if (x == null)
1015 throw new NullPointerException();
1016 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
1017 final ReentrantLock lock = this.lock;
1018 lock.lock();
1019 try {
1020 int i = size;
1021 if (i >= queue.length)
1022 grow();
1023 size = i + 1;
1024 if (i == 0) {
1025 queue[0] = e;
1026 setIndex(e, 0);
1027 } else {
1028 siftUp(i, e);
1029 }
1030 if (queue[0] == e) {
1031 leader = null;
1032 available.signal();
1033 }
1034 } finally {
1035 lock.unlock();
1036 }
1037 return true;
1038 }
1039
1040 public void put(Runnable e) {
1041 offer(e);
1042 }
1043
1044 public boolean add(Runnable e) {
1045 return offer(e);
1046 }
1047
1048 public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1049 return offer(e);
1050 }
1051
1052 /**
1053 * Performs common bookkeeping for poll and take: Replaces
1054 * first element with last and sifts it down. Call only when
1055 * holding lock.
1056 * @param f the task to remove and return
1057 */
1058 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1059 int s = --size;
1060 RunnableScheduledFuture<?> x = queue[s];
1061 queue[s] = null;
1062 if (s != 0)
1063 siftDown(0, x);
1064 setIndex(f, -1);
1065 return f;
1066 }
1067
1068 public RunnableScheduledFuture<?> poll() {
1069 final ReentrantLock lock = this.lock;
1070 lock.lock();
1071 try {
1072 RunnableScheduledFuture<?> first = queue[0];
1073 return (first == null || first.getDelay(NANOSECONDS) > 0)
1074 ? null
1075 : finishPoll(first);
1076 } finally {
1077 lock.unlock();
1078 }
1079 }
1080
1081 public RunnableScheduledFuture<?> take() throws InterruptedException {
1082 final ReentrantLock lock = this.lock;
1083 lock.lockInterruptibly();
1084 try {
1085 for (;;) {
1086 RunnableScheduledFuture<?> first = queue[0];
1087 if (first == null)
1088 available.await();
1089 else {
1090 long delay = first.getDelay(NANOSECONDS);
1091 if (delay <= 0L)
1092 return finishPoll(first);
1093 first = null; // don't retain ref while waiting
1094 if (leader != null)
1095 available.await();
1096 else {
1097 Thread thisThread = Thread.currentThread();
1098 leader = thisThread;
1099 try {
1100 available.awaitNanos(delay);
1101 } finally {
1102 if (leader == thisThread)
1103 leader = null;
1104 }
1105 }
1106 }
1107 }
1108 } finally {
1109 if (leader == null && queue[0] != null)
1110 available.signal();
1111 lock.unlock();
1112 }
1113 }
1114
1115 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1116 throws InterruptedException {
1117 long nanos = unit.toNanos(timeout);
1118 final ReentrantLock lock = this.lock;
1119 lock.lockInterruptibly();
1120 try {
1121 for (;;) {
1122 RunnableScheduledFuture<?> first = queue[0];
1123 if (first == null) {
1124 if (nanos <= 0L)
1125 return null;
1126 else
1127 nanos = available.awaitNanos(nanos);
1128 } else {
1129 long delay = first.getDelay(NANOSECONDS);
1130 if (delay <= 0L)
1131 return finishPoll(first);
1132 if (nanos <= 0L)
1133 return null;
1134 first = null; // don't retain ref while waiting
1135 if (nanos < delay || leader != null)
1136 nanos = available.awaitNanos(nanos);
1137 else {
1138 Thread thisThread = Thread.currentThread();
1139 leader = thisThread;
1140 try {
1141 long timeLeft = available.awaitNanos(delay);
1142 nanos -= delay - timeLeft;
1143 } finally {
1144 if (leader == thisThread)
1145 leader = null;
1146 }
1147 }
1148 }
1149 }
1150 } finally {
1151 if (leader == null && queue[0] != null)
1152 available.signal();
1153 lock.unlock();
1154 }
1155 }
1156
1157 public void clear() {
1158 final ReentrantLock lock = this.lock;
1159 lock.lock();
1160 try {
1161 for (int i = 0; i < size; i++) {
1162 RunnableScheduledFuture<?> t = queue[i];
1163 if (t != null) {
1164 queue[i] = null;
1165 setIndex(t, -1);
1166 }
1167 }
1168 size = 0;
1169 } finally {
1170 lock.unlock();
1171 }
1172 }
1173
1174 /**
1175 * Returns first element only if it is expired.
1176 * Used only by drainTo. Call only when holding lock.
1177 */
1178 private RunnableScheduledFuture<?> peekExpired() {
1179 // assert lock.isHeldByCurrentThread();
1180 RunnableScheduledFuture<?> first = queue[0];
1181 return (first == null || first.getDelay(NANOSECONDS) > 0) ?
1182 null : first;
1183 }
1184
1185 public int drainTo(Collection<? super Runnable> c) {
1186 if (c == null)
1187 throw new NullPointerException();
1188 if (c == this)
1189 throw new IllegalArgumentException();
1190 final ReentrantLock lock = this.lock;
1191 lock.lock();
1192 try {
1193 RunnableScheduledFuture<?> first;
1194 int n = 0;
1195 while ((first = peekExpired()) != null) {
1196 c.add(first); // In this order, in case add() throws.
1197 finishPoll(first);
1198 ++n;
1199 }
1200 return n;
1201 } finally {
1202 lock.unlock();
1203 }
1204 }
1205
1206 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1207 if (c == null)
1208 throw new NullPointerException();
1209 if (c == this)
1210 throw new IllegalArgumentException();
1211 if (maxElements <= 0)
1212 return 0;
1213 final ReentrantLock lock = this.lock;
1214 lock.lock();
1215 try {
1216 RunnableScheduledFuture<?> first;
1217 int n = 0;
1218 while (n < maxElements && (first = peekExpired()) != null) {
1219 c.add(first); // In this order, in case add() throws.
1220 finishPoll(first);
1221 ++n;
1222 }
1223 return n;
1224 } finally {
1225 lock.unlock();
1226 }
1227 }
1228
1229 public Object[] toArray() {
1230 final ReentrantLock lock = this.lock;
1231 lock.lock();
1232 try {
1233 return Arrays.copyOf(queue, size, Object[].class);
1234 } finally {
1235 lock.unlock();
1236 }
1237 }
1238
1239 @SuppressWarnings("unchecked")
1240 public <T> T[] toArray(T[] a) {
1241 final ReentrantLock lock = this.lock;
1242 lock.lock();
1243 try {
1244 if (a.length < size)
1245 return (T[]) Arrays.copyOf(queue, size, a.getClass());
1246 System.arraycopy(queue, 0, a, 0, size);
1247 if (a.length > size)
1248 a[size] = null;
1249 return a;
1250 } finally {
1251 lock.unlock();
1252 }
1253 }
1254
1255 public Iterator<Runnable> iterator() {
1256 return new Itr(Arrays.copyOf(queue, size));
1257 }
1258
1259 /**
1260 * Snapshot iterator that works off copy of underlying q array.
1261 */
1262 private class Itr implements Iterator<Runnable> {
1263 final RunnableScheduledFuture<?>[] array;
1264 int cursor; // index of next element to return; initially 0
1265 int lastRet = -1; // index of last element returned; -1 if no such
1266
1267 Itr(RunnableScheduledFuture<?>[] array) {
1268 this.array = array;
1269 }
1270
1271 public boolean hasNext() {
1272 return cursor < array.length;
1273 }
1274
1275 public Runnable next() {
1276 if (cursor >= array.length)
1277 throw new NoSuchElementException();
1278 lastRet = cursor;
1279 return array[cursor++];
1280 }
1281
1282 public void remove() {
1283 if (lastRet < 0)
1284 throw new IllegalStateException();
1285 DelayedWorkQueue.this.remove(array[lastRet]);
1286 lastRet = -1;
1287 }
1288 }
1289 }
1290 }