ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.111
Committed: Thu Mar 30 20:38:51 2017 UTC (7 years, 1 month ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.110: +55 -36 lines
Log Message:
rework the spec for periodic task execution after shutdown

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 import static java.util.concurrent.TimeUnit.NANOSECONDS;
11
12 import java.util.AbstractQueue;
13 import java.util.Arrays;
14 import java.util.Collection;
15 import java.util.Iterator;
16 import java.util.List;
17 import java.util.NoSuchElementException;
18 import java.util.Objects;
19 import java.util.concurrent.atomic.AtomicLong;
20 import java.util.concurrent.locks.Condition;
21 import java.util.concurrent.locks.ReentrantLock;
22
23 /**
24 * A {@link ThreadPoolExecutor} that can additionally schedule
25 * commands to run after a given delay, or to execute periodically.
26 * This class is preferable to {@link java.util.Timer} when multiple
27 * worker threads are needed, or when the additional flexibility or
28 * capabilities of {@link ThreadPoolExecutor} (which this class
29 * extends) are required.
30 *
31 * <p>Delayed tasks execute no sooner than they are enabled, but
32 * without any real-time guarantees about when, after they are
33 * enabled, they will commence. Tasks scheduled for exactly the same
34 * execution time are enabled in first-in-first-out (FIFO) order of
35 * submission.
36 *
37 * <p>When a submitted task is cancelled before it is run, execution
38 * is suppressed. By default, such a cancelled task is not
39 * automatically removed from the work queue until its delay elapses.
40 * While this enables further inspection and monitoring, it may also
41 * cause unbounded retention of cancelled tasks. To avoid this, use
42 * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately
43 * removed from the work queue at time of cancellation.
44 *
45 * <p>Successive executions of a periodic task scheduled via
46 * {@link #scheduleAtFixedRate scheduleAtFixedRate} or
47 * {@link #scheduleWithFixedDelay scheduleWithFixedDelay}
48 * do not overlap. While different executions may be performed by
49 * different threads, the effects of prior executions
50 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
51 * those of subsequent ones.
52 *
53 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
54 * of the inherited tuning methods are not useful for it. In
55 * particular, because it acts as a fixed-sized pool using
56 * {@code corePoolSize} threads and an unbounded queue, adjustments
57 * to {@code maximumPoolSize} have no useful effect. Additionally, it
58 * is almost never a good idea to set {@code corePoolSize} to zero or
59 * use {@code allowCoreThreadTimeOut} because this may leave the pool
60 * without threads to handle tasks once they become eligible to run.
61 *
62 * <p>As with {@code ThreadPoolExecutor}, if not otherwise specified,
63 * this class uses {@link Executors#defaultThreadFactory} as the
64 * default thread factory, and {@link ThreadPoolExecutor.AbortPolicy}
65 * as the default rejected execution handler.
66 *
67 * <p><b>Extension notes:</b> This class overrides the
68 * {@link ThreadPoolExecutor#execute(Runnable) execute} and
69 * {@link AbstractExecutorService#submit(Runnable) submit}
70 * methods to generate internal {@link ScheduledFuture} objects to
71 * control per-task delays and scheduling. To preserve
72 * functionality, any further overrides of these methods in
73 * subclasses must invoke superclass versions, which effectively
74 * disables additional task customization. However, this class
75 * provides an alternative protected extension method
76 * {@code decorateTask} (one version each for {@code Runnable} and
77 * {@code Callable}) that can be used to customize the concrete task
78 * types used to execute commands entered via {@code execute},
79 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
80 * and {@code scheduleWithFixedDelay}. By default, a
81 * {@code ScheduledThreadPoolExecutor} uses a task type extending
82 * {@link FutureTask}. However, this may be modified or replaced using
83 * subclasses of the form:
84 *
85 * <pre> {@code
86 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
87 *
88 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
89 *
90 * protected <V> RunnableScheduledFuture<V> decorateTask(
91 * Runnable r, RunnableScheduledFuture<V> task) {
92 * return new CustomTask<V>(r, task);
93 * }
94 *
95 * protected <V> RunnableScheduledFuture<V> decorateTask(
96 * Callable<V> c, RunnableScheduledFuture<V> task) {
97 * return new CustomTask<V>(c, task);
98 * }
99 * // ... add constructors, etc.
100 * }}</pre>
101 *
102 * @since 1.5
103 * @author Doug Lea
104 */
105 public class ScheduledThreadPoolExecutor
106 extends ThreadPoolExecutor
107 implements ScheduledExecutorService {
108
109 /*
110 * This class specializes ThreadPoolExecutor implementation by
111 *
112 * 1. Using a custom task type ScheduledFutureTask, even for tasks
113 * that don't require scheduling because they are submitted
114 * using ExecutorService rather than ScheduledExecutorService
115 * methods, which are treated as tasks with a delay of zero.
116 *
117 * 2. Using a custom queue (DelayedWorkQueue), a variant of
118 * unbounded DelayQueue. The lack of capacity constraint and
119 * the fact that corePoolSize and maximumPoolSize are
120 * effectively identical simplifies some execution mechanics
121 * (see delayedExecute) compared to ThreadPoolExecutor.
122 *
123 * 3. Supporting optional run-after-shutdown parameters, which
124 * leads to overrides of shutdown methods to remove and cancel
125 * tasks that should NOT be run after shutdown, as well as
126 * different recheck logic when task (re)submission overlaps
127 * with a shutdown.
128 *
129 * 4. Task decoration methods to allow interception and
130 * instrumentation, which are needed because subclasses cannot
131 * otherwise override submit methods to get this effect. These
132 * don't have any impact on pool control logic though.
133 */
134
135 /**
136 * False if should cancel/suppress periodic tasks on shutdown.
137 */
138 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
139
140 /**
141 * False if should cancel non-periodic not-yet-expired tasks on shutdown.
142 */
143 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
144
145 /**
146 * True if ScheduledFutureTask.cancel should remove from queue.
147 */
148 volatile boolean removeOnCancel;
149
150 /**
151 * Sequence number to break scheduling ties, and in turn to
152 * guarantee FIFO order among tied entries.
153 */
154 private static final AtomicLong sequencer = new AtomicLong();
155
156 private class ScheduledFutureTask<V>
157 extends FutureTask<V> implements RunnableScheduledFuture<V> {
158
159 /** Sequence number to break ties FIFO */
160 private final long sequenceNumber;
161
162 /** The nanoTime-based time when the task is enabled to execute. */
163 private volatile long time;
164
165 /**
166 * Period for repeating tasks, in nanoseconds.
167 * A positive value indicates fixed-rate execution.
168 * A negative value indicates fixed-delay execution.
169 * A value of 0 indicates a non-repeating (one-shot) task.
170 */
171 private final long period;
172
173 /** The actual task to be re-enqueued by reExecutePeriodic */
174 RunnableScheduledFuture<V> outerTask = this;
175
176 /**
177 * Index into delay queue, to support faster cancellation.
178 */
179 int heapIndex;
180
181 /**
182 * Creates a one-shot action with given nanoTime-based trigger time.
183 */
184 ScheduledFutureTask(Runnable r, V result, long triggerTime,
185 long sequenceNumber) {
186 super(r, result);
187 this.time = triggerTime;
188 this.period = 0;
189 this.sequenceNumber = sequenceNumber;
190 }
191
192 /**
193 * Creates a periodic action with given nanoTime-based initial
194 * trigger time and period.
195 */
196 ScheduledFutureTask(Runnable r, V result, long triggerTime,
197 long period, long sequenceNumber) {
198 super(r, result);
199 this.time = triggerTime;
200 this.period = period;
201 this.sequenceNumber = sequenceNumber;
202 }
203
204 /**
205 * Creates a one-shot action with given nanoTime-based trigger time.
206 */
207 ScheduledFutureTask(Callable<V> callable, long triggerTime,
208 long sequenceNumber) {
209 super(callable);
210 this.time = triggerTime;
211 this.period = 0;
212 this.sequenceNumber = sequenceNumber;
213 }
214
215 public long getDelay(TimeUnit unit) {
216 return unit.convert(time - System.nanoTime(), NANOSECONDS);
217 }
218
219 public int compareTo(Delayed other) {
220 if (other == this) // compare zero if same object
221 return 0;
222 if (other instanceof ScheduledFutureTask) {
223 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
224 long diff = time - x.time;
225 if (diff < 0)
226 return -1;
227 else if (diff > 0)
228 return 1;
229 else if (sequenceNumber < x.sequenceNumber)
230 return -1;
231 else
232 return 1;
233 }
234 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
235 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
236 }
237
238 /**
239 * Returns {@code true} if this is a periodic (not a one-shot) action.
240 *
241 * @return {@code true} if periodic
242 */
243 public boolean isPeriodic() {
244 return period != 0;
245 }
246
247 /**
248 * Sets the next time to run for a periodic task.
249 */
250 private void setNextRunTime() {
251 long p = period;
252 if (p > 0)
253 time += p;
254 else
255 time = triggerTime(-p);
256 }
257
258 public boolean cancel(boolean mayInterruptIfRunning) {
259 // The racy read of heapIndex below is benign:
260 // if heapIndex < 0, then OOTA guarantees that we have surely
261 // been removed; else we recheck under lock in remove()
262 boolean cancelled = super.cancel(mayInterruptIfRunning);
263 if (cancelled && removeOnCancel && heapIndex >= 0)
264 remove(this);
265 return cancelled;
266 }
267
268 /**
269 * Overrides FutureTask version so as to reset/requeue if periodic.
270 */
271 public void run() {
272 if (!canRunInCurrentRunState(this))
273 cancel(false);
274 else if (!isPeriodic())
275 super.run();
276 else if (super.runAndReset()) {
277 setNextRunTime();
278 reExecutePeriodic(outerTask);
279 }
280 }
281 }
282
283 /**
284 * Returns true if can run a task given current run state and
285 * run-after-shutdown parameters.
286 */
287 boolean canRunInCurrentRunState(RunnableScheduledFuture<?> task) {
288 if (!isShutdown())
289 return true;
290 if (isStopped())
291 return false;
292 return task.isPeriodic()
293 ? continueExistingPeriodicTasksAfterShutdown
294 : (executeExistingDelayedTasksAfterShutdown
295 || task.getDelay(NANOSECONDS) <= 0);
296 }
297
298 /**
299 * Main execution method for delayed or periodic tasks. If pool
300 * is shut down, rejects the task. Otherwise adds task to queue
301 * and starts a thread, if necessary, to run it. (We cannot
302 * prestart the thread to run the task because the task (probably)
303 * shouldn't be run yet.) If the pool is shut down while the task
304 * is being added, cancel and remove it if required by state and
305 * run-after-shutdown parameters.
306 *
307 * @param task the task
308 */
309 private void delayedExecute(RunnableScheduledFuture<?> task) {
310 if (isShutdown())
311 reject(task);
312 else {
313 super.getQueue().add(task);
314 if (!canRunInCurrentRunState(task) && remove(task))
315 task.cancel(false);
316 else
317 ensurePrestart();
318 }
319 }
320
321 /**
322 * Requeues a periodic task unless current run state precludes it.
323 * Same idea as delayedExecute except drops task rather than rejecting.
324 *
325 * @param task the task
326 */
327 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
328 if (canRunInCurrentRunState(task)) {
329 super.getQueue().add(task);
330 if (canRunInCurrentRunState(task) || !remove(task)) {
331 ensurePrestart();
332 return;
333 }
334 }
335 task.cancel(false);
336 }
337
338 /**
339 * Cancels and clears the queue of all tasks that should not be run
340 * due to shutdown policy. Invoked within super.shutdown.
341 */
342 @Override void onShutdown() {
343 BlockingQueue<Runnable> q = super.getQueue();
344 boolean keepDelayed =
345 getExecuteExistingDelayedTasksAfterShutdownPolicy();
346 boolean keepPeriodic =
347 getContinueExistingPeriodicTasksAfterShutdownPolicy();
348 // Traverse snapshot to avoid iterator exceptions
349 // TODO: implement and use efficient removeIf
350 // super.getQueue().removeIf(...);
351 for (Object e : q.toArray()) {
352 if (e instanceof RunnableScheduledFuture) {
353 RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
354 if ((t.isPeriodic()
355 ? !keepPeriodic
356 : (!keepDelayed && t.getDelay(NANOSECONDS) > 0))
357 || t.isCancelled()) { // also remove if already cancelled
358 if (q.remove(t))
359 t.cancel(false);
360 }
361 }
362 }
363 tryTerminate();
364 }
365
366 /**
367 * Modifies or replaces the task used to execute a runnable.
368 * This method can be used to override the concrete
369 * class used for managing internal tasks.
370 * The default implementation simply returns the given task.
371 *
372 * @param runnable the submitted Runnable
373 * @param task the task created to execute the runnable
374 * @param <V> the type of the task's result
375 * @return a task that can execute the runnable
376 * @since 1.6
377 */
378 protected <V> RunnableScheduledFuture<V> decorateTask(
379 Runnable runnable, RunnableScheduledFuture<V> task) {
380 return task;
381 }
382
383 /**
384 * Modifies or replaces the task used to execute a callable.
385 * This method can be used to override the concrete
386 * class used for managing internal tasks.
387 * The default implementation simply returns the given task.
388 *
389 * @param callable the submitted Callable
390 * @param task the task created to execute the callable
391 * @param <V> the type of the task's result
392 * @return a task that can execute the callable
393 * @since 1.6
394 */
395 protected <V> RunnableScheduledFuture<V> decorateTask(
396 Callable<V> callable, RunnableScheduledFuture<V> task) {
397 return task;
398 }
399
400 /**
401 * The default keep-alive time for pool threads.
402 *
403 * Normally, this value is unused because all pool threads will be
404 * core threads, but if a user creates a pool with a corePoolSize
405 * of zero (against our advice), we keep a thread alive as long as
406 * there are queued tasks. If the keep alive time is zero (the
407 * historic value), we end up hot-spinning in getTask, wasting a
408 * CPU. But on the other hand, if we set the value too high, and
409 * users create a one-shot pool which they don't cleanly shutdown,
410 * the pool's non-daemon threads will prevent JVM termination. A
411 * small but non-zero value (relative to a JVM's lifetime) seems
412 * best.
413 */
414 private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
415
416 /**
417 * Creates a new {@code ScheduledThreadPoolExecutor} with the
418 * given core pool size.
419 *
420 * @param corePoolSize the number of threads to keep in the pool, even
421 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
422 * @throws IllegalArgumentException if {@code corePoolSize < 0}
423 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    // Maximum size is Integer.MAX_VALUE and the work queue is a fresh
    // DelayedWorkQueue; keep-alive uses the class default.
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue());
}
429
430 /**
431 * Creates a new {@code ScheduledThreadPoolExecutor} with the
432 * given initial parameters.
433 *
434 * @param corePoolSize the number of threads to keep in the pool, even
435 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
436 * @param threadFactory the factory to use when the executor
437 * creates a new thread
438 * @throws IllegalArgumentException if {@code corePoolSize < 0}
439 * @throws NullPointerException if {@code threadFactory} is null
440 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    // Same fixed configuration as the single-argument constructor,
    // plus the caller-supplied thread factory.
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue(),
          threadFactory);
}
447
448 /**
449 * Creates a new {@code ScheduledThreadPoolExecutor} with the
450 * given initial parameters.
451 *
452 * @param corePoolSize the number of threads to keep in the pool, even
453 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
454 * @param handler the handler to use when execution is blocked
455 * because the thread bounds and queue capacities are reached
456 * @throws IllegalArgumentException if {@code corePoolSize < 0}
457 * @throws NullPointerException if {@code handler} is null
458 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    // Same fixed configuration as the single-argument constructor,
    // plus the caller-supplied rejection handler.
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue(),
          handler);
}
465
466 /**
467 * Creates a new {@code ScheduledThreadPoolExecutor} with the
468 * given initial parameters.
469 *
470 * @param corePoolSize the number of threads to keep in the pool, even
471 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
472 * @param threadFactory the factory to use when the executor
473 * creates a new thread
474 * @param handler the handler to use when execution is blocked
475 * because the thread bounds and queue capacities are reached
476 * @throws IllegalArgumentException if {@code corePoolSize < 0}
477 * @throws NullPointerException if {@code threadFactory} or
478 * {@code handler} is null
479 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    // Same fixed configuration as the single-argument constructor,
    // plus the caller-supplied thread factory and rejection handler.
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue(),
          threadFactory,
          handler);
}
487
488 /**
489 * Returns the nanoTime-based trigger time of a delayed action.
490 */
491 private long triggerTime(long delay, TimeUnit unit) {
492 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
493 }
494
495 /**
496 * Returns the nanoTime-based trigger time of a delayed action.
497 */
498 long triggerTime(long delay) {
499 return System.nanoTime() +
500 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
501 }
502
503 /**
504 * Constrains the values of all delays in the queue to be within
505 * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
506 * This may occur if a task is eligible to be dequeued, but has
507 * not yet been, while some other task is added with a delay of
508 * Long.MAX_VALUE.
509 */
510 private long overflowFree(long delay) {
511 Delayed head = (Delayed) super.getQueue().peek();
512 if (head != null) {
513 long headDelay = head.getDelay(NANOSECONDS);
514 if (headDelay < 0 && (delay - headDelay < 0))
515 delay = Long.MAX_VALUE + headDelay;
516 }
517 return delay;
518 }
519
520 /**
521 * @throws RejectedExecutionException {@inheritDoc}
522 * @throws NullPointerException {@inheritDoc}
523 */
524 public ScheduledFuture<?> schedule(Runnable command,
525 long delay,
526 TimeUnit unit) {
527 if (command == null || unit == null)
528 throw new NullPointerException();
529 RunnableScheduledFuture<Void> t = decorateTask(command,
530 new ScheduledFutureTask<Void>(command, null,
531 triggerTime(delay, unit),
532 sequencer.getAndIncrement()));
533 delayedExecute(t);
534 return t;
535 }
536
537 /**
538 * @throws RejectedExecutionException {@inheritDoc}
539 * @throws NullPointerException {@inheritDoc}
540 */
541 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
542 long delay,
543 TimeUnit unit) {
544 if (callable == null || unit == null)
545 throw new NullPointerException();
546 RunnableScheduledFuture<V> t = decorateTask(callable,
547 new ScheduledFutureTask<V>(callable,
548 triggerTime(delay, unit),
549 sequencer.getAndIncrement()));
550 delayedExecute(t);
551 return t;
552 }
553
554 /**
555 * Submits a periodic action that becomes enabled first after the
556 * given initial delay, and subsequently with the given period;
557 * that is, executions will commence after
558 * {@code initialDelay}, then {@code initialDelay + period}, then
559 * {@code initialDelay + 2 * period}, and so on.
560 *
561 * <p>The sequence of task executions continues indefinitely until
562 * one of the following exceptional completions occurs:
563 * <ul>
564 * <li>The task is {@linkplain Future#cancel explicitly cancelled}
565 * via the returned future.
566 * <li>Method {@link #shutdown} is called and the {@linkplain
567 * #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on
568 * whether to continue after shutdown} is not set true, or method
569 * {@link #shutdownNow} is called; also resulting in task
570 * cancellation.
571 * <li>An execution of the task throws an exception. In this case
572 * calling {@link Future#get() get} on the returned future will throw
573 * {@link ExecutionException}, holding the exception as its cause.
574 * </ul>
575 * Subsequent executions are suppressed. Subsequent calls to
576 * {@link Future#isDone isDone()} on the returned future will
577 * return {@code true}.
578 *
579 * <p>If any execution of this task takes longer than its period, then
580 * subsequent executions may start late, but will not concurrently
581 * execute.
582 *
583 * @throws RejectedExecutionException {@inheritDoc}
584 * @throws NullPointerException {@inheritDoc}
585 * @throws IllegalArgumentException {@inheritDoc}
586 */
587 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
588 long initialDelay,
589 long period,
590 TimeUnit unit) {
591 if (command == null || unit == null)
592 throw new NullPointerException();
593 if (period <= 0L)
594 throw new IllegalArgumentException();
595 ScheduledFutureTask<Void> sft =
596 new ScheduledFutureTask<Void>(command,
597 null,
598 triggerTime(initialDelay, unit),
599 unit.toNanos(period),
600 sequencer.getAndIncrement());
601 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
602 sft.outerTask = t;
603 delayedExecute(t);
604 return t;
605 }
606
607 /**
608 * Submits a periodic action that becomes enabled first after the
609 * given initial delay, and subsequently with the given delay
610 * between the termination of one execution and the commencement of
611 * the next.
612 *
613 * <p>The sequence of task executions continues indefinitely until
614 * one of the following exceptional completions occurs:
615 * <ul>
616 * <li>The task is {@linkplain Future#cancel explicitly cancelled}
617 * via the returned future.
618 * <li>Method {@link #shutdown} is called and the {@linkplain
619 * #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on
620 * whether to continue after shutdown} is not set true, or method
621 * {@link #shutdownNow} is called; also resulting in task
622 * cancellation.
623 * <li>An execution of the task throws an exception. In this case
624 * calling {@link Future#get() get} on the returned future will throw
625 * {@link ExecutionException}, holding the exception as its cause.
626 * </ul>
627 * Subsequent executions are suppressed. Subsequent calls to
628 * {@link Future#isDone isDone()} on the returned future will
629 * return {@code true}.
630 *
631 * @throws RejectedExecutionException {@inheritDoc}
632 * @throws NullPointerException {@inheritDoc}
633 * @throws IllegalArgumentException {@inheritDoc}
634 */
635 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
636 long initialDelay,
637 long delay,
638 TimeUnit unit) {
639 if (command == null || unit == null)
640 throw new NullPointerException();
641 if (delay <= 0L)
642 throw new IllegalArgumentException();
643 ScheduledFutureTask<Void> sft =
644 new ScheduledFutureTask<Void>(command,
645 null,
646 triggerTime(initialDelay, unit),
647 -unit.toNanos(delay),
648 sequencer.getAndIncrement());
649 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
650 sft.outerTask = t;
651 delayedExecute(t);
652 return t;
653 }
654
655 /**
656 * Executes {@code command} with zero required delay.
657 * This has effect equivalent to
658 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
659 * Note that inspections of the queue and of the list returned by
660 * {@code shutdownNow} will access the zero-delayed
661 * {@link ScheduledFuture}, not the {@code command} itself.
662 *
663 * <p>A consequence of the use of {@code ScheduledFuture} objects is
664 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
665 * called with a null second {@code Throwable} argument, even if the
666 * {@code command} terminated abruptly. Instead, the {@code Throwable}
667 * thrown by such a task can be obtained via {@link Future#get}.
668 *
669 * @throws RejectedExecutionException at discretion of
670 * {@code RejectedExecutionHandler}, if the task
671 * cannot be accepted for execution because the
672 * executor has been shut down
673 * @throws NullPointerException {@inheritDoc}
674 */
675 public void execute(Runnable command) {
676 schedule(command, 0, NANOSECONDS);
677 }
678
679 // Override AbstractExecutorService methods
680
681 /**
682 * @throws RejectedExecutionException {@inheritDoc}
683 * @throws NullPointerException {@inheritDoc}
684 */
685 public Future<?> submit(Runnable task) {
686 return schedule(task, 0, NANOSECONDS);
687 }
688
689 /**
690 * @throws RejectedExecutionException {@inheritDoc}
691 * @throws NullPointerException {@inheritDoc}
692 */
693 public <T> Future<T> submit(Runnable task, T result) {
694 return schedule(Executors.callable(task, result), 0, NANOSECONDS);
695 }
696
697 /**
698 * @throws RejectedExecutionException {@inheritDoc}
699 * @throws NullPointerException {@inheritDoc}
700 */
701 public <T> Future<T> submit(Callable<T> task) {
702 return schedule(task, 0, NANOSECONDS);
703 }
704
705 /**
706 * Sets the policy on whether to continue executing existing
707 * periodic tasks even when this executor has been {@code shutdown}.
708 * In this case, executions will continue until {@code shutdownNow}
709 * or the policy is set to {@code false} when already shutdown.
710 * This value is by default {@code false}.
711 *
712 * @param value if {@code true}, continue after shutdown, else don't
713 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
714 */
715 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
716 continueExistingPeriodicTasksAfterShutdown = value;
717 if (!value && isShutdown())
718 onShutdown();
719 }
720
721 /**
722 * Gets the policy on whether to continue executing existing
723 * periodic tasks even when this executor has been {@code shutdown}.
724 * In this case, executions will continue until {@code shutdownNow}
725 * or the policy is set to {@code false} when already shutdown.
726 * This value is by default {@code false}.
727 *
728 * @return {@code true} if will continue after shutdown
729 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
730 */
731 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
732 return continueExistingPeriodicTasksAfterShutdown;
733 }
734
735 /**
736 * Sets the policy on whether to execute existing delayed
737 * tasks even when this executor has been {@code shutdown}.
738 * In this case, these tasks will only terminate upon
739 * {@code shutdownNow}, or after setting the policy to
740 * {@code false} when already shutdown.
741 * This value is by default {@code true}.
742 *
743 * @param value if {@code true}, execute after shutdown, else don't
744 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
745 */
746 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
747 executeExistingDelayedTasksAfterShutdown = value;
748 if (!value && isShutdown())
749 onShutdown();
750 }
751
752 /**
753 * Gets the policy on whether to execute existing delayed
754 * tasks even when this executor has been {@code shutdown}.
755 * In this case, these tasks will only terminate upon
756 * {@code shutdownNow}, or after setting the policy to
757 * {@code false} when already shutdown.
758 * This value is by default {@code true}.
759 *
760 * @return {@code true} if will execute after shutdown
761 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
762 */
763 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
764 return executeExistingDelayedTasksAfterShutdown;
765 }
766
767 /**
768 * Sets the policy on whether cancelled tasks should be immediately
769 * removed from the work queue at time of cancellation. This value is
770 * by default {@code false}.
771 *
772 * @param value if {@code true}, remove on cancellation, else don't
773 * @see #getRemoveOnCancelPolicy
774 * @since 1.7
775 */
776 public void setRemoveOnCancelPolicy(boolean value) {
777 removeOnCancel = value;
778 }
779
780 /**
781 * Gets the policy on whether cancelled tasks should be immediately
782 * removed from the work queue at time of cancellation. This value is
783 * by default {@code false}.
784 *
785 * @return {@code true} if cancelled tasks are immediately removed
786 * from the queue
787 * @see #setRemoveOnCancelPolicy
788 * @since 1.7
789 */
790 public boolean getRemoveOnCancelPolicy() {
791 return removeOnCancel;
792 }
793
794 /**
795 * Initiates an orderly shutdown in which previously submitted
796 * tasks are executed, but no new tasks will be accepted.
797 * Invocation has no additional effect if already shut down.
798 *
799 * <p>This method does not wait for previously submitted tasks to
800 * complete execution. Use {@link #awaitTermination awaitTermination}
801 * to do that.
802 *
803 * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
804 * has been set {@code false}, existing delayed tasks whose delays
805 * have not yet elapsed are cancelled. And unless the {@code
806 * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
807 * {@code true}, future executions of existing periodic tasks will
808 * be cancelled.
809 *
810 * @throws SecurityException {@inheritDoc}
811 */
812 public void shutdown() {
813 super.shutdown();
814 }
815
816 /**
817 * Attempts to stop all actively executing tasks, halts the
818 * processing of waiting tasks, and returns a list of the tasks
819 * that were awaiting execution. These tasks are drained (removed)
820 * from the task queue upon return from this method.
821 *
822 * <p>This method does not wait for actively executing tasks to
823 * terminate. Use {@link #awaitTermination awaitTermination} to
824 * do that.
825 *
826 * <p>There are no guarantees beyond best-effort attempts to stop
827 * processing actively executing tasks. This implementation
828 * interrupts tasks via {@link Thread#interrupt}; any task that
829 * fails to respond to interrupts may never terminate.
830 *
831 * @return list of tasks that never commenced execution.
832 * Each element of this list is a {@link ScheduledFuture}.
833 * For tasks submitted via one of the {@code schedule}
834 * methods, the element will be identical to the returned
835 * {@code ScheduledFuture}. For tasks submitted using
836 * {@link #execute execute}, the element will be a
837 * zero-delay {@code ScheduledFuture}.
838 * @throws SecurityException {@inheritDoc}
839 */
840 public List<Runnable> shutdownNow() {
841 return super.shutdownNow();
842 }
843
844 /**
845 * Returns the task queue used by this executor. Access to the
846 * task queue is intended primarily for debugging and monitoring.
847 * This queue may be in active use. Retrieving the task queue
848 * does not prevent queued tasks from executing.
849 *
850 * <p>Each element of this queue is a {@link ScheduledFuture}.
851 * For tasks submitted via one of the {@code schedule} methods, the
852 * element will be identical to the returned {@code ScheduledFuture}.
853 * For tasks submitted using {@link #execute execute}, the element
854 * will be a zero-delay {@code ScheduledFuture}.
855 *
856 * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
857 * tasks in the order in which they will execute.
858 *
859 * @return the task queue
860 */
861 public BlockingQueue<Runnable> getQueue() {
862 return super.getQueue();
863 }
864
865 /**
866 * Specialized delay queue. To mesh with TPE declarations, this
867 * class must be declared as a BlockingQueue<Runnable> even though
868 * it can only hold RunnableScheduledFutures.
869 */
870 static class DelayedWorkQueue extends AbstractQueue<Runnable>
871 implements BlockingQueue<Runnable> {
872
873 /*
874 * A DelayedWorkQueue is based on a heap-based data structure
875 * like those in DelayQueue and PriorityQueue, except that
876 * every ScheduledFutureTask also records its index into the
877 * heap array. This eliminates the need to find a task upon
878 * cancellation, greatly speeding up removal (down from O(n)
879 * to O(log n)), and reducing garbage retention that would
880 * otherwise occur by waiting for the element to rise to top
881 * before clearing. But because the queue may also hold
882 * RunnableScheduledFutures that are not ScheduledFutureTasks,
883 * we are not guaranteed to have such indices available, in
884 * which case we fall back to linear search. (We expect that
885 * most tasks will not be decorated, and that the faster cases
886 * will be much more common.)
887 *
888 * All heap operations must record index changes -- mainly
889 * within siftUp and siftDown. Upon removal, a task's
890 * heapIndex is set to -1. Note that ScheduledFutureTasks can
891 * appear at most once in the queue (this need not be true for
892 * other kinds of tasks or work queues), so are uniquely
893 * identified by heapIndex.
894 */
895
896 private static final int INITIAL_CAPACITY = 16;
897 private RunnableScheduledFuture<?>[] queue =
898 new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
899 private final ReentrantLock lock = new ReentrantLock();
900 private int size;
901
902 /**
903 * Thread designated to wait for the task at the head of the
904 * queue. This variant of the Leader-Follower pattern
905 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
906 * minimize unnecessary timed waiting. When a thread becomes
907 * the leader, it waits only for the next delay to elapse, but
908 * other threads await indefinitely. The leader thread must
909 * signal some other thread before returning from take() or
910 * poll(...), unless some other thread becomes leader in the
911 * interim. Whenever the head of the queue is replaced with a
912 * task with an earlier expiration time, the leader field is
913 * invalidated by being reset to null, and some waiting
914 * thread, but not necessarily the current leader, is
915 * signalled. So waiting threads must be prepared to acquire
916 * and lose leadership while waiting.
917 */
918 private Thread leader;
919
920 /**
921 * Condition signalled when a newer task becomes available at the
922 * head of the queue or a new thread may need to become leader.
923 */
924 private final Condition available = lock.newCondition();
925
926 /**
927 * Sets f's heapIndex if it is a ScheduledFutureTask.
928 */
929 private static void setIndex(RunnableScheduledFuture<?> f, int idx) {
930 if (f instanceof ScheduledFutureTask)
931 ((ScheduledFutureTask)f).heapIndex = idx;
932 }
933
934 /**
935 * Sifts element added at bottom up to its heap-ordered spot.
936 * Call only when holding lock.
937 */
938 private void siftUp(int k, RunnableScheduledFuture<?> key) {
939 while (k > 0) {
940 int parent = (k - 1) >>> 1;
941 RunnableScheduledFuture<?> e = queue[parent];
942 if (key.compareTo(e) >= 0)
943 break;
944 queue[k] = e;
945 setIndex(e, k);
946 k = parent;
947 }
948 queue[k] = key;
949 setIndex(key, k);
950 }
951
952 /**
953 * Sifts element added at top down to its heap-ordered spot.
954 * Call only when holding lock.
955 */
956 private void siftDown(int k, RunnableScheduledFuture<?> key) {
957 int half = size >>> 1;
958 while (k < half) {
959 int child = (k << 1) + 1;
960 RunnableScheduledFuture<?> c = queue[child];
961 int right = child + 1;
962 if (right < size && c.compareTo(queue[right]) > 0)
963 c = queue[child = right];
964 if (key.compareTo(c) <= 0)
965 break;
966 queue[k] = c;
967 setIndex(c, k);
968 k = child;
969 }
970 queue[k] = key;
971 setIndex(key, k);
972 }
973
974 /**
975 * Resizes the heap array. Call only when holding lock.
976 */
977 private void grow() {
978 int oldCapacity = queue.length;
979 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
980 if (newCapacity < 0) // overflow
981 newCapacity = Integer.MAX_VALUE;
982 queue = Arrays.copyOf(queue, newCapacity);
983 }
984
985 /**
986 * Finds index of given object, or -1 if absent.
987 */
988 private int indexOf(Object x) {
989 if (x != null) {
990 if (x instanceof ScheduledFutureTask) {
991 int i = ((ScheduledFutureTask) x).heapIndex;
992 // Sanity check; x could conceivably be a
993 // ScheduledFutureTask from some other pool.
994 if (i >= 0 && i < size && queue[i] == x)
995 return i;
996 } else {
997 for (int i = 0; i < size; i++)
998 if (x.equals(queue[i]))
999 return i;
1000 }
1001 }
1002 return -1;
1003 }
1004
1005 public boolean contains(Object x) {
1006 final ReentrantLock lock = this.lock;
1007 lock.lock();
1008 try {
1009 return indexOf(x) != -1;
1010 } finally {
1011 lock.unlock();
1012 }
1013 }
1014
1015 public boolean remove(Object x) {
1016 final ReentrantLock lock = this.lock;
1017 lock.lock();
1018 try {
1019 int i = indexOf(x);
1020 if (i < 0)
1021 return false;
1022
1023 setIndex(queue[i], -1);
1024 int s = --size;
1025 RunnableScheduledFuture<?> replacement = queue[s];
1026 queue[s] = null;
1027 if (s != i) {
1028 siftDown(i, replacement);
1029 if (queue[i] == replacement)
1030 siftUp(i, replacement);
1031 }
1032 return true;
1033 } finally {
1034 lock.unlock();
1035 }
1036 }
1037
1038 public int size() {
1039 final ReentrantLock lock = this.lock;
1040 lock.lock();
1041 try {
1042 return size;
1043 } finally {
1044 lock.unlock();
1045 }
1046 }
1047
1048 public boolean isEmpty() {
1049 return size() == 0;
1050 }
1051
1052 public int remainingCapacity() {
1053 return Integer.MAX_VALUE;
1054 }
1055
1056 public RunnableScheduledFuture<?> peek() {
1057 final ReentrantLock lock = this.lock;
1058 lock.lock();
1059 try {
1060 return queue[0];
1061 } finally {
1062 lock.unlock();
1063 }
1064 }
1065
1066 public boolean offer(Runnable x) {
1067 if (x == null)
1068 throw new NullPointerException();
1069 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
1070 final ReentrantLock lock = this.lock;
1071 lock.lock();
1072 try {
1073 int i = size;
1074 if (i >= queue.length)
1075 grow();
1076 size = i + 1;
1077 if (i == 0) {
1078 queue[0] = e;
1079 setIndex(e, 0);
1080 } else {
1081 siftUp(i, e);
1082 }
1083 if (queue[0] == e) {
1084 leader = null;
1085 available.signal();
1086 }
1087 } finally {
1088 lock.unlock();
1089 }
1090 return true;
1091 }
1092
1093 public void put(Runnable e) {
1094 offer(e);
1095 }
1096
1097 public boolean add(Runnable e) {
1098 return offer(e);
1099 }
1100
1101 public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1102 return offer(e);
1103 }
1104
1105 /**
1106 * Performs common bookkeeping for poll and take: Replaces
1107 * first element with last and sifts it down. Call only when
1108 * holding lock.
1109 * @param f the task to remove and return
1110 */
1111 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1112 int s = --size;
1113 RunnableScheduledFuture<?> x = queue[s];
1114 queue[s] = null;
1115 if (s != 0)
1116 siftDown(0, x);
1117 setIndex(f, -1);
1118 return f;
1119 }
1120
1121 public RunnableScheduledFuture<?> poll() {
1122 final ReentrantLock lock = this.lock;
1123 lock.lock();
1124 try {
1125 RunnableScheduledFuture<?> first = queue[0];
1126 return (first == null || first.getDelay(NANOSECONDS) > 0)
1127 ? null
1128 : finishPoll(first);
1129 } finally {
1130 lock.unlock();
1131 }
1132 }
1133
1134 public RunnableScheduledFuture<?> take() throws InterruptedException {
1135 final ReentrantLock lock = this.lock;
1136 lock.lockInterruptibly();
1137 try {
1138 for (;;) {
1139 RunnableScheduledFuture<?> first = queue[0];
1140 if (first == null)
1141 available.await();
1142 else {
1143 long delay = first.getDelay(NANOSECONDS);
1144 if (delay <= 0L)
1145 return finishPoll(first);
1146 first = null; // don't retain ref while waiting
1147 if (leader != null)
1148 available.await();
1149 else {
1150 Thread thisThread = Thread.currentThread();
1151 leader = thisThread;
1152 try {
1153 available.awaitNanos(delay);
1154 } finally {
1155 if (leader == thisThread)
1156 leader = null;
1157 }
1158 }
1159 }
1160 }
1161 } finally {
1162 if (leader == null && queue[0] != null)
1163 available.signal();
1164 lock.unlock();
1165 }
1166 }
1167
1168 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1169 throws InterruptedException {
1170 long nanos = unit.toNanos(timeout);
1171 final ReentrantLock lock = this.lock;
1172 lock.lockInterruptibly();
1173 try {
1174 for (;;) {
1175 RunnableScheduledFuture<?> first = queue[0];
1176 if (first == null) {
1177 if (nanos <= 0L)
1178 return null;
1179 else
1180 nanos = available.awaitNanos(nanos);
1181 } else {
1182 long delay = first.getDelay(NANOSECONDS);
1183 if (delay <= 0L)
1184 return finishPoll(first);
1185 if (nanos <= 0L)
1186 return null;
1187 first = null; // don't retain ref while waiting
1188 if (nanos < delay || leader != null)
1189 nanos = available.awaitNanos(nanos);
1190 else {
1191 Thread thisThread = Thread.currentThread();
1192 leader = thisThread;
1193 try {
1194 long timeLeft = available.awaitNanos(delay);
1195 nanos -= delay - timeLeft;
1196 } finally {
1197 if (leader == thisThread)
1198 leader = null;
1199 }
1200 }
1201 }
1202 }
1203 } finally {
1204 if (leader == null && queue[0] != null)
1205 available.signal();
1206 lock.unlock();
1207 }
1208 }
1209
1210 public void clear() {
1211 final ReentrantLock lock = this.lock;
1212 lock.lock();
1213 try {
1214 for (int i = 0; i < size; i++) {
1215 RunnableScheduledFuture<?> t = queue[i];
1216 if (t != null) {
1217 queue[i] = null;
1218 setIndex(t, -1);
1219 }
1220 }
1221 size = 0;
1222 } finally {
1223 lock.unlock();
1224 }
1225 }
1226
1227 public int drainTo(Collection<? super Runnable> c) {
1228 return drainTo(c, Integer.MAX_VALUE);
1229 }
1230
1231 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1232 Objects.requireNonNull(c);
1233 if (c == this)
1234 throw new IllegalArgumentException();
1235 if (maxElements <= 0)
1236 return 0;
1237 final ReentrantLock lock = this.lock;
1238 lock.lock();
1239 try {
1240 int n = 0;
1241 for (RunnableScheduledFuture<?> first;
1242 n < maxElements
1243 && (first = queue[0]) != null
1244 && first.getDelay(NANOSECONDS) <= 0;) {
1245 c.add(first); // In this order, in case add() throws.
1246 finishPoll(first);
1247 ++n;
1248 }
1249 return n;
1250 } finally {
1251 lock.unlock();
1252 }
1253 }
1254
1255 public Object[] toArray() {
1256 final ReentrantLock lock = this.lock;
1257 lock.lock();
1258 try {
1259 return Arrays.copyOf(queue, size, Object[].class);
1260 } finally {
1261 lock.unlock();
1262 }
1263 }
1264
1265 @SuppressWarnings("unchecked")
1266 public <T> T[] toArray(T[] a) {
1267 final ReentrantLock lock = this.lock;
1268 lock.lock();
1269 try {
1270 if (a.length < size)
1271 return (T[]) Arrays.copyOf(queue, size, a.getClass());
1272 System.arraycopy(queue, 0, a, 0, size);
1273 if (a.length > size)
1274 a[size] = null;
1275 return a;
1276 } finally {
1277 lock.unlock();
1278 }
1279 }
1280
1281 public Iterator<Runnable> iterator() {
1282 final ReentrantLock lock = this.lock;
1283 lock.lock();
1284 try {
1285 return new Itr(Arrays.copyOf(queue, size));
1286 } finally {
1287 lock.unlock();
1288 }
1289 }
1290
1291 /**
1292 * Snapshot iterator that works off copy of underlying q array.
1293 */
1294 private class Itr implements Iterator<Runnable> {
1295 final RunnableScheduledFuture<?>[] array;
1296 int cursor; // index of next element to return; initially 0
1297 int lastRet = -1; // index of last element returned; -1 if no such
1298
1299 Itr(RunnableScheduledFuture<?>[] array) {
1300 this.array = array;
1301 }
1302
1303 public boolean hasNext() {
1304 return cursor < array.length;
1305 }
1306
1307 public Runnable next() {
1308 if (cursor >= array.length)
1309 throw new NoSuchElementException();
1310 return array[lastRet = cursor++];
1311 }
1312
1313 public void remove() {
1314 if (lastRet < 0)
1315 throw new IllegalStateException();
1316 DelayedWorkQueue.this.remove(array[lastRet]);
1317 lastRet = -1;
1318 }
1319 }
1320 }
1321 }