ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.107
Committed: Tue Mar 28 18:13:10 2017 UTC (7 years, 2 months ago) by jsr166
Branch: MAIN
Changes since 1.106: +31 -35 lines
Log Message:
fix 8177632 and 8176254

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 import static java.util.concurrent.TimeUnit.NANOSECONDS;
11
12 import java.util.AbstractQueue;
13 import java.util.Arrays;
14 import java.util.Collection;
15 import java.util.Iterator;
16 import java.util.List;
17 import java.util.NoSuchElementException;
18 import java.util.Objects;
19 import java.util.concurrent.atomic.AtomicLong;
20 import java.util.concurrent.locks.Condition;
21 import java.util.concurrent.locks.ReentrantLock;
22
23 /**
24 * A {@link ThreadPoolExecutor} that can additionally schedule
25 * commands to run after a given delay, or to execute periodically.
26 * This class is preferable to {@link java.util.Timer} when multiple
27 * worker threads are needed, or when the additional flexibility or
28 * capabilities of {@link ThreadPoolExecutor} (which this class
29 * extends) are required.
30 *
31 * <p>Delayed tasks execute no sooner than they are enabled, but
32 * without any real-time guarantees about when, after they are
33 * enabled, they will commence. Tasks scheduled for exactly the same
34 * execution time are enabled in first-in-first-out (FIFO) order of
35 * submission.
36 *
37 * <p>When a submitted task is cancelled before it is run, execution
38 * is suppressed. By default, such a cancelled task is not
39 * automatically removed from the work queue until its delay elapses.
40 * While this enables further inspection and monitoring, it may also
41 * cause unbounded retention of cancelled tasks. To avoid this, use
42 * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately
43 * removed from the work queue at time of cancellation.
44 *
45 * <p>Successive executions of a periodic task scheduled via
46 * {@link #scheduleAtFixedRate scheduleAtFixedRate} or
47 * {@link #scheduleWithFixedDelay scheduleWithFixedDelay}
48 * do not overlap. While different executions may be performed by
49 * different threads, the effects of prior executions
50 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
51 * those of subsequent ones.
52 *
53 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
54 * of the inherited tuning methods are not useful for it. In
55 * particular, because it acts as a fixed-sized pool using
56 * {@code corePoolSize} threads and an unbounded queue, adjustments
57 * to {@code maximumPoolSize} have no useful effect. Additionally, it
58 * is almost never a good idea to set {@code corePoolSize} to zero or
59 * use {@code allowCoreThreadTimeOut} because this may leave the pool
60 * without threads to handle tasks once they become eligible to run.
61 *
62 * <p>As with {@code ThreadPoolExecutor}, if not otherwise specified,
63 * this class uses {@link Executors#defaultThreadFactory} as the
64 * default thread factory, and {@link ThreadPoolExecutor.AbortPolicy}
65 * as the default rejected execution handler.
66 *
67 * <p><b>Extension notes:</b> This class overrides the
68 * {@link ThreadPoolExecutor#execute(Runnable) execute} and
69 * {@link AbstractExecutorService#submit(Runnable) submit}
70 * methods to generate internal {@link ScheduledFuture} objects to
71 * control per-task delays and scheduling. To preserve
72 * functionality, any further overrides of these methods in
73 * subclasses must invoke superclass versions, which effectively
74 * disables additional task customization. However, this class
75 * provides an alternative protected extension method
76 * {@code decorateTask} (one version each for {@code Runnable} and
77 * {@code Callable}) that can be used to customize the concrete task
78 * types used to execute commands entered via {@code execute},
79 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
80 * and {@code scheduleWithFixedDelay}. By default, a
81 * {@code ScheduledThreadPoolExecutor} uses a task type extending
82 * {@link FutureTask}. However, this may be modified or replaced using
83 * subclasses of the form:
84 *
85 * <pre> {@code
86 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
87 *
88 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
89 *
90 * protected <V> RunnableScheduledFuture<V> decorateTask(
91 * Runnable r, RunnableScheduledFuture<V> task) {
92 * return new CustomTask<V>(r, task);
93 * }
94 *
95 * protected <V> RunnableScheduledFuture<V> decorateTask(
96 * Callable<V> c, RunnableScheduledFuture<V> task) {
97 * return new CustomTask<V>(c, task);
98 * }
99 * // ... add constructors, etc.
100 * }}</pre>
101 *
102 * @since 1.5
103 * @author Doug Lea
104 */
105 public class ScheduledThreadPoolExecutor
106 extends ThreadPoolExecutor
107 implements ScheduledExecutorService {
108
109 /*
110 * This class specializes ThreadPoolExecutor implementation by
111 *
112 * 1. Using a custom task type ScheduledFutureTask, even for tasks
113 * that don't require scheduling because they are submitted
114 * using ExecutorService rather than ScheduledExecutorService
115 * methods, which are treated as tasks with a delay of zero.
116 *
117 * 2. Using a custom queue (DelayedWorkQueue), a variant of
118 * unbounded DelayQueue. The lack of capacity constraint and
119 * the fact that corePoolSize and maximumPoolSize are
120 * effectively identical simplifies some execution mechanics
121 * (see delayedExecute) compared to ThreadPoolExecutor.
122 *
123 * 3. Supporting optional run-after-shutdown parameters, which
124 * leads to overrides of shutdown methods to remove and cancel
125 * tasks that should NOT be run after shutdown, as well as
126 * different recheck logic when task (re)submission overlaps
127 * with a shutdown.
128 *
129 * 4. Task decoration methods to allow interception and
130 * instrumentation, which are needed because subclasses cannot
131 * otherwise override submit methods to get this effect. These
132 * don't have any impact on pool control logic though.
133 */
134
135 /**
136 * False if should cancel/suppress periodic tasks on shutdown.
137 */
138 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
139
140 /**
141 * False if should cancel non-periodic tasks on shutdown.
142 */
143 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
144
145 /**
146 * True if ScheduledFutureTask.cancel should remove from queue.
147 */
148 volatile boolean removeOnCancel;
149
150 /**
151 * Sequence number to break scheduling ties, and in turn to
152 * guarantee FIFO order among tied entries.
153 */
154 private static final AtomicLong sequencer = new AtomicLong();
155
156 private class ScheduledFutureTask<V>
157 extends FutureTask<V> implements RunnableScheduledFuture<V> {
158
159 /** Sequence number to break ties FIFO */
160 private final long sequenceNumber;
161
162 /** The nanoTime-based time when the task is enabled to execute. */
163 private volatile long time;
164
165 /**
166 * Period for repeating tasks, in nanoseconds.
167 * A positive value indicates fixed-rate execution.
168 * A negative value indicates fixed-delay execution.
169 * A value of 0 indicates a non-repeating (one-shot) task.
170 */
171 private final long period;
172
173 /** The actual task to be re-enqueued by reExecutePeriodic */
174 RunnableScheduledFuture<V> outerTask = this;
175
176 /**
177 * Index into delay queue, to support faster cancellation.
178 */
179 int heapIndex;
180
181 /**
182 * Creates a one-shot action with given nanoTime-based trigger time.
183 */
184 ScheduledFutureTask(Runnable r, V result, long triggerTime,
185 long sequenceNumber) {
186 super(r, result);
187 this.time = triggerTime;
188 this.period = 0;
189 this.sequenceNumber = sequenceNumber;
190 }
191
192 /**
193 * Creates a periodic action with given nanoTime-based initial
194 * trigger time and period.
195 */
196 ScheduledFutureTask(Runnable r, V result, long triggerTime,
197 long period, long sequenceNumber) {
198 super(r, result);
199 this.time = triggerTime;
200 this.period = period;
201 this.sequenceNumber = sequenceNumber;
202 }
203
204 /**
205 * Creates a one-shot action with given nanoTime-based trigger time.
206 */
207 ScheduledFutureTask(Callable<V> callable, long triggerTime,
208 long sequenceNumber) {
209 super(callable);
210 this.time = triggerTime;
211 this.period = 0;
212 this.sequenceNumber = sequenceNumber;
213 }
214
215 public long getDelay(TimeUnit unit) {
216 return unit.convert(time - System.nanoTime(), NANOSECONDS);
217 }
218
219 public int compareTo(Delayed other) {
220 if (other == this) // compare zero if same object
221 return 0;
222 if (other instanceof ScheduledFutureTask) {
223 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
224 long diff = time - x.time;
225 if (diff < 0)
226 return -1;
227 else if (diff > 0)
228 return 1;
229 else if (sequenceNumber < x.sequenceNumber)
230 return -1;
231 else
232 return 1;
233 }
234 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
235 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
236 }
237
238 /**
239 * Returns {@code true} if this is a periodic (not a one-shot) action.
240 *
241 * @return {@code true} if periodic
242 */
243 public boolean isPeriodic() {
244 return period != 0;
245 }
246
247 /**
248 * Sets the next time to run for a periodic task.
249 */
250 private void setNextRunTime() {
251 long p = period;
252 if (p > 0)
253 time += p;
254 else
255 time = triggerTime(-p);
256 }
257
258 public boolean cancel(boolean mayInterruptIfRunning) {
259 // The racy read of heapIndex below is benign:
260 // if heapIndex < 0, then OOTA guarantees that we have surely
261 // been removed; else we recheck under lock in remove()
262 boolean cancelled = super.cancel(mayInterruptIfRunning);
263 if (cancelled && removeOnCancel && heapIndex >= 0)
264 remove(this);
265 return cancelled;
266 }
267
268 /**
269 * Overrides FutureTask version so as to reset/requeue if periodic.
270 */
271 public void run() {
272 if (!canRunInCurrentRunState(this))
273 cancel(false);
274 else if (!isPeriodic())
275 super.run();
276 else if (super.runAndReset()) {
277 setNextRunTime();
278 reExecutePeriodic(outerTask);
279 }
280 }
281 }
282
283 /**
284 * Returns true if can run a task given current run state and
285 * run-after-shutdown parameters.
286 */
287 boolean canRunInCurrentRunState(RunnableScheduledFuture<?> task) {
288 if (!isShutdown())
289 return true;
290 if (isStopped())
291 return false;
292 return task.isPeriodic()
293 ? continueExistingPeriodicTasksAfterShutdown
294 : (executeExistingDelayedTasksAfterShutdown
295 || task.getDelay(NANOSECONDS) <= 0);
296 }
297
298 /**
299 * Main execution method for delayed or periodic tasks. If pool
300 * is shut down, rejects the task. Otherwise adds task to queue
301 * and starts a thread, if necessary, to run it. (We cannot
302 * prestart the thread to run the task because the task (probably)
303 * shouldn't be run yet.) If the pool is shut down while the task
304 * is being added, cancel and remove it if required by state and
305 * run-after-shutdown parameters.
306 *
307 * @param task the task
308 */
309 private void delayedExecute(RunnableScheduledFuture<?> task) {
310 if (isShutdown())
311 reject(task);
312 else {
313 super.getQueue().add(task);
314 if (!canRunInCurrentRunState(task) && remove(task))
315 task.cancel(false);
316 else
317 ensurePrestart();
318 }
319 }
320
321 /**
322 * Requeues a periodic task unless current run state precludes it.
323 * Same idea as delayedExecute except drops task rather than rejecting.
324 *
325 * @param task the task
326 */
327 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
328 if (canRunInCurrentRunState(task)) {
329 super.getQueue().add(task);
330 if (canRunInCurrentRunState(task) || !remove(task)) {
331 ensurePrestart();
332 return;
333 }
334 }
335 task.cancel(false);
336 }
337
338 /**
339 * Cancels and clears the queue of all tasks that should not be run
340 * due to shutdown policy. Invoked within super.shutdown.
341 */
342 @Override void onShutdown() {
343 BlockingQueue<Runnable> q = super.getQueue();
344 boolean keepDelayed =
345 getExecuteExistingDelayedTasksAfterShutdownPolicy();
346 boolean keepPeriodic =
347 getContinueExistingPeriodicTasksAfterShutdownPolicy();
348 // Traverse snapshot to avoid iterator exceptions
349 // TODO: implement and use efficient removeIf
350 // super.getQueue().removeIf(...);
351 for (Object e : q.toArray()) {
352 if (e instanceof RunnableScheduledFuture) {
353 RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
354 if ((t.isPeriodic()
355 ? !keepPeriodic
356 : (!keepDelayed && t.getDelay(NANOSECONDS) > 0))
357 || t.isCancelled()) { // also remove if already cancelled
358 if (q.remove(t))
359 t.cancel(false);
360 }
361 }
362 }
363 tryTerminate();
364 }
365
366 /**
367 * Modifies or replaces the task used to execute a runnable.
368 * This method can be used to override the concrete
369 * class used for managing internal tasks.
370 * The default implementation simply returns the given task.
371 *
372 * @param runnable the submitted Runnable
373 * @param task the task created to execute the runnable
374 * @param <V> the type of the task's result
375 * @return a task that can execute the runnable
376 * @since 1.6
377 */
378 protected <V> RunnableScheduledFuture<V> decorateTask(
379 Runnable runnable, RunnableScheduledFuture<V> task) {
380 return task;
381 }
382
383 /**
384 * Modifies or replaces the task used to execute a callable.
385 * This method can be used to override the concrete
386 * class used for managing internal tasks.
387 * The default implementation simply returns the given task.
388 *
389 * @param callable the submitted Callable
390 * @param task the task created to execute the callable
391 * @param <V> the type of the task's result
392 * @return a task that can execute the callable
393 * @since 1.6
394 */
395 protected <V> RunnableScheduledFuture<V> decorateTask(
396 Callable<V> callable, RunnableScheduledFuture<V> task) {
397 return task;
398 }
399
400 /**
401 * The default keep-alive time for pool threads.
402 *
403 * Normally, this value is unused because all pool threads will be
404 * core threads, but if a user creates a pool with a corePoolSize
405 * of zero (against our advice), we keep a thread alive as long as
406 * there are queued tasks. If the keep alive time is zero (the
407 * historic value), we end up hot-spinning in getTask, wasting a
408 * CPU. But on the other hand, if we set the value too high, and
409 * users create a one-shot pool which they don't cleanly shutdown,
410 * the pool's non-daemon threads will prevent JVM termination. A
411 * small but non-zero value (relative to a JVM's lifetime) seems
412 * best.
413 */
414 private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
415
416 /**
417 * Creates a new {@code ScheduledThreadPoolExecutor} with the
418 * given core pool size.
419 *
420 * @param corePoolSize the number of threads to keep in the pool, even
421 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
422 * @throws IllegalArgumentException if {@code corePoolSize < 0}
423 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize,
          Integer.MAX_VALUE,          // maximum pool size
          DEFAULT_KEEPALIVE_MILLIS,   // keep-alive time ...
          MILLISECONDS,               // ... and its unit
          new DelayedWorkQueue());    // fresh delay queue for this pool
}
429
430 /**
431 * Creates a new {@code ScheduledThreadPoolExecutor} with the
432 * given initial parameters.
433 *
434 * @param corePoolSize the number of threads to keep in the pool, even
435 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
436 * @param threadFactory the factory to use when the executor
437 * creates a new thread
438 * @throws IllegalArgumentException if {@code corePoolSize < 0}
439 * @throws NullPointerException if {@code threadFactory} is null
440 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize,
          Integer.MAX_VALUE,          // maximum pool size
          DEFAULT_KEEPALIVE_MILLIS,   // keep-alive time ...
          MILLISECONDS,               // ... and its unit
          new DelayedWorkQueue(),     // fresh delay queue for this pool
          threadFactory);
}
447
448 /**
449 * Creates a new {@code ScheduledThreadPoolExecutor} with the
450 * given initial parameters.
451 *
452 * @param corePoolSize the number of threads to keep in the pool, even
453 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
454 * @param handler the handler to use when execution is blocked
455 * because the thread bounds and queue capacities are reached
456 * @throws IllegalArgumentException if {@code corePoolSize < 0}
457 * @throws NullPointerException if {@code handler} is null
458 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,          // maximum pool size
          DEFAULT_KEEPALIVE_MILLIS,   // keep-alive time ...
          MILLISECONDS,               // ... and its unit
          new DelayedWorkQueue(),     // fresh delay queue for this pool
          handler);
}
465
466 /**
467 * Creates a new {@code ScheduledThreadPoolExecutor} with the
468 * given initial parameters.
469 *
470 * @param corePoolSize the number of threads to keep in the pool, even
471 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
472 * @param threadFactory the factory to use when the executor
473 * creates a new thread
474 * @param handler the handler to use when execution is blocked
475 * because the thread bounds and queue capacities are reached
476 * @throws IllegalArgumentException if {@code corePoolSize < 0}
477 * @throws NullPointerException if {@code threadFactory} or
478 * {@code handler} is null
479 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,          // maximum pool size
          DEFAULT_KEEPALIVE_MILLIS,   // keep-alive time ...
          MILLISECONDS,               // ... and its unit
          new DelayedWorkQueue(),     // fresh delay queue for this pool
          threadFactory,
          handler);
}
487
488 /**
489 * Returns the nanoTime-based trigger time of a delayed action.
490 */
491 private long triggerTime(long delay, TimeUnit unit) {
492 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
493 }
494
495 /**
496 * Returns the nanoTime-based trigger time of a delayed action.
497 */
498 long triggerTime(long delay) {
499 return System.nanoTime() +
500 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
501 }
502
503 /**
504 * Constrains the values of all delays in the queue to be within
505 * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
506 * This may occur if a task is eligible to be dequeued, but has
507 * not yet been, while some other task is added with a delay of
508 * Long.MAX_VALUE.
509 */
510 private long overflowFree(long delay) {
511 Delayed head = (Delayed) super.getQueue().peek();
512 if (head != null) {
513 long headDelay = head.getDelay(NANOSECONDS);
514 if (headDelay < 0 && (delay - headDelay < 0))
515 delay = Long.MAX_VALUE + headDelay;
516 }
517 return delay;
518 }
519
520 /**
521 * @throws RejectedExecutionException {@inheritDoc}
522 * @throws NullPointerException {@inheritDoc}
523 */
524 public ScheduledFuture<?> schedule(Runnable command,
525 long delay,
526 TimeUnit unit) {
527 if (command == null || unit == null)
528 throw new NullPointerException();
529 RunnableScheduledFuture<Void> t = decorateTask(command,
530 new ScheduledFutureTask<Void>(command, null,
531 triggerTime(delay, unit),
532 sequencer.getAndIncrement()));
533 delayedExecute(t);
534 return t;
535 }
536
537 /**
538 * @throws RejectedExecutionException {@inheritDoc}
539 * @throws NullPointerException {@inheritDoc}
540 */
541 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
542 long delay,
543 TimeUnit unit) {
544 if (callable == null || unit == null)
545 throw new NullPointerException();
546 RunnableScheduledFuture<V> t = decorateTask(callable,
547 new ScheduledFutureTask<V>(callable,
548 triggerTime(delay, unit),
549 sequencer.getAndIncrement()));
550 delayedExecute(t);
551 return t;
552 }
553
554 /**
555 * @throws RejectedExecutionException {@inheritDoc}
556 * @throws NullPointerException {@inheritDoc}
557 * @throws IllegalArgumentException {@inheritDoc}
558 */
559 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
560 long initialDelay,
561 long period,
562 TimeUnit unit) {
563 if (command == null || unit == null)
564 throw new NullPointerException();
565 if (period <= 0L)
566 throw new IllegalArgumentException();
567 ScheduledFutureTask<Void> sft =
568 new ScheduledFutureTask<Void>(command,
569 null,
570 triggerTime(initialDelay, unit),
571 unit.toNanos(period),
572 sequencer.getAndIncrement());
573 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
574 sft.outerTask = t;
575 delayedExecute(t);
576 return t;
577 }
578
579 /**
580 * @throws RejectedExecutionException {@inheritDoc}
581 * @throws NullPointerException {@inheritDoc}
582 * @throws IllegalArgumentException {@inheritDoc}
583 */
584 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
585 long initialDelay,
586 long delay,
587 TimeUnit unit) {
588 if (command == null || unit == null)
589 throw new NullPointerException();
590 if (delay <= 0L)
591 throw new IllegalArgumentException();
592 ScheduledFutureTask<Void> sft =
593 new ScheduledFutureTask<Void>(command,
594 null,
595 triggerTime(initialDelay, unit),
596 -unit.toNanos(delay),
597 sequencer.getAndIncrement());
598 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
599 sft.outerTask = t;
600 delayedExecute(t);
601 return t;
602 }
603
604 /**
605 * Executes {@code command} with zero required delay.
606 * This has effect equivalent to
607 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
608 * Note that inspections of the queue and of the list returned by
609 * {@code shutdownNow} will access the zero-delayed
610 * {@link ScheduledFuture}, not the {@code command} itself.
611 *
612 * <p>A consequence of the use of {@code ScheduledFuture} objects is
613 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
614 * called with a null second {@code Throwable} argument, even if the
615 * {@code command} terminated abruptly. Instead, the {@code Throwable}
616 * thrown by such a task can be obtained via {@link Future#get}.
617 *
618 * @throws RejectedExecutionException at discretion of
619 * {@code RejectedExecutionHandler}, if the task
620 * cannot be accepted for execution because the
621 * executor has been shut down
622 * @throws NullPointerException {@inheritDoc}
623 */
624 public void execute(Runnable command) {
625 schedule(command, 0, NANOSECONDS);
626 }
627
628 // Override AbstractExecutorService methods
629
630 /**
631 * @throws RejectedExecutionException {@inheritDoc}
632 * @throws NullPointerException {@inheritDoc}
633 */
634 public Future<?> submit(Runnable task) {
635 return schedule(task, 0, NANOSECONDS);
636 }
637
638 /**
639 * @throws RejectedExecutionException {@inheritDoc}
640 * @throws NullPointerException {@inheritDoc}
641 */
642 public <T> Future<T> submit(Runnable task, T result) {
643 return schedule(Executors.callable(task, result), 0, NANOSECONDS);
644 }
645
646 /**
647 * @throws RejectedExecutionException {@inheritDoc}
648 * @throws NullPointerException {@inheritDoc}
649 */
650 public <T> Future<T> submit(Callable<T> task) {
651 return schedule(task, 0, NANOSECONDS);
652 }
653
654 /**
655 * Sets the policy on whether to continue executing existing
656 * periodic tasks even when this executor has been {@code shutdown}.
657 * In this case, these tasks will only terminate upon
658 * {@code shutdownNow} or after setting the policy to
659 * {@code false} when already shutdown.
660 * This value is by default {@code false}.
661 *
662 * @param value if {@code true}, continue after shutdown, else don't
663 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
664 */
665 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
666 continueExistingPeriodicTasksAfterShutdown = value;
667 if (!value && isShutdown())
668 onShutdown();
669 }
670
671 /**
672 * Gets the policy on whether to continue executing existing
673 * periodic tasks even when this executor has been {@code shutdown}.
674 * In this case, these tasks will only terminate upon
675 * {@code shutdownNow} or after setting the policy to
676 * {@code false} when already shutdown.
677 * This value is by default {@code false}.
678 *
679 * @return {@code true} if will continue after shutdown
680 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
681 */
682 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
683 return continueExistingPeriodicTasksAfterShutdown;
684 }
685
686 /**
687 * Sets the policy on whether to execute existing delayed
688 * tasks even when this executor has been {@code shutdown}.
689 * In this case, these tasks will only terminate upon
690 * {@code shutdownNow}, or after setting the policy to
691 * {@code false} when already shutdown.
692 * This value is by default {@code true}.
693 *
694 * @param value if {@code true}, execute after shutdown, else don't
695 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
696 */
697 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
698 executeExistingDelayedTasksAfterShutdown = value;
699 if (!value && isShutdown())
700 onShutdown();
701 }
702
703 /**
704 * Gets the policy on whether to execute existing delayed
705 * tasks even when this executor has been {@code shutdown}.
706 * In this case, these tasks will only terminate upon
707 * {@code shutdownNow}, or after setting the policy to
708 * {@code false} when already shutdown.
709 * This value is by default {@code true}.
710 *
711 * @return {@code true} if will execute after shutdown
712 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
713 */
714 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
715 return executeExistingDelayedTasksAfterShutdown;
716 }
717
718 /**
719 * Sets the policy on whether cancelled tasks should be immediately
720 * removed from the work queue at time of cancellation. This value is
721 * by default {@code false}.
722 *
723 * @param value if {@code true}, remove on cancellation, else don't
724 * @see #getRemoveOnCancelPolicy
725 * @since 1.7
726 */
727 public void setRemoveOnCancelPolicy(boolean value) {
728 removeOnCancel = value;
729 }
730
731 /**
732 * Gets the policy on whether cancelled tasks should be immediately
733 * removed from the work queue at time of cancellation. This value is
734 * by default {@code false}.
735 *
736 * @return {@code true} if cancelled tasks are immediately removed
737 * from the queue
738 * @see #setRemoveOnCancelPolicy
739 * @since 1.7
740 */
741 public boolean getRemoveOnCancelPolicy() {
742 return removeOnCancel;
743 }
744
745 /**
746 * Initiates an orderly shutdown in which previously submitted
747 * tasks are executed, but no new tasks will be accepted.
748 * Invocation has no additional effect if already shut down.
749 *
750 * <p>This method does not wait for previously submitted tasks to
751 * complete execution. Use {@link #awaitTermination awaitTermination}
752 * to do that.
753 *
754 * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
755 * has been set {@code false}, existing delayed tasks whose delays
756 * have not yet elapsed are cancelled. And unless the {@code
757 * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
758 * {@code true}, future executions of existing periodic tasks will
759 * be cancelled.
760 *
761 * @throws SecurityException {@inheritDoc}
762 */
763 public void shutdown() {
764 super.shutdown();
765 }
766
767 /**
768 * Attempts to stop all actively executing tasks, halts the
769 * processing of waiting tasks, and returns a list of the tasks
770 * that were awaiting execution. These tasks are drained (removed)
771 * from the task queue upon return from this method.
772 *
773 * <p>This method does not wait for actively executing tasks to
774 * terminate. Use {@link #awaitTermination awaitTermination} to
775 * do that.
776 *
777 * <p>There are no guarantees beyond best-effort attempts to stop
778 * processing actively executing tasks. This implementation
779 * interrupts tasks via {@link Thread#interrupt}; any task that
780 * fails to respond to interrupts may never terminate.
781 *
782 * @return list of tasks that never commenced execution.
783 * Each element of this list is a {@link ScheduledFuture}.
784 * For tasks submitted via one of the {@code schedule}
785 * methods, the element will be identical to the returned
786 * {@code ScheduledFuture}. For tasks submitted using
787 * {@link #execute execute}, the element will be a
788 * zero-delay {@code ScheduledFuture}.
789 * @throws SecurityException {@inheritDoc}
790 */
791 public List<Runnable> shutdownNow() {
792 return super.shutdownNow();
793 }
794
795 /**
796 * Returns the task queue used by this executor. Access to the
797 * task queue is intended primarily for debugging and monitoring.
798 * This queue may be in active use. Retrieving the task queue
799 * does not prevent queued tasks from executing.
800 *
801 * <p>Each element of this queue is a {@link ScheduledFuture}.
802 * For tasks submitted via one of the {@code schedule} methods, the
803 * element will be identical to the returned {@code ScheduledFuture}.
804 * For tasks submitted using {@link #execute execute}, the element
805 * will be a zero-delay {@code ScheduledFuture}.
806 *
807 * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
808 * tasks in the order in which they will execute.
809 *
810 * @return the task queue
811 */
812 public BlockingQueue<Runnable> getQueue() {
813 return super.getQueue();
814 }
815
816 /**
817 * Specialized delay queue. To mesh with TPE declarations, this
818 * class must be declared as a BlockingQueue<Runnable> even though
819 * it can only hold RunnableScheduledFutures.
820 */
821 static class DelayedWorkQueue extends AbstractQueue<Runnable>
822 implements BlockingQueue<Runnable> {
823
824 /*
825 * A DelayedWorkQueue is based on a heap-based data structure
826 * like those in DelayQueue and PriorityQueue, except that
827 * every ScheduledFutureTask also records its index into the
828 * heap array. This eliminates the need to find a task upon
829 * cancellation, greatly speeding up removal (down from O(n)
830 * to O(log n)), and reducing garbage retention that would
831 * otherwise occur by waiting for the element to rise to top
832 * before clearing. But because the queue may also hold
833 * RunnableScheduledFutures that are not ScheduledFutureTasks,
834 * we are not guaranteed to have such indices available, in
835 * which case we fall back to linear search. (We expect that
836 * most tasks will not be decorated, and that the faster cases
837 * will be much more common.)
838 *
839 * All heap operations must record index changes -- mainly
840 * within siftUp and siftDown. Upon removal, a task's
841 * heapIndex is set to -1. Note that ScheduledFutureTasks can
842 * appear at most once in the queue (this need not be true for
843 * other kinds of tasks or work queues), so are uniquely
844 * identified by heapIndex.
845 */
846
847 private static final int INITIAL_CAPACITY = 16;
848 private RunnableScheduledFuture<?>[] queue =
849 new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
850 private final ReentrantLock lock = new ReentrantLock();
851 private int size;
852
853 /**
854 * Thread designated to wait for the task at the head of the
855 * queue. This variant of the Leader-Follower pattern
856 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
857 * minimize unnecessary timed waiting. When a thread becomes
858 * the leader, it waits only for the next delay to elapse, but
859 * other threads await indefinitely. The leader thread must
860 * signal some other thread before returning from take() or
861 * poll(...), unless some other thread becomes leader in the
862 * interim. Whenever the head of the queue is replaced with a
863 * task with an earlier expiration time, the leader field is
864 * invalidated by being reset to null, and some waiting
865 * thread, but not necessarily the current leader, is
866 * signalled. So waiting threads must be prepared to acquire
867 * and lose leadership while waiting.
868 */
869 private Thread leader;
870
871 /**
872 * Condition signalled when a newer task becomes available at the
873 * head of the queue or a new thread may need to become leader.
874 */
875 private final Condition available = lock.newCondition();
876
877 /**
878 * Sets f's heapIndex if it is a ScheduledFutureTask.
879 */
880 private static void setIndex(RunnableScheduledFuture<?> f, int idx) {
881 if (f instanceof ScheduledFutureTask)
882 ((ScheduledFutureTask)f).heapIndex = idx;
883 }
884
885 /**
886 * Sifts element added at bottom up to its heap-ordered spot.
887 * Call only when holding lock.
888 */
889 private void siftUp(int k, RunnableScheduledFuture<?> key) {
890 while (k > 0) {
891 int parent = (k - 1) >>> 1;
892 RunnableScheduledFuture<?> e = queue[parent];
893 if (key.compareTo(e) >= 0)
894 break;
895 queue[k] = e;
896 setIndex(e, k);
897 k = parent;
898 }
899 queue[k] = key;
900 setIndex(key, k);
901 }
902
903 /**
904 * Sifts element added at top down to its heap-ordered spot.
905 * Call only when holding lock.
906 */
907 private void siftDown(int k, RunnableScheduledFuture<?> key) {
908 int half = size >>> 1;
909 while (k < half) {
910 int child = (k << 1) + 1;
911 RunnableScheduledFuture<?> c = queue[child];
912 int right = child + 1;
913 if (right < size && c.compareTo(queue[right]) > 0)
914 c = queue[child = right];
915 if (key.compareTo(c) <= 0)
916 break;
917 queue[k] = c;
918 setIndex(c, k);
919 k = child;
920 }
921 queue[k] = key;
922 setIndex(key, k);
923 }
924
925 /**
926 * Resizes the heap array. Call only when holding lock.
927 */
928 private void grow() {
929 int oldCapacity = queue.length;
930 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
931 if (newCapacity < 0) // overflow
932 newCapacity = Integer.MAX_VALUE;
933 queue = Arrays.copyOf(queue, newCapacity);
934 }
935
936 /**
937 * Finds index of given object, or -1 if absent.
938 */
939 private int indexOf(Object x) {
940 if (x != null) {
941 if (x instanceof ScheduledFutureTask) {
942 int i = ((ScheduledFutureTask) x).heapIndex;
943 // Sanity check; x could conceivably be a
944 // ScheduledFutureTask from some other pool.
945 if (i >= 0 && i < size && queue[i] == x)
946 return i;
947 } else {
948 for (int i = 0; i < size; i++)
949 if (x.equals(queue[i]))
950 return i;
951 }
952 }
953 return -1;
954 }
955
956 public boolean contains(Object x) {
957 final ReentrantLock lock = this.lock;
958 lock.lock();
959 try {
960 return indexOf(x) != -1;
961 } finally {
962 lock.unlock();
963 }
964 }
965
966 public boolean remove(Object x) {
967 final ReentrantLock lock = this.lock;
968 lock.lock();
969 try {
970 int i = indexOf(x);
971 if (i < 0)
972 return false;
973
974 setIndex(queue[i], -1);
975 int s = --size;
976 RunnableScheduledFuture<?> replacement = queue[s];
977 queue[s] = null;
978 if (s != i) {
979 siftDown(i, replacement);
980 if (queue[i] == replacement)
981 siftUp(i, replacement);
982 }
983 return true;
984 } finally {
985 lock.unlock();
986 }
987 }
988
989 public int size() {
990 final ReentrantLock lock = this.lock;
991 lock.lock();
992 try {
993 return size;
994 } finally {
995 lock.unlock();
996 }
997 }
998
999 public boolean isEmpty() {
1000 return size() == 0;
1001 }
1002
1003 public int remainingCapacity() {
1004 return Integer.MAX_VALUE;
1005 }
1006
1007 public RunnableScheduledFuture<?> peek() {
1008 final ReentrantLock lock = this.lock;
1009 lock.lock();
1010 try {
1011 return queue[0];
1012 } finally {
1013 lock.unlock();
1014 }
1015 }
1016
1017 public boolean offer(Runnable x) {
1018 if (x == null)
1019 throw new NullPointerException();
1020 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
1021 final ReentrantLock lock = this.lock;
1022 lock.lock();
1023 try {
1024 int i = size;
1025 if (i >= queue.length)
1026 grow();
1027 size = i + 1;
1028 if (i == 0) {
1029 queue[0] = e;
1030 setIndex(e, 0);
1031 } else {
1032 siftUp(i, e);
1033 }
1034 if (queue[0] == e) {
1035 leader = null;
1036 available.signal();
1037 }
1038 } finally {
1039 lock.unlock();
1040 }
1041 return true;
1042 }
1043
1044 public void put(Runnable e) {
1045 offer(e);
1046 }
1047
1048 public boolean add(Runnable e) {
1049 return offer(e);
1050 }
1051
1052 public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1053 return offer(e);
1054 }
1055
1056 /**
1057 * Performs common bookkeeping for poll and take: Replaces
1058 * first element with last and sifts it down. Call only when
1059 * holding lock.
1060 * @param f the task to remove and return
1061 */
1062 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1063 int s = --size;
1064 RunnableScheduledFuture<?> x = queue[s];
1065 queue[s] = null;
1066 if (s != 0)
1067 siftDown(0, x);
1068 setIndex(f, -1);
1069 return f;
1070 }
1071
1072 public RunnableScheduledFuture<?> poll() {
1073 final ReentrantLock lock = this.lock;
1074 lock.lock();
1075 try {
1076 RunnableScheduledFuture<?> first = queue[0];
1077 return (first == null || first.getDelay(NANOSECONDS) > 0)
1078 ? null
1079 : finishPoll(first);
1080 } finally {
1081 lock.unlock();
1082 }
1083 }
1084
1085 public RunnableScheduledFuture<?> take() throws InterruptedException {
1086 final ReentrantLock lock = this.lock;
1087 lock.lockInterruptibly();
1088 try {
1089 for (;;) {
1090 RunnableScheduledFuture<?> first = queue[0];
1091 if (first == null)
1092 available.await();
1093 else {
1094 long delay = first.getDelay(NANOSECONDS);
1095 if (delay <= 0L)
1096 return finishPoll(first);
1097 first = null; // don't retain ref while waiting
1098 if (leader != null)
1099 available.await();
1100 else {
1101 Thread thisThread = Thread.currentThread();
1102 leader = thisThread;
1103 try {
1104 available.awaitNanos(delay);
1105 } finally {
1106 if (leader == thisThread)
1107 leader = null;
1108 }
1109 }
1110 }
1111 }
1112 } finally {
1113 if (leader == null && queue[0] != null)
1114 available.signal();
1115 lock.unlock();
1116 }
1117 }
1118
1119 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1120 throws InterruptedException {
1121 long nanos = unit.toNanos(timeout);
1122 final ReentrantLock lock = this.lock;
1123 lock.lockInterruptibly();
1124 try {
1125 for (;;) {
1126 RunnableScheduledFuture<?> first = queue[0];
1127 if (first == null) {
1128 if (nanos <= 0L)
1129 return null;
1130 else
1131 nanos = available.awaitNanos(nanos);
1132 } else {
1133 long delay = first.getDelay(NANOSECONDS);
1134 if (delay <= 0L)
1135 return finishPoll(first);
1136 if (nanos <= 0L)
1137 return null;
1138 first = null; // don't retain ref while waiting
1139 if (nanos < delay || leader != null)
1140 nanos = available.awaitNanos(nanos);
1141 else {
1142 Thread thisThread = Thread.currentThread();
1143 leader = thisThread;
1144 try {
1145 long timeLeft = available.awaitNanos(delay);
1146 nanos -= delay - timeLeft;
1147 } finally {
1148 if (leader == thisThread)
1149 leader = null;
1150 }
1151 }
1152 }
1153 }
1154 } finally {
1155 if (leader == null && queue[0] != null)
1156 available.signal();
1157 lock.unlock();
1158 }
1159 }
1160
1161 public void clear() {
1162 final ReentrantLock lock = this.lock;
1163 lock.lock();
1164 try {
1165 for (int i = 0; i < size; i++) {
1166 RunnableScheduledFuture<?> t = queue[i];
1167 if (t != null) {
1168 queue[i] = null;
1169 setIndex(t, -1);
1170 }
1171 }
1172 size = 0;
1173 } finally {
1174 lock.unlock();
1175 }
1176 }
1177
1178 public int drainTo(Collection<? super Runnable> c) {
1179 return drainTo(c, Integer.MAX_VALUE);
1180 }
1181
1182 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1183 Objects.requireNonNull(c);
1184 if (c == this)
1185 throw new IllegalArgumentException();
1186 if (maxElements <= 0)
1187 return 0;
1188 final ReentrantLock lock = this.lock;
1189 lock.lock();
1190 try {
1191 int n = 0;
1192 for (RunnableScheduledFuture<?> first;
1193 n < maxElements
1194 && (first = queue[0]) != null
1195 && first.getDelay(NANOSECONDS) <= 0;) {
1196 c.add(first); // In this order, in case add() throws.
1197 finishPoll(first);
1198 ++n;
1199 }
1200 return n;
1201 } finally {
1202 lock.unlock();
1203 }
1204 }
1205
1206 public Object[] toArray() {
1207 final ReentrantLock lock = this.lock;
1208 lock.lock();
1209 try {
1210 return Arrays.copyOf(queue, size, Object[].class);
1211 } finally {
1212 lock.unlock();
1213 }
1214 }
1215
1216 @SuppressWarnings("unchecked")
1217 public <T> T[] toArray(T[] a) {
1218 final ReentrantLock lock = this.lock;
1219 lock.lock();
1220 try {
1221 if (a.length < size)
1222 return (T[]) Arrays.copyOf(queue, size, a.getClass());
1223 System.arraycopy(queue, 0, a, 0, size);
1224 if (a.length > size)
1225 a[size] = null;
1226 return a;
1227 } finally {
1228 lock.unlock();
1229 }
1230 }
1231
1232 public Iterator<Runnable> iterator() {
1233 final ReentrantLock lock = this.lock;
1234 lock.lock();
1235 try {
1236 return new Itr(Arrays.copyOf(queue, size));
1237 } finally {
1238 lock.unlock();
1239 }
1240 }
1241
1242 /**
1243 * Snapshot iterator that works off copy of underlying q array.
1244 */
1245 private class Itr implements Iterator<Runnable> {
1246 final RunnableScheduledFuture<?>[] array;
1247 int cursor; // index of next element to return; initially 0
1248 int lastRet = -1; // index of last element returned; -1 if no such
1249
1250 Itr(RunnableScheduledFuture<?>[] array) {
1251 this.array = array;
1252 }
1253
1254 public boolean hasNext() {
1255 return cursor < array.length;
1256 }
1257
1258 public Runnable next() {
1259 if (cursor >= array.length)
1260 throw new NoSuchElementException();
1261 return array[lastRet = cursor++];
1262 }
1263
1264 public void remove() {
1265 if (lastRet < 0)
1266 throw new IllegalStateException();
1267 DelayedWorkQueue.this.remove(array[lastRet]);
1268 lastRet = -1;
1269 }
1270 }
1271 }
1272 }