ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.108
Committed: Tue Mar 28 23:09:10 2017 UTC (7 years, 2 months ago) by jsr166
Branch: MAIN
Changes since 1.107: +32 -6 lines
Log Message:
clarify behavior of periodic tasks

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 import static java.util.concurrent.TimeUnit.NANOSECONDS;
11
12 import java.util.AbstractQueue;
13 import java.util.Arrays;
14 import java.util.Collection;
15 import java.util.Iterator;
16 import java.util.List;
17 import java.util.NoSuchElementException;
18 import java.util.Objects;
19 import java.util.concurrent.atomic.AtomicLong;
20 import java.util.concurrent.locks.Condition;
21 import java.util.concurrent.locks.ReentrantLock;
22
23 /**
24 * A {@link ThreadPoolExecutor} that can additionally schedule
25 * commands to run after a given delay, or to execute periodically.
26 * This class is preferable to {@link java.util.Timer} when multiple
27 * worker threads are needed, or when the additional flexibility or
28 * capabilities of {@link ThreadPoolExecutor} (which this class
29 * extends) are required.
30 *
31 * <p>Delayed tasks execute no sooner than they are enabled, but
32 * without any real-time guarantees about when, after they are
33 * enabled, they will commence. Tasks scheduled for exactly the same
34 * execution time are enabled in first-in-first-out (FIFO) order of
35 * submission.
36 *
37 * <p>When a submitted task is cancelled before it is run, execution
38 * is suppressed. By default, such a cancelled task is not
39 * automatically removed from the work queue until its delay elapses.
40 * While this enables further inspection and monitoring, it may also
41 * cause unbounded retention of cancelled tasks. To avoid this, use
42 * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately
43 * removed from the work queue at time of cancellation.
44 *
45 * <p>Successive executions of a periodic task scheduled via
46 * {@link #scheduleAtFixedRate scheduleAtFixedRate} or
47 * {@link #scheduleWithFixedDelay scheduleWithFixedDelay}
48 * do not overlap. While different executions may be performed by
49 * different threads, the effects of prior executions
50 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
51 * those of subsequent ones.
52 *
53 * <p>Calling {@link Future#get} on a periodic task's future will
54 * never return normally. If an execution of a periodic task
55 * throws an exception, further executions are suppressed and
56 * {@code get()} will throw an {@link ExecutionException} holding the
57 * exception as its cause. Otherwise, {@code get()} will block and
58 * executions continue indefinitely until the task is cancelled, when
59 * it will throw {@link CancellationException}. Such cancellations
60 * occur when the future is cancelled, on {@link #shutdownNow}, or on
61 * {@link #shutdown} unless the {@linkplain
62 * #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on
63 * whether to continue after shutdown} is set true.
64 *
65 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
66 * of the inherited tuning methods are not useful for it. In
67 * particular, because it acts as a fixed-sized pool using
68 * {@code corePoolSize} threads and an unbounded queue, adjustments
69 * to {@code maximumPoolSize} have no useful effect. Additionally, it
70 * is almost never a good idea to set {@code corePoolSize} to zero or
71 * use {@code allowCoreThreadTimeOut} because this may leave the pool
72 * without threads to handle tasks once they become eligible to run.
73 *
74 * <p>As with {@code ThreadPoolExecutor}, if not otherwise specified,
75 * this class uses {@link Executors#defaultThreadFactory} as the
76 * default thread factory, and {@link ThreadPoolExecutor.AbortPolicy}
77 * as the default rejected execution handler.
78 *
79 * <p><b>Extension notes:</b> This class overrides the
80 * {@link ThreadPoolExecutor#execute(Runnable) execute} and
81 * {@link AbstractExecutorService#submit(Runnable) submit}
82 * methods to generate internal {@link ScheduledFuture} objects to
83 * control per-task delays and scheduling. To preserve
84 * functionality, any further overrides of these methods in
85 * subclasses must invoke superclass versions, which effectively
86 * disables additional task customization. However, this class
87 * provides alternative protected extension method
88 * {@code decorateTask} (one version each for {@code Runnable} and
89 * {@code Callable}) that can be used to customize the concrete task
90 * types used to execute commands entered via {@code execute},
91 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
92 * and {@code scheduleWithFixedDelay}. By default, a
93 * {@code ScheduledThreadPoolExecutor} uses a task type extending
94 * {@link FutureTask}. However, this may be modified or replaced using
95 * subclasses of the form:
96 *
97 * <pre> {@code
98 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
99 *
100 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
101 *
102 * protected <V> RunnableScheduledFuture<V> decorateTask(
103 * Runnable r, RunnableScheduledFuture<V> task) {
104 * return new CustomTask<V>(r, task);
105 * }
106 *
107 * protected <V> RunnableScheduledFuture<V> decorateTask(
108 * Callable<V> c, RunnableScheduledFuture<V> task) {
109 * return new CustomTask<V>(c, task);
110 * }
111 * // ... add constructors, etc.
112 * }}</pre>
113 *
114 * @since 1.5
115 * @author Doug Lea
116 */
117 public class ScheduledThreadPoolExecutor
118 extends ThreadPoolExecutor
119 implements ScheduledExecutorService {
120
121 /*
122 * This class specializes ThreadPoolExecutor implementation by
123 *
124 * 1. Using a custom task type ScheduledFutureTask, even for tasks
125 * that don't require scheduling because they are submitted
126 * using ExecutorService rather than ScheduledExecutorService
127 * methods, which are treated as tasks with a delay of zero.
128 *
129 * 2. Using a custom queue (DelayedWorkQueue), a variant of
130 * unbounded DelayQueue. The lack of capacity constraint and
131 * the fact that corePoolSize and maximumPoolSize are
132 * effectively identical simplifies some execution mechanics
133 * (see delayedExecute) compared to ThreadPoolExecutor.
134 *
135 * 3. Supporting optional run-after-shutdown parameters, which
136 * leads to overrides of shutdown methods to remove and cancel
137 * tasks that should NOT be run after shutdown, as well as
138 * different recheck logic when task (re)submission overlaps
139 * with a shutdown.
140 *
141 * 4. Task decoration methods to allow interception and
142 * instrumentation, which are needed because subclasses cannot
143 * otherwise override submit methods to get this effect. These
144 * don't have any impact on pool control logic though.
145 */
146
147 /**
148 * False if should cancel/suppress periodic tasks on shutdown.
149 */
150 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
151
152 /**
153 * False if should cancel non-periodic tasks on shutdown.
154 */
155 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
156
157 /**
158 * True if ScheduledFutureTask.cancel should remove from queue.
159 */
160 volatile boolean removeOnCancel;
161
162 /**
163 * Sequence number to break scheduling ties, and in turn to
164 * guarantee FIFO order among tied entries.
165 */
166 private static final AtomicLong sequencer = new AtomicLong();
167
168 private class ScheduledFutureTask<V>
169 extends FutureTask<V> implements RunnableScheduledFuture<V> {
170
171 /** Sequence number to break ties FIFO */
172 private final long sequenceNumber;
173
174 /** The nanoTime-based time when the task is enabled to execute. */
175 private volatile long time;
176
177 /**
178 * Period for repeating tasks, in nanoseconds.
179 * A positive value indicates fixed-rate execution.
180 * A negative value indicates fixed-delay execution.
181 * A value of 0 indicates a non-repeating (one-shot) task.
182 */
183 private final long period;
184
185 /** The actual task to be re-enqueued by reExecutePeriodic */
186 RunnableScheduledFuture<V> outerTask = this;
187
188 /**
189 * Index into delay queue, to support faster cancellation.
190 */
191 int heapIndex;
192
193 /**
194 * Creates a one-shot action with given nanoTime-based trigger time.
195 */
196 ScheduledFutureTask(Runnable r, V result, long triggerTime,
197 long sequenceNumber) {
198 super(r, result);
199 this.time = triggerTime;
200 this.period = 0;
201 this.sequenceNumber = sequenceNumber;
202 }
203
204 /**
205 * Creates a periodic action with given nanoTime-based initial
206 * trigger time and period.
207 */
208 ScheduledFutureTask(Runnable r, V result, long triggerTime,
209 long period, long sequenceNumber) {
210 super(r, result);
211 this.time = triggerTime;
212 this.period = period;
213 this.sequenceNumber = sequenceNumber;
214 }
215
216 /**
217 * Creates a one-shot action with given nanoTime-based trigger time.
218 */
219 ScheduledFutureTask(Callable<V> callable, long triggerTime,
220 long sequenceNumber) {
221 super(callable);
222 this.time = triggerTime;
223 this.period = 0;
224 this.sequenceNumber = sequenceNumber;
225 }
226
227 public long getDelay(TimeUnit unit) {
228 return unit.convert(time - System.nanoTime(), NANOSECONDS);
229 }
230
231 public int compareTo(Delayed other) {
232 if (other == this) // compare zero if same object
233 return 0;
234 if (other instanceof ScheduledFutureTask) {
235 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
236 long diff = time - x.time;
237 if (diff < 0)
238 return -1;
239 else if (diff > 0)
240 return 1;
241 else if (sequenceNumber < x.sequenceNumber)
242 return -1;
243 else
244 return 1;
245 }
246 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
247 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
248 }
249
250 /**
251 * Returns {@code true} if this is a periodic (not a one-shot) action.
252 *
253 * @return {@code true} if periodic
254 */
255 public boolean isPeriodic() {
256 return period != 0;
257 }
258
259 /**
260 * Sets the next time to run for a periodic task.
261 */
262 private void setNextRunTime() {
263 long p = period;
264 if (p > 0)
265 time += p;
266 else
267 time = triggerTime(-p);
268 }
269
270 public boolean cancel(boolean mayInterruptIfRunning) {
271 // The racy read of heapIndex below is benign:
272 // if heapIndex < 0, then OOTA guarantees that we have surely
273 // been removed; else we recheck under lock in remove()
274 boolean cancelled = super.cancel(mayInterruptIfRunning);
275 if (cancelled && removeOnCancel && heapIndex >= 0)
276 remove(this);
277 return cancelled;
278 }
279
280 /**
281 * Overrides FutureTask version so as to reset/requeue if periodic.
282 */
283 public void run() {
284 if (!canRunInCurrentRunState(this))
285 cancel(false);
286 else if (!isPeriodic())
287 super.run();
288 else if (super.runAndReset()) {
289 setNextRunTime();
290 reExecutePeriodic(outerTask);
291 }
292 }
293 }
294
295 /**
296 * Returns true if can run a task given current run state and
297 * run-after-shutdown parameters.
298 */
299 boolean canRunInCurrentRunState(RunnableScheduledFuture<?> task) {
300 if (!isShutdown())
301 return true;
302 if (isStopped())
303 return false;
304 return task.isPeriodic()
305 ? continueExistingPeriodicTasksAfterShutdown
306 : (executeExistingDelayedTasksAfterShutdown
307 || task.getDelay(NANOSECONDS) <= 0);
308 }
309
310 /**
311 * Main execution method for delayed or periodic tasks. If pool
312 * is shut down, rejects the task. Otherwise adds task to queue
313 * and starts a thread, if necessary, to run it. (We cannot
314 * prestart the thread to run the task because the task (probably)
315 * shouldn't be run yet.) If the pool is shut down while the task
316 * is being added, cancel and remove it if required by state and
317 * run-after-shutdown parameters.
318 *
319 * @param task the task
320 */
321 private void delayedExecute(RunnableScheduledFuture<?> task) {
322 if (isShutdown())
323 reject(task);
324 else {
325 super.getQueue().add(task);
326 if (!canRunInCurrentRunState(task) && remove(task))
327 task.cancel(false);
328 else
329 ensurePrestart();
330 }
331 }
332
333 /**
334 * Requeues a periodic task unless current run state precludes it.
335 * Same idea as delayedExecute except drops task rather than rejecting.
336 *
337 * @param task the task
338 */
339 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
340 if (canRunInCurrentRunState(task)) {
341 super.getQueue().add(task);
342 if (canRunInCurrentRunState(task) || !remove(task)) {
343 ensurePrestart();
344 return;
345 }
346 }
347 task.cancel(false);
348 }
349
350 /**
351 * Cancels and clears the queue of all tasks that should not be run
352 * due to shutdown policy. Invoked within super.shutdown.
353 */
354 @Override void onShutdown() {
355 BlockingQueue<Runnable> q = super.getQueue();
356 boolean keepDelayed =
357 getExecuteExistingDelayedTasksAfterShutdownPolicy();
358 boolean keepPeriodic =
359 getContinueExistingPeriodicTasksAfterShutdownPolicy();
360 // Traverse snapshot to avoid iterator exceptions
361 // TODO: implement and use efficient removeIf
362 // super.getQueue().removeIf(...);
363 for (Object e : q.toArray()) {
364 if (e instanceof RunnableScheduledFuture) {
365 RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
366 if ((t.isPeriodic()
367 ? !keepPeriodic
368 : (!keepDelayed && t.getDelay(NANOSECONDS) > 0))
369 || t.isCancelled()) { // also remove if already cancelled
370 if (q.remove(t))
371 t.cancel(false);
372 }
373 }
374 }
375 tryTerminate();
376 }
377
378 /**
379 * Modifies or replaces the task used to execute a runnable.
380 * This method can be used to override the concrete
381 * class used for managing internal tasks.
382 * The default implementation simply returns the given task.
383 *
384 * @param runnable the submitted Runnable
385 * @param task the task created to execute the runnable
386 * @param <V> the type of the task's result
387 * @return a task that can execute the runnable
388 * @since 1.6
389 */
390 protected <V> RunnableScheduledFuture<V> decorateTask(
391 Runnable runnable, RunnableScheduledFuture<V> task) {
392 return task;
393 }
394
395 /**
396 * Modifies or replaces the task used to execute a callable.
397 * This method can be used to override the concrete
398 * class used for managing internal tasks.
399 * The default implementation simply returns the given task.
400 *
401 * @param callable the submitted Callable
402 * @param task the task created to execute the callable
403 * @param <V> the type of the task's result
404 * @return a task that can execute the callable
405 * @since 1.6
406 */
407 protected <V> RunnableScheduledFuture<V> decorateTask(
408 Callable<V> callable, RunnableScheduledFuture<V> task) {
409 return task;
410 }
411
412 /**
413 * The default keep-alive time for pool threads.
414 *
415 * Normally, this value is unused because all pool threads will be
416 * core threads, but if a user creates a pool with a corePoolSize
417 * of zero (against our advice), we keep a thread alive as long as
418 * there are queued tasks. If the keep alive time is zero (the
419 * historic value), we end up hot-spinning in getTask, wasting a
420 * CPU. But on the other hand, if we set the value too high, and
421 * users create a one-shot pool which they don't cleanly shutdown,
422 * the pool's non-daemon threads will prevent JVM termination. A
423 * small but non-zero value (relative to a JVM's lifetime) seems
424 * best.
425 */
426 private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
427
428 /**
429 * Creates a new {@code ScheduledThreadPoolExecutor} with the
430 * given core pool size.
431 *
432 * @param corePoolSize the number of threads to keep in the pool, even
433 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
434 * @throws IllegalArgumentException if {@code corePoolSize < 0}
435 */
436 public ScheduledThreadPoolExecutor(int corePoolSize) {
437 super(corePoolSize, Integer.MAX_VALUE,
438 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
439 new DelayedWorkQueue());
440 }
441
442 /**
443 * Creates a new {@code ScheduledThreadPoolExecutor} with the
444 * given initial parameters.
445 *
446 * @param corePoolSize the number of threads to keep in the pool, even
447 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
448 * @param threadFactory the factory to use when the executor
449 * creates a new thread
450 * @throws IllegalArgumentException if {@code corePoolSize < 0}
451 * @throws NullPointerException if {@code threadFactory} is null
452 */
453 public ScheduledThreadPoolExecutor(int corePoolSize,
454 ThreadFactory threadFactory) {
455 super(corePoolSize, Integer.MAX_VALUE,
456 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
457 new DelayedWorkQueue(), threadFactory);
458 }
459
460 /**
461 * Creates a new {@code ScheduledThreadPoolExecutor} with the
462 * given initial parameters.
463 *
464 * @param corePoolSize the number of threads to keep in the pool, even
465 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
466 * @param handler the handler to use when execution is blocked
467 * because the thread bounds and queue capacities are reached
468 * @throws IllegalArgumentException if {@code corePoolSize < 0}
469 * @throws NullPointerException if {@code handler} is null
470 */
471 public ScheduledThreadPoolExecutor(int corePoolSize,
472 RejectedExecutionHandler handler) {
473 super(corePoolSize, Integer.MAX_VALUE,
474 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
475 new DelayedWorkQueue(), handler);
476 }
477
478 /**
479 * Creates a new {@code ScheduledThreadPoolExecutor} with the
480 * given initial parameters.
481 *
482 * @param corePoolSize the number of threads to keep in the pool, even
483 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
484 * @param threadFactory the factory to use when the executor
485 * creates a new thread
486 * @param handler the handler to use when execution is blocked
487 * because the thread bounds and queue capacities are reached
488 * @throws IllegalArgumentException if {@code corePoolSize < 0}
489 * @throws NullPointerException if {@code threadFactory} or
490 * {@code handler} is null
491 */
492 public ScheduledThreadPoolExecutor(int corePoolSize,
493 ThreadFactory threadFactory,
494 RejectedExecutionHandler handler) {
495 super(corePoolSize, Integer.MAX_VALUE,
496 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
497 new DelayedWorkQueue(), threadFactory, handler);
498 }
499
500 /**
501 * Returns the nanoTime-based trigger time of a delayed action.
502 */
503 private long triggerTime(long delay, TimeUnit unit) {
504 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
505 }
506
507 /**
508 * Returns the nanoTime-based trigger time of a delayed action.
509 */
510 long triggerTime(long delay) {
511 return System.nanoTime() +
512 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
513 }
514
515 /**
516 * Constrains the values of all delays in the queue to be within
517 * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
518 * This may occur if a task is eligible to be dequeued, but has
519 * not yet been, while some other task is added with a delay of
520 * Long.MAX_VALUE.
521 */
522 private long overflowFree(long delay) {
523 Delayed head = (Delayed) super.getQueue().peek();
524 if (head != null) {
525 long headDelay = head.getDelay(NANOSECONDS);
526 if (headDelay < 0 && (delay - headDelay < 0))
527 delay = Long.MAX_VALUE + headDelay;
528 }
529 return delay;
530 }
531
532 /**
533 * @throws RejectedExecutionException {@inheritDoc}
534 * @throws NullPointerException {@inheritDoc}
535 */
536 public ScheduledFuture<?> schedule(Runnable command,
537 long delay,
538 TimeUnit unit) {
539 if (command == null || unit == null)
540 throw new NullPointerException();
541 RunnableScheduledFuture<Void> t = decorateTask(command,
542 new ScheduledFutureTask<Void>(command, null,
543 triggerTime(delay, unit),
544 sequencer.getAndIncrement()));
545 delayedExecute(t);
546 return t;
547 }
548
549 /**
550 * @throws RejectedExecutionException {@inheritDoc}
551 * @throws NullPointerException {@inheritDoc}
552 */
553 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
554 long delay,
555 TimeUnit unit) {
556 if (callable == null || unit == null)
557 throw new NullPointerException();
558 RunnableScheduledFuture<V> t = decorateTask(callable,
559 new ScheduledFutureTask<V>(callable,
560 triggerTime(delay, unit),
561 sequencer.getAndIncrement()));
562 delayedExecute(t);
563 return t;
564 }
565
566 /**
567 * @throws RejectedExecutionException {@inheritDoc}
568 * @throws NullPointerException {@inheritDoc}
569 * @throws IllegalArgumentException {@inheritDoc}
570 */
571 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
572 long initialDelay,
573 long period,
574 TimeUnit unit) {
575 if (command == null || unit == null)
576 throw new NullPointerException();
577 if (period <= 0L)
578 throw new IllegalArgumentException();
579 ScheduledFutureTask<Void> sft =
580 new ScheduledFutureTask<Void>(command,
581 null,
582 triggerTime(initialDelay, unit),
583 unit.toNanos(period),
584 sequencer.getAndIncrement());
585 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
586 sft.outerTask = t;
587 delayedExecute(t);
588 return t;
589 }
590
591 /**
592 * @throws RejectedExecutionException {@inheritDoc}
593 * @throws NullPointerException {@inheritDoc}
594 * @throws IllegalArgumentException {@inheritDoc}
595 */
596 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
597 long initialDelay,
598 long delay,
599 TimeUnit unit) {
600 if (command == null || unit == null)
601 throw new NullPointerException();
602 if (delay <= 0L)
603 throw new IllegalArgumentException();
604 ScheduledFutureTask<Void> sft =
605 new ScheduledFutureTask<Void>(command,
606 null,
607 triggerTime(initialDelay, unit),
608 -unit.toNanos(delay),
609 sequencer.getAndIncrement());
610 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
611 sft.outerTask = t;
612 delayedExecute(t);
613 return t;
614 }
615
616 /**
617 * Executes {@code command} with zero required delay.
618 * This has effect equivalent to
619 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
620 * Note that inspections of the queue and of the list returned by
621 * {@code shutdownNow} will access the zero-delayed
622 * {@link ScheduledFuture}, not the {@code command} itself.
623 *
624 * <p>A consequence of the use of {@code ScheduledFuture} objects is
625 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
626 * called with a null second {@code Throwable} argument, even if the
627 * {@code command} terminated abruptly. Instead, the {@code Throwable}
628 * thrown by such a task can be obtained via {@link Future#get}.
629 *
630 * @throws RejectedExecutionException at discretion of
631 * {@code RejectedExecutionHandler}, if the task
632 * cannot be accepted for execution because the
633 * executor has been shut down
634 * @throws NullPointerException {@inheritDoc}
635 */
636 public void execute(Runnable command) {
637 schedule(command, 0, NANOSECONDS);
638 }
639
640 // Override AbstractExecutorService methods
641
642 /**
643 * @throws RejectedExecutionException {@inheritDoc}
644 * @throws NullPointerException {@inheritDoc}
645 */
646 public Future<?> submit(Runnable task) {
647 return schedule(task, 0, NANOSECONDS);
648 }
649
650 /**
651 * @throws RejectedExecutionException {@inheritDoc}
652 * @throws NullPointerException {@inheritDoc}
653 */
654 public <T> Future<T> submit(Runnable task, T result) {
655 return schedule(Executors.callable(task, result), 0, NANOSECONDS);
656 }
657
658 /**
659 * @throws RejectedExecutionException {@inheritDoc}
660 * @throws NullPointerException {@inheritDoc}
661 */
662 public <T> Future<T> submit(Callable<T> task) {
663 return schedule(task, 0, NANOSECONDS);
664 }
665
666 /**
667 * Sets the policy on whether to continue executing existing
668 * periodic tasks even when this executor has been {@code shutdown}.
669 * This value is by default {@code false}.
670 *
671 * <p>If the policy is set to {@code true}, periodic tasks will
672 * continue executing until one of the following happens:
673 *
674 * <ul>
675 * <li>{@link #shutdownNow} is called
676 * <li>the policy is set to {@code false} when already shutdown
677 * <li>the task is {@linkplain Future#cancel cancelled}
678 * <li>an execution of the task terminates with an exception
679 * </ul>
680 *
681 * @param value if {@code true}, continue after shutdown, else don't
682 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
683 */
684 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
685 continueExistingPeriodicTasksAfterShutdown = value;
686 if (!value && isShutdown())
687 onShutdown();
688 }
689
690 /**
691 * Gets the policy on whether to continue executing existing
692 * periodic tasks even when this executor has been {@code shutdown}.
693 * This value is by default {@code false}.
694 *
695 * <p>If the policy is set to {@code true}, periodic tasks will
696 * continue executing until one of the following happens:
697 *
698 * <ul>
699 * <li>{@link #shutdownNow} is called
700 * <li>the policy is set to {@code false} when already shutdown
701 * <li>the task is {@linkplain Future#cancel cancelled}
702 * <li>an execution of the task terminates with an exception
703 * </ul>
704 *
705 * @return {@code true} if will continue after shutdown
706 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
707 */
708 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
709 return continueExistingPeriodicTasksAfterShutdown;
710 }
711
712 /**
713 * Sets the policy on whether to execute existing delayed
714 * tasks even when this executor has been {@code shutdown}.
715 * In this case, these tasks will only terminate upon
716 * {@code shutdownNow}, or after setting the policy to
717 * {@code false} when already shutdown.
718 * This value is by default {@code true}.
719 *
720 * @param value if {@code true}, execute after shutdown, else don't
721 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
722 */
723 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
724 executeExistingDelayedTasksAfterShutdown = value;
725 if (!value && isShutdown())
726 onShutdown();
727 }
728
729 /**
730 * Gets the policy on whether to execute existing delayed
731 * tasks even when this executor has been {@code shutdown}.
732 * In this case, these tasks will only terminate upon
733 * {@code shutdownNow}, or after setting the policy to
734 * {@code false} when already shutdown.
735 * This value is by default {@code true}.
736 *
737 * @return {@code true} if will execute after shutdown
738 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
739 */
740 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
741 return executeExistingDelayedTasksAfterShutdown;
742 }
743
744 /**
745 * Sets the policy on whether cancelled tasks should be immediately
746 * removed from the work queue at time of cancellation. This value is
747 * by default {@code false}.
748 *
749 * @param value if {@code true}, remove on cancellation, else don't
750 * @see #getRemoveOnCancelPolicy
751 * @since 1.7
752 */
753 public void setRemoveOnCancelPolicy(boolean value) {
754 removeOnCancel = value;
755 }
756
757 /**
758 * Gets the policy on whether cancelled tasks should be immediately
759 * removed from the work queue at time of cancellation. This value is
760 * by default {@code false}.
761 *
762 * @return {@code true} if cancelled tasks are immediately removed
763 * from the queue
764 * @see #setRemoveOnCancelPolicy
765 * @since 1.7
766 */
767 public boolean getRemoveOnCancelPolicy() {
768 return removeOnCancel;
769 }
770
771 /**
772 * Initiates an orderly shutdown in which previously submitted
773 * tasks are executed, but no new tasks will be accepted.
774 * Invocation has no additional effect if already shut down.
775 *
776 * <p>This method does not wait for previously submitted tasks to
777 * complete execution. Use {@link #awaitTermination awaitTermination}
778 * to do that.
779 *
780 * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
781 * has been set {@code false}, existing delayed tasks whose delays
782 * have not yet elapsed are cancelled. And unless the {@code
783 * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
784 * {@code true}, future executions of existing periodic tasks will
785 * be cancelled.
786 *
787 * @throws SecurityException {@inheritDoc}
788 */
789 public void shutdown() {
790 super.shutdown();
791 }
792
793 /**
794 * Attempts to stop all actively executing tasks, halts the
795 * processing of waiting tasks, and returns a list of the tasks
796 * that were awaiting execution. These tasks are drained (removed)
797 * from the task queue upon return from this method.
798 *
799 * <p>This method does not wait for actively executing tasks to
800 * terminate. Use {@link #awaitTermination awaitTermination} to
801 * do that.
802 *
803 * <p>There are no guarantees beyond best-effort attempts to stop
804 * processing actively executing tasks. This implementation
805 * interrupts tasks via {@link Thread#interrupt}; any task that
806 * fails to respond to interrupts may never terminate.
807 *
808 * @return list of tasks that never commenced execution.
809 * Each element of this list is a {@link ScheduledFuture}.
810 * For tasks submitted via one of the {@code schedule}
811 * methods, the element will be identical to the returned
812 * {@code ScheduledFuture}. For tasks submitted using
813 * {@link #execute execute}, the element will be a
814 * zero-delay {@code ScheduledFuture}.
815 * @throws SecurityException {@inheritDoc}
816 */
817 public List<Runnable> shutdownNow() {
818 return super.shutdownNow();
819 }
820
821 /**
822 * Returns the task queue used by this executor. Access to the
823 * task queue is intended primarily for debugging and monitoring.
824 * This queue may be in active use. Retrieving the task queue
825 * does not prevent queued tasks from executing.
826 *
827 * <p>Each element of this queue is a {@link ScheduledFuture}.
828 * For tasks submitted via one of the {@code schedule} methods, the
829 * element will be identical to the returned {@code ScheduledFuture}.
830 * For tasks submitted using {@link #execute execute}, the element
831 * will be a zero-delay {@code ScheduledFuture}.
832 *
833 * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
834 * tasks in the order in which they will execute.
835 *
836 * @return the task queue
837 */
838 public BlockingQueue<Runnable> getQueue() {
839 return super.getQueue();
840 }
841
842 /**
843 * Specialized delay queue. To mesh with TPE declarations, this
844 * class must be declared as a BlockingQueue<Runnable> even though
845 * it can only hold RunnableScheduledFutures.
846 */
847 static class DelayedWorkQueue extends AbstractQueue<Runnable>
848 implements BlockingQueue<Runnable> {
849
850 /*
851 * A DelayedWorkQueue is based on a heap-based data structure
852 * like those in DelayQueue and PriorityQueue, except that
853 * every ScheduledFutureTask also records its index into the
854 * heap array. This eliminates the need to find a task upon
855 * cancellation, greatly speeding up removal (down from O(n)
856 * to O(log n)), and reducing garbage retention that would
857 * otherwise occur by waiting for the element to rise to top
858 * before clearing. But because the queue may also hold
859 * RunnableScheduledFutures that are not ScheduledFutureTasks,
860 * we are not guaranteed to have such indices available, in
861 * which case we fall back to linear search. (We expect that
862 * most tasks will not be decorated, and that the faster cases
863 * will be much more common.)
864 *
865 * All heap operations must record index changes -- mainly
866 * within siftUp and siftDown. Upon removal, a task's
867 * heapIndex is set to -1. Note that ScheduledFutureTasks can
868 * appear at most once in the queue (this need not be true for
869 * other kinds of tasks or work queues), so are uniquely
870 * identified by heapIndex.
871 */
872
873 private static final int INITIAL_CAPACITY = 16;
874 private RunnableScheduledFuture<?>[] queue =
875 new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
876 private final ReentrantLock lock = new ReentrantLock();
877 private int size;
878
879 /**
880 * Thread designated to wait for the task at the head of the
881 * queue. This variant of the Leader-Follower pattern
882 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
883 * minimize unnecessary timed waiting. When a thread becomes
884 * the leader, it waits only for the next delay to elapse, but
885 * other threads await indefinitely. The leader thread must
886 * signal some other thread before returning from take() or
887 * poll(...), unless some other thread becomes leader in the
888 * interim. Whenever the head of the queue is replaced with a
889 * task with an earlier expiration time, the leader field is
890 * invalidated by being reset to null, and some waiting
891 * thread, but not necessarily the current leader, is
892 * signalled. So waiting threads must be prepared to acquire
893 * and lose leadership while waiting.
894 */
895 private Thread leader;
896
897 /**
898 * Condition signalled when a newer task becomes available at the
899 * head of the queue or a new thread may need to become leader.
900 */
901 private final Condition available = lock.newCondition();
902
903 /**
904 * Sets f's heapIndex if it is a ScheduledFutureTask.
905 */
906 private static void setIndex(RunnableScheduledFuture<?> f, int idx) {
907 if (f instanceof ScheduledFutureTask)
908 ((ScheduledFutureTask)f).heapIndex = idx;
909 }
910
911 /**
912 * Sifts element added at bottom up to its heap-ordered spot.
913 * Call only when holding lock.
914 */
915 private void siftUp(int k, RunnableScheduledFuture<?> key) {
916 while (k > 0) {
917 int parent = (k - 1) >>> 1;
918 RunnableScheduledFuture<?> e = queue[parent];
919 if (key.compareTo(e) >= 0)
920 break;
921 queue[k] = e;
922 setIndex(e, k);
923 k = parent;
924 }
925 queue[k] = key;
926 setIndex(key, k);
927 }
928
929 /**
930 * Sifts element added at top down to its heap-ordered spot.
931 * Call only when holding lock.
932 */
933 private void siftDown(int k, RunnableScheduledFuture<?> key) {
934 int half = size >>> 1;
935 while (k < half) {
936 int child = (k << 1) + 1;
937 RunnableScheduledFuture<?> c = queue[child];
938 int right = child + 1;
939 if (right < size && c.compareTo(queue[right]) > 0)
940 c = queue[child = right];
941 if (key.compareTo(c) <= 0)
942 break;
943 queue[k] = c;
944 setIndex(c, k);
945 k = child;
946 }
947 queue[k] = key;
948 setIndex(key, k);
949 }
950
951 /**
952 * Resizes the heap array. Call only when holding lock.
953 */
954 private void grow() {
955 int oldCapacity = queue.length;
956 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
957 if (newCapacity < 0) // overflow
958 newCapacity = Integer.MAX_VALUE;
959 queue = Arrays.copyOf(queue, newCapacity);
960 }
961
962 /**
963 * Finds index of given object, or -1 if absent.
964 */
965 private int indexOf(Object x) {
966 if (x != null) {
967 if (x instanceof ScheduledFutureTask) {
968 int i = ((ScheduledFutureTask) x).heapIndex;
969 // Sanity check; x could conceivably be a
970 // ScheduledFutureTask from some other pool.
971 if (i >= 0 && i < size && queue[i] == x)
972 return i;
973 } else {
974 for (int i = 0; i < size; i++)
975 if (x.equals(queue[i]))
976 return i;
977 }
978 }
979 return -1;
980 }
981
982 public boolean contains(Object x) {
983 final ReentrantLock lock = this.lock;
984 lock.lock();
985 try {
986 return indexOf(x) != -1;
987 } finally {
988 lock.unlock();
989 }
990 }
991
992 public boolean remove(Object x) {
993 final ReentrantLock lock = this.lock;
994 lock.lock();
995 try {
996 int i = indexOf(x);
997 if (i < 0)
998 return false;
999
1000 setIndex(queue[i], -1);
1001 int s = --size;
1002 RunnableScheduledFuture<?> replacement = queue[s];
1003 queue[s] = null;
1004 if (s != i) {
1005 siftDown(i, replacement);
1006 if (queue[i] == replacement)
1007 siftUp(i, replacement);
1008 }
1009 return true;
1010 } finally {
1011 lock.unlock();
1012 }
1013 }
1014
1015 public int size() {
1016 final ReentrantLock lock = this.lock;
1017 lock.lock();
1018 try {
1019 return size;
1020 } finally {
1021 lock.unlock();
1022 }
1023 }
1024
1025 public boolean isEmpty() {
1026 return size() == 0;
1027 }
1028
1029 public int remainingCapacity() {
1030 return Integer.MAX_VALUE;
1031 }
1032
1033 public RunnableScheduledFuture<?> peek() {
1034 final ReentrantLock lock = this.lock;
1035 lock.lock();
1036 try {
1037 return queue[0];
1038 } finally {
1039 lock.unlock();
1040 }
1041 }
1042
1043 public boolean offer(Runnable x) {
1044 if (x == null)
1045 throw new NullPointerException();
1046 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
1047 final ReentrantLock lock = this.lock;
1048 lock.lock();
1049 try {
1050 int i = size;
1051 if (i >= queue.length)
1052 grow();
1053 size = i + 1;
1054 if (i == 0) {
1055 queue[0] = e;
1056 setIndex(e, 0);
1057 } else {
1058 siftUp(i, e);
1059 }
1060 if (queue[0] == e) {
1061 leader = null;
1062 available.signal();
1063 }
1064 } finally {
1065 lock.unlock();
1066 }
1067 return true;
1068 }
1069
1070 public void put(Runnable e) {
1071 offer(e);
1072 }
1073
1074 public boolean add(Runnable e) {
1075 return offer(e);
1076 }
1077
1078 public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1079 return offer(e);
1080 }
1081
1082 /**
1083 * Performs common bookkeeping for poll and take: Replaces
1084 * first element with last and sifts it down. Call only when
1085 * holding lock.
1086 * @param f the task to remove and return
1087 */
1088 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1089 int s = --size;
1090 RunnableScheduledFuture<?> x = queue[s];
1091 queue[s] = null;
1092 if (s != 0)
1093 siftDown(0, x);
1094 setIndex(f, -1);
1095 return f;
1096 }
1097
1098 public RunnableScheduledFuture<?> poll() {
1099 final ReentrantLock lock = this.lock;
1100 lock.lock();
1101 try {
1102 RunnableScheduledFuture<?> first = queue[0];
1103 return (first == null || first.getDelay(NANOSECONDS) > 0)
1104 ? null
1105 : finishPoll(first);
1106 } finally {
1107 lock.unlock();
1108 }
1109 }
1110
1111 public RunnableScheduledFuture<?> take() throws InterruptedException {
1112 final ReentrantLock lock = this.lock;
1113 lock.lockInterruptibly();
1114 try {
1115 for (;;) {
1116 RunnableScheduledFuture<?> first = queue[0];
1117 if (first == null)
1118 available.await();
1119 else {
1120 long delay = first.getDelay(NANOSECONDS);
1121 if (delay <= 0L)
1122 return finishPoll(first);
1123 first = null; // don't retain ref while waiting
1124 if (leader != null)
1125 available.await();
1126 else {
1127 Thread thisThread = Thread.currentThread();
1128 leader = thisThread;
1129 try {
1130 available.awaitNanos(delay);
1131 } finally {
1132 if (leader == thisThread)
1133 leader = null;
1134 }
1135 }
1136 }
1137 }
1138 } finally {
1139 if (leader == null && queue[0] != null)
1140 available.signal();
1141 lock.unlock();
1142 }
1143 }
1144
1145 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1146 throws InterruptedException {
1147 long nanos = unit.toNanos(timeout);
1148 final ReentrantLock lock = this.lock;
1149 lock.lockInterruptibly();
1150 try {
1151 for (;;) {
1152 RunnableScheduledFuture<?> first = queue[0];
1153 if (first == null) {
1154 if (nanos <= 0L)
1155 return null;
1156 else
1157 nanos = available.awaitNanos(nanos);
1158 } else {
1159 long delay = first.getDelay(NANOSECONDS);
1160 if (delay <= 0L)
1161 return finishPoll(first);
1162 if (nanos <= 0L)
1163 return null;
1164 first = null; // don't retain ref while waiting
1165 if (nanos < delay || leader != null)
1166 nanos = available.awaitNanos(nanos);
1167 else {
1168 Thread thisThread = Thread.currentThread();
1169 leader = thisThread;
1170 try {
1171 long timeLeft = available.awaitNanos(delay);
1172 nanos -= delay - timeLeft;
1173 } finally {
1174 if (leader == thisThread)
1175 leader = null;
1176 }
1177 }
1178 }
1179 }
1180 } finally {
1181 if (leader == null && queue[0] != null)
1182 available.signal();
1183 lock.unlock();
1184 }
1185 }
1186
1187 public void clear() {
1188 final ReentrantLock lock = this.lock;
1189 lock.lock();
1190 try {
1191 for (int i = 0; i < size; i++) {
1192 RunnableScheduledFuture<?> t = queue[i];
1193 if (t != null) {
1194 queue[i] = null;
1195 setIndex(t, -1);
1196 }
1197 }
1198 size = 0;
1199 } finally {
1200 lock.unlock();
1201 }
1202 }
1203
1204 public int drainTo(Collection<? super Runnable> c) {
1205 return drainTo(c, Integer.MAX_VALUE);
1206 }
1207
1208 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1209 Objects.requireNonNull(c);
1210 if (c == this)
1211 throw new IllegalArgumentException();
1212 if (maxElements <= 0)
1213 return 0;
1214 final ReentrantLock lock = this.lock;
1215 lock.lock();
1216 try {
1217 int n = 0;
1218 for (RunnableScheduledFuture<?> first;
1219 n < maxElements
1220 && (first = queue[0]) != null
1221 && first.getDelay(NANOSECONDS) <= 0;) {
1222 c.add(first); // In this order, in case add() throws.
1223 finishPoll(first);
1224 ++n;
1225 }
1226 return n;
1227 } finally {
1228 lock.unlock();
1229 }
1230 }
1231
1232 public Object[] toArray() {
1233 final ReentrantLock lock = this.lock;
1234 lock.lock();
1235 try {
1236 return Arrays.copyOf(queue, size, Object[].class);
1237 } finally {
1238 lock.unlock();
1239 }
1240 }
1241
1242 @SuppressWarnings("unchecked")
1243 public <T> T[] toArray(T[] a) {
1244 final ReentrantLock lock = this.lock;
1245 lock.lock();
1246 try {
1247 if (a.length < size)
1248 return (T[]) Arrays.copyOf(queue, size, a.getClass());
1249 System.arraycopy(queue, 0, a, 0, size);
1250 if (a.length > size)
1251 a[size] = null;
1252 return a;
1253 } finally {
1254 lock.unlock();
1255 }
1256 }
1257
1258 public Iterator<Runnable> iterator() {
1259 final ReentrantLock lock = this.lock;
1260 lock.lock();
1261 try {
1262 return new Itr(Arrays.copyOf(queue, size));
1263 } finally {
1264 lock.unlock();
1265 }
1266 }
1267
1268 /**
1269 * Snapshot iterator that works off copy of underlying q array.
1270 */
1271 private class Itr implements Iterator<Runnable> {
1272 final RunnableScheduledFuture<?>[] array;
1273 int cursor; // index of next element to return; initially 0
1274 int lastRet = -1; // index of last element returned; -1 if no such
1275
1276 Itr(RunnableScheduledFuture<?>[] array) {
1277 this.array = array;
1278 }
1279
1280 public boolean hasNext() {
1281 return cursor < array.length;
1282 }
1283
1284 public Runnable next() {
1285 if (cursor >= array.length)
1286 throw new NoSuchElementException();
1287 return array[lastRet = cursor++];
1288 }
1289
1290 public void remove() {
1291 if (lastRet < 0)
1292 throw new IllegalStateException();
1293 DelayedWorkQueue.this.remove(array[lastRet]);
1294 lastRet = -1;
1295 }
1296 }
1297 }
1298 }