ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.110
Committed: Thu Mar 30 20:11:50 2017 UTC (7 years, 2 months ago) by jsr166
Branch: MAIN
Changes since 1.109: +1 -1 lines
Log Message:
add "not-yet-expired" to internal spec for clarity

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 import static java.util.concurrent.TimeUnit.NANOSECONDS;
11
12 import java.util.AbstractQueue;
13 import java.util.Arrays;
14 import java.util.Collection;
15 import java.util.Iterator;
16 import java.util.List;
17 import java.util.NoSuchElementException;
18 import java.util.Objects;
19 import java.util.concurrent.atomic.AtomicLong;
20 import java.util.concurrent.locks.Condition;
21 import java.util.concurrent.locks.ReentrantLock;
22
23 /**
24 * A {@link ThreadPoolExecutor} that can additionally schedule
25 * commands to run after a given delay, or to execute periodically.
26 * This class is preferable to {@link java.util.Timer} when multiple
27 * worker threads are needed, or when the additional flexibility or
28 * capabilities of {@link ThreadPoolExecutor} (which this class
29 * extends) are required.
30 *
31 * <p>Delayed tasks execute no sooner than they are enabled, but
32 * without any real-time guarantees about when, after they are
33 * enabled, they will commence. Tasks scheduled for exactly the same
34 * execution time are enabled in first-in-first-out (FIFO) order of
35 * submission.
36 *
37 * <p>When a submitted task is cancelled before it is run, execution
38 * is suppressed. By default, such a cancelled task is not
39 * automatically removed from the work queue until its delay elapses.
40 * While this enables further inspection and monitoring, it may also
41 * cause unbounded retention of cancelled tasks. To avoid this, use
42 * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately
43 * removed from the work queue at time of cancellation.
44 *
45 * <p>Successive executions of a periodic task scheduled via
46 * {@link #scheduleAtFixedRate scheduleAtFixedRate} or
47 * {@link #scheduleWithFixedDelay scheduleWithFixedDelay}
48 * do not overlap. While different executions may be performed by
49 * different threads, the effects of prior executions
50 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
51 * those of subsequent ones.
52 *
53 * <p>Executions of a periodic task continue until one of the
54 * following happens, when further executions are suppressed and the
55 * task's future is completed:
56 * <ul>
57 * <li>{@link #shutdown} is called and the {@linkplain
58 * #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on
59 * whether to continue after shutdown} is not set true
60 * <li>{@link #shutdownNow} is called
61 * <li>the task's future is {@linkplain Future#cancel cancelled}
62 * <li>an execution of the task terminates abruptly with an exception;
63 * in this case calling {@link Future#get()} will throw an {@link
64 * ExecutionException} holding the exception as its cause. In all
65 * other cases {@code get()} will throw {@link CancellationException};
66 * a periodic task's future never completes normally.
67 * </ul>
68 *
69 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
70 * of the inherited tuning methods are not useful for it. In
71 * particular, because it acts as a fixed-sized pool using
72 * {@code corePoolSize} threads and an unbounded queue, adjustments
73 * to {@code maximumPoolSize} have no useful effect. Additionally, it
74 * is almost never a good idea to set {@code corePoolSize} to zero or
75 * use {@code allowCoreThreadTimeOut} because this may leave the pool
76 * without threads to handle tasks once they become eligible to run.
77 *
78 * <p>As with {@code ThreadPoolExecutor}, if not otherwise specified,
79 * this class uses {@link Executors#defaultThreadFactory} as the
80 * default thread factory, and {@link ThreadPoolExecutor.AbortPolicy}
81 * as the default rejected execution handler.
82 *
83 * <p><b>Extension notes:</b> This class overrides the
84 * {@link ThreadPoolExecutor#execute(Runnable) execute} and
85 * {@link AbstractExecutorService#submit(Runnable) submit}
86 * methods to generate internal {@link ScheduledFuture} objects to
87 * control per-task delays and scheduling. To preserve
88 * functionality, any further overrides of these methods in
89 * subclasses must invoke superclass versions, which effectively
90 * disables additional task customization. However, this class
91 * provides an alternative protected extension method
92 * {@code decorateTask} (one version each for {@code Runnable} and
93 * {@code Callable}) that can be used to customize the concrete task
94 * types used to execute commands entered via {@code execute},
95 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
96 * and {@code scheduleWithFixedDelay}. By default, a
97 * {@code ScheduledThreadPoolExecutor} uses a task type extending
98 * {@link FutureTask}. However, this may be modified or replaced using
99 * subclasses of the form:
100 *
101 * <pre> {@code
102 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
103 *
104 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
105 *
106 * protected <V> RunnableScheduledFuture<V> decorateTask(
107 * Runnable r, RunnableScheduledFuture<V> task) {
108 * return new CustomTask<V>(r, task);
109 * }
110 *
111 * protected <V> RunnableScheduledFuture<V> decorateTask(
112 * Callable<V> c, RunnableScheduledFuture<V> task) {
113 * return new CustomTask<V>(c, task);
114 * }
115 * // ... add constructors, etc.
116 * }}</pre>
117 *
118 * @since 1.5
119 * @author Doug Lea
120 */
121 public class ScheduledThreadPoolExecutor
122 extends ThreadPoolExecutor
123 implements ScheduledExecutorService {
124
125 /*
126 * This class specializes ThreadPoolExecutor implementation by
127 *
128 * 1. Using a custom task type ScheduledFutureTask, even for tasks
129 * that don't require scheduling because they are submitted
130 * using ExecutorService rather than ScheduledExecutorService
131 * methods, which are treated as tasks with a delay of zero.
132 *
133 * 2. Using a custom queue (DelayedWorkQueue), a variant of
134 * unbounded DelayQueue. The lack of capacity constraint and
135 * the fact that corePoolSize and maximumPoolSize are
136 * effectively identical simplifies some execution mechanics
137 * (see delayedExecute) compared to ThreadPoolExecutor.
138 *
139 * 3. Supporting optional run-after-shutdown parameters, which
140 * leads to overrides of shutdown methods to remove and cancel
141 * tasks that should NOT be run after shutdown, as well as
142 * different recheck logic when task (re)submission overlaps
143 * with a shutdown.
144 *
145 * 4. Task decoration methods to allow interception and
146 * instrumentation, which are needed because subclasses cannot
147 * otherwise override submit methods to get this effect. These
148 * don't have any impact on pool control logic though.
149 */
150
151 /**
152 * Shutdown policy for periodic tasks: false (the initial value) means they are cancelled/suppressed on shutdown; true means they may keep running.
153 */
154 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
155
156 /**
157 * Shutdown policy for one-shot tasks: false means non-periodic, not-yet-expired tasks are cancelled on shutdown; initialized to true.
158 */
159 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
160
161 /**
162 * True if ScheduledFutureTask.cancel should also remove the cancelled task from the work queue.
163 */
164 volatile boolean removeOnCancel;
165
166 /**
167 * Source of sequence numbers used to break scheduling ties, which in turn
168 * guarantees FIFO order among entries with equal trigger times.
169 */
170 private static final AtomicLong sequencer = new AtomicLong();
171
172 private class ScheduledFutureTask<V>
173 extends FutureTask<V> implements RunnableScheduledFuture<V> {
174
175 /** Sequence number to break ties FIFO */
176 private final long sequenceNumber;
177
178 /** The nanoTime-based time when the task is enabled to execute. */
179 private volatile long time;
180
181 /**
182 * Period for repeating tasks, in nanoseconds.
183 * A positive value indicates fixed-rate execution.
184 * A negative value indicates fixed-delay execution.
185 * A value of 0 indicates a non-repeating (one-shot) task.
186 */
187 private final long period;
188
189 /** The actual task to be re-enqueued by reExecutePeriodic */
190 RunnableScheduledFuture<V> outerTask = this;
191
192 /**
193 * Index into delay queue, to support faster cancellation.
194 */
195 int heapIndex;
196
197 /**
198 * Creates a one-shot action with given nanoTime-based trigger time.
199 */
200 ScheduledFutureTask(Runnable r, V result, long triggerTime,
201 long sequenceNumber) {
202 super(r, result);
203 this.time = triggerTime;
204 this.period = 0;
205 this.sequenceNumber = sequenceNumber;
206 }
207
208 /**
209 * Creates a periodic action with given nanoTime-based initial
210 * trigger time and period.
211 */
212 ScheduledFutureTask(Runnable r, V result, long triggerTime,
213 long period, long sequenceNumber) {
214 super(r, result);
215 this.time = triggerTime;
216 this.period = period;
217 this.sequenceNumber = sequenceNumber;
218 }
219
220 /**
221 * Creates a one-shot action with given nanoTime-based trigger time.
222 */
223 ScheduledFutureTask(Callable<V> callable, long triggerTime,
224 long sequenceNumber) {
225 super(callable);
226 this.time = triggerTime;
227 this.period = 0;
228 this.sequenceNumber = sequenceNumber;
229 }
230
231 public long getDelay(TimeUnit unit) {
232 return unit.convert(time - System.nanoTime(), NANOSECONDS);
233 }
234
235 public int compareTo(Delayed other) {
236 if (other == this) // compare zero if same object
237 return 0;
238 if (other instanceof ScheduledFutureTask) {
239 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
240 long diff = time - x.time;
241 if (diff < 0)
242 return -1;
243 else if (diff > 0)
244 return 1;
245 else if (sequenceNumber < x.sequenceNumber)
246 return -1;
247 else
248 return 1;
249 }
250 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
251 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
252 }
253
254 /**
255 * Returns {@code true} if this is a periodic (not a one-shot) action.
256 *
257 * @return {@code true} if periodic
258 */
259 public boolean isPeriodic() {
260 return period != 0;
261 }
262
263 /**
264 * Sets the next time to run for a periodic task.
265 */
266 private void setNextRunTime() {
267 long p = period;
268 if (p > 0)
269 time += p;
270 else
271 time = triggerTime(-p);
272 }
273
274 public boolean cancel(boolean mayInterruptIfRunning) {
275 // The racy read of heapIndex below is benign:
276 // if heapIndex < 0, then OOTA guarantees that we have surely
277 // been removed; else we recheck under lock in remove()
278 boolean cancelled = super.cancel(mayInterruptIfRunning);
279 if (cancelled && removeOnCancel && heapIndex >= 0)
280 remove(this);
281 return cancelled;
282 }
283
284 /**
285 * Overrides FutureTask version so as to reset/requeue if periodic.
286 */
287 public void run() {
288 if (!canRunInCurrentRunState(this))
289 cancel(false);
290 else if (!isPeriodic())
291 super.run();
292 else if (super.runAndReset()) {
293 setNextRunTime();
294 reExecutePeriodic(outerTask);
295 }
296 }
297 }
298
299 /**
300 * Returns true if can run a task given current run state and
301 * run-after-shutdown parameters.
302 */
303 boolean canRunInCurrentRunState(RunnableScheduledFuture<?> task) {
304 if (!isShutdown())
305 return true;
306 if (isStopped())
307 return false;
308 return task.isPeriodic()
309 ? continueExistingPeriodicTasksAfterShutdown
310 : (executeExistingDelayedTasksAfterShutdown
311 || task.getDelay(NANOSECONDS) <= 0);
312 }
313
314 /**
315 * Main execution method for delayed or periodic tasks. If pool
316 * is shut down, rejects the task. Otherwise adds task to queue
317 * and starts a thread, if necessary, to run it. (We cannot
318 * prestart the thread to run the task because the task (probably)
319 * shouldn't be run yet.) If the pool is shut down while the task
320 * is being added, cancel and remove it if required by state and
321 * run-after-shutdown parameters.
322 *
323 * @param task the task
324 */
325 private void delayedExecute(RunnableScheduledFuture<?> task) {
326 if (isShutdown())
327 reject(task);
328 else {
329 super.getQueue().add(task);
330 if (!canRunInCurrentRunState(task) && remove(task))
331 task.cancel(false);
332 else
333 ensurePrestart();
334 }
335 }
336
337 /**
338 * Requeues a periodic task unless current run state precludes it.
339 * Same idea as delayedExecute except drops task rather than rejecting.
340 *
341 * @param task the task
342 */
343 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
344 if (canRunInCurrentRunState(task)) {
345 super.getQueue().add(task);
346 if (canRunInCurrentRunState(task) || !remove(task)) {
347 ensurePrestart();
348 return;
349 }
350 }
351 task.cancel(false);
352 }
353
354 /**
355 * Cancels and clears the queue of all tasks that should not be run
356 * due to shutdown policy. Invoked within super.shutdown.
357 */
358 @Override void onShutdown() {
359 BlockingQueue<Runnable> q = super.getQueue();
360 boolean keepDelayed =
361 getExecuteExistingDelayedTasksAfterShutdownPolicy();
362 boolean keepPeriodic =
363 getContinueExistingPeriodicTasksAfterShutdownPolicy();
364 // Traverse snapshot to avoid iterator exceptions
365 // TODO: implement and use efficient removeIf
366 // super.getQueue().removeIf(...);
367 for (Object e : q.toArray()) {
368 if (e instanceof RunnableScheduledFuture) {
369 RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
370 if ((t.isPeriodic()
371 ? !keepPeriodic
372 : (!keepDelayed && t.getDelay(NANOSECONDS) > 0))
373 || t.isCancelled()) { // also remove if already cancelled
374 if (q.remove(t))
375 t.cancel(false);
376 }
377 }
378 }
379 tryTerminate();
380 }
381
382 /**
383 * Modifies or replaces the task used to execute a runnable.
384 * This method can be used to override the concrete
385 * class used for managing internal tasks.
386 * The default implementation simply returns the given task.
387 *
388 * @param runnable the submitted Runnable
389 * @param task the task created to execute the runnable
390 * @param <V> the type of the task's result
391 * @return a task that can execute the runnable
392 * @since 1.6
393 */
394 protected <V> RunnableScheduledFuture<V> decorateTask(
395 Runnable runnable, RunnableScheduledFuture<V> task) {
396 return task;
397 }
398
399 /**
400 * Modifies or replaces the task used to execute a callable.
401 * This method can be used to override the concrete
402 * class used for managing internal tasks.
403 * The default implementation simply returns the given task.
404 *
405 * @param callable the submitted Callable
406 * @param task the task created to execute the callable
407 * @param <V> the type of the task's result
408 * @return a task that can execute the callable
409 * @since 1.6
410 */
411 protected <V> RunnableScheduledFuture<V> decorateTask(
412 Callable<V> callable, RunnableScheduledFuture<V> task) {
413 return task;
414 }
415
416 /**
417 * The default keep-alive time for pool threads, in milliseconds.
418 *
419 * Normally, this value is unused because all pool threads will be
420 * core threads, but if a user creates a pool with a corePoolSize
421 * of zero (against our advice), we keep a thread alive as long as
422 * there are queued tasks. If the keep alive time is zero (the
423 * historic value), we end up hot-spinning in getTask, wasting a
424 * CPU. But on the other hand, if we set the value too high, and
425 * users create a one-shot pool which they don't cleanly shut down,
426 * the pool's non-daemon threads will prevent JVM termination. A
427 * small but non-zero value (relative to a JVM's lifetime) seems
428 * best.
429 */
430 private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
431
/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the
 * given core pool size.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue());
}
445
/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the
 * given core pool size and thread factory.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code threadFactory} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue(),
          threadFactory);
}
463
/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the
 * given core pool size and rejected execution handler.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code handler} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue(),
          handler);
}
481
/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the
 * given core pool size, thread factory and rejected execution handler.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code threadFactory} or
 *         {@code handler} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue(),
          threadFactory,
          handler);
}
503
504 /**
505 * Returns the nanoTime-based trigger time of a delayed action.
506 */
507 private long triggerTime(long delay, TimeUnit unit) {
508 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
509 }
510
511 /**
512 * Returns the nanoTime-based trigger time of a delayed action.
513 */
514 long triggerTime(long delay) {
515 return System.nanoTime() +
516 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
517 }
518
519 /**
520 * Constrains the values of all delays in the queue to be within
521 * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
522 * This may occur if a task is eligible to be dequeued, but has
523 * not yet been, while some other task is added with a delay of
524 * Long.MAX_VALUE.
525 */
526 private long overflowFree(long delay) {
527 Delayed head = (Delayed) super.getQueue().peek();
528 if (head != null) {
529 long headDelay = head.getDelay(NANOSECONDS);
530 if (headDelay < 0 && (delay - headDelay < 0))
531 delay = Long.MAX_VALUE + headDelay;
532 }
533 return delay;
534 }
535
536 /**
537 * @throws RejectedExecutionException {@inheritDoc}
538 * @throws NullPointerException {@inheritDoc}
539 */
540 public ScheduledFuture<?> schedule(Runnable command,
541 long delay,
542 TimeUnit unit) {
543 if (command == null || unit == null)
544 throw new NullPointerException();
545 RunnableScheduledFuture<Void> t = decorateTask(command,
546 new ScheduledFutureTask<Void>(command, null,
547 triggerTime(delay, unit),
548 sequencer.getAndIncrement()));
549 delayedExecute(t);
550 return t;
551 }
552
553 /**
554 * @throws RejectedExecutionException {@inheritDoc}
555 * @throws NullPointerException {@inheritDoc}
556 */
557 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
558 long delay,
559 TimeUnit unit) {
560 if (callable == null || unit == null)
561 throw new NullPointerException();
562 RunnableScheduledFuture<V> t = decorateTask(callable,
563 new ScheduledFutureTask<V>(callable,
564 triggerTime(delay, unit),
565 sequencer.getAndIncrement()));
566 delayedExecute(t);
567 return t;
568 }
569
570 /**
571 * @throws RejectedExecutionException {@inheritDoc}
572 * @throws NullPointerException {@inheritDoc}
573 * @throws IllegalArgumentException {@inheritDoc}
574 */
575 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
576 long initialDelay,
577 long period,
578 TimeUnit unit) {
579 if (command == null || unit == null)
580 throw new NullPointerException();
581 if (period <= 0L)
582 throw new IllegalArgumentException();
583 ScheduledFutureTask<Void> sft =
584 new ScheduledFutureTask<Void>(command,
585 null,
586 triggerTime(initialDelay, unit),
587 unit.toNanos(period),
588 sequencer.getAndIncrement());
589 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
590 sft.outerTask = t;
591 delayedExecute(t);
592 return t;
593 }
594
595 /**
596 * @throws RejectedExecutionException {@inheritDoc}
597 * @throws NullPointerException {@inheritDoc}
598 * @throws IllegalArgumentException {@inheritDoc}
599 */
600 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
601 long initialDelay,
602 long delay,
603 TimeUnit unit) {
604 if (command == null || unit == null)
605 throw new NullPointerException();
606 if (delay <= 0L)
607 throw new IllegalArgumentException();
608 ScheduledFutureTask<Void> sft =
609 new ScheduledFutureTask<Void>(command,
610 null,
611 triggerTime(initialDelay, unit),
612 -unit.toNanos(delay),
613 sequencer.getAndIncrement());
614 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
615 sft.outerTask = t;
616 delayedExecute(t);
617 return t;
618 }
619
620 /**
621 * Executes {@code command} with zero required delay.
622 * This has effect equivalent to
623 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
624 * Note that inspections of the queue and of the list returned by
625 * {@code shutdownNow} will access the zero-delayed
626 * {@link ScheduledFuture}, not the {@code command} itself.
627 *
628 * <p>A consequence of the use of {@code ScheduledFuture} objects is
629 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
630 * called with a null second {@code Throwable} argument, even if the
631 * {@code command} terminated abruptly. Instead, the {@code Throwable}
632 * thrown by such a task can be obtained via {@link Future#get}.
633 *
634 * @throws RejectedExecutionException at discretion of
635 * {@code RejectedExecutionHandler}, if the task
636 * cannot be accepted for execution because the
637 * executor has been shut down
638 * @throws NullPointerException {@inheritDoc}
639 */
640 public void execute(Runnable command) {
641 schedule(command, 0, NANOSECONDS);
642 }
643
644 // Override AbstractExecutorService methods
645
646 /**
647 * @throws RejectedExecutionException {@inheritDoc}
648 * @throws NullPointerException {@inheritDoc}
649 */
650 public Future<?> submit(Runnable task) {
651 return schedule(task, 0, NANOSECONDS);
652 }
653
654 /**
655 * @throws RejectedExecutionException {@inheritDoc}
656 * @throws NullPointerException {@inheritDoc}
657 */
658 public <T> Future<T> submit(Runnable task, T result) {
659 return schedule(Executors.callable(task, result), 0, NANOSECONDS);
660 }
661
662 /**
663 * @throws RejectedExecutionException {@inheritDoc}
664 * @throws NullPointerException {@inheritDoc}
665 */
666 public <T> Future<T> submit(Callable<T> task) {
667 return schedule(task, 0, NANOSECONDS);
668 }
669
670 /**
671 * Sets the policy on whether to continue executing existing
672 * periodic tasks even when this executor has been {@code shutdown}.
673 * This value is by default {@code false}.
674 *
675 * <p>If the policy is set to {@code true}, periodic tasks will
676 * continue executing until one of the following happens:
677 *
678 * <ul>
679 * <li>{@link #shutdownNow} is called
680 * <li>the policy is set to {@code false} when already shutdown
681 * <li>the task's future is {@linkplain Future#cancel cancelled}
682 * <li>an execution of the task terminates with an exception
683 * </ul>
684 *
685 * @param value if {@code true}, continue after shutdown, else don't
686 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
687 */
688 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
689 continueExistingPeriodicTasksAfterShutdown = value;
690 if (!value && isShutdown())
691 onShutdown();
692 }
693
694 /**
695 * Gets the policy on whether to continue executing existing
696 * periodic tasks even when this executor has been {@code shutdown}.
697 * This value is by default {@code false}.
698 *
699 * <p>If the policy is set to {@code true}, periodic tasks will
700 * continue executing until one of the following happens:
701 *
702 * <ul>
703 * <li>{@link #shutdownNow} is called
704 * <li>the policy is set to {@code false} when already shutdown
705 * <li>the task's future is {@linkplain Future#cancel cancelled}
706 * <li>an execution of the task terminates with an exception
707 * </ul>
708 *
709 * @return {@code true} if will continue after shutdown
710 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
711 */
712 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
713 return continueExistingPeriodicTasksAfterShutdown;
714 }
715
716 /**
717 * Sets the policy on whether to execute existing delayed
718 * tasks even when this executor has been {@code shutdown}.
719 * In this case, these tasks will only terminate upon
720 * {@code shutdownNow}, or after setting the policy to
721 * {@code false} when already shutdown.
722 * This value is by default {@code true}.
723 *
724 * @param value if {@code true}, execute after shutdown, else don't
725 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
726 */
727 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
728 executeExistingDelayedTasksAfterShutdown = value;
729 if (!value && isShutdown())
730 onShutdown();
731 }
732
733 /**
734 * Gets the policy on whether to execute existing delayed
735 * tasks even when this executor has been {@code shutdown}.
736 * In this case, these tasks will only terminate upon
737 * {@code shutdownNow}, or after setting the policy to
738 * {@code false} when already shutdown.
739 * This value is by default {@code true}.
740 *
741 * @return {@code true} if will execute after shutdown
742 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
743 */
744 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
745 return executeExistingDelayedTasksAfterShutdown;
746 }
747
748 /**
749 * Sets the policy on whether cancelled tasks should be immediately
750 * removed from the work queue at time of cancellation. This value is
751 * by default {@code false}.
752 *
753 * @param value if {@code true}, remove on cancellation, else don't
754 * @see #getRemoveOnCancelPolicy
755 * @since 1.7
756 */
757 public void setRemoveOnCancelPolicy(boolean value) {
758 removeOnCancel = value;
759 }
760
761 /**
762 * Gets the policy on whether cancelled tasks should be immediately
763 * removed from the work queue at time of cancellation. This value is
764 * by default {@code false}.
765 *
766 * @return {@code true} if cancelled tasks are immediately removed
767 * from the queue
768 * @see #setRemoveOnCancelPolicy
769 * @since 1.7
770 */
771 public boolean getRemoveOnCancelPolicy() {
772 return removeOnCancel;
773 }
774
775 /**
776 * Initiates an orderly shutdown in which previously submitted
777 * tasks are executed, but no new tasks will be accepted.
778 * Invocation has no additional effect if already shut down.
779 *
780 * <p>This method does not wait for previously submitted tasks to
781 * complete execution. Use {@link #awaitTermination awaitTermination}
782 * to do that.
783 *
784 * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
785 * has been set {@code false}, existing delayed tasks whose delays
786 * have not yet elapsed are cancelled. And unless the {@code
787 * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
788 * {@code true}, future executions of existing periodic tasks will
789 * be cancelled.
790 *
791 * @throws SecurityException {@inheritDoc}
792 */
793 public void shutdown() {
794 super.shutdown();
795 }
796
797 /**
798 * Attempts to stop all actively executing tasks, halts the
799 * processing of waiting tasks, and returns a list of the tasks
800 * that were awaiting execution. These tasks are drained (removed)
801 * from the task queue upon return from this method.
802 *
803 * <p>This method does not wait for actively executing tasks to
804 * terminate. Use {@link #awaitTermination awaitTermination} to
805 * do that.
806 *
807 * <p>There are no guarantees beyond best-effort attempts to stop
808 * processing actively executing tasks. This implementation
809 * interrupts tasks via {@link Thread#interrupt}; any task that
810 * fails to respond to interrupts may never terminate.
811 *
812 * @return list of tasks that never commenced execution.
813 * Each element of this list is a {@link ScheduledFuture}.
814 * For tasks submitted via one of the {@code schedule}
815 * methods, the element will be identical to the returned
816 * {@code ScheduledFuture}. For tasks submitted using
817 * {@link #execute execute}, the element will be a
818 * zero-delay {@code ScheduledFuture}.
819 * @throws SecurityException {@inheritDoc}
820 */
821 public List<Runnable> shutdownNow() {
822 return super.shutdownNow();
823 }
824
825 /**
826 * Returns the task queue used by this executor. Access to the
827 * task queue is intended primarily for debugging and monitoring.
828 * This queue may be in active use. Retrieving the task queue
829 * does not prevent queued tasks from executing.
830 *
831 * <p>Each element of this queue is a {@link ScheduledFuture}.
832 * For tasks submitted via one of the {@code schedule} methods, the
833 * element will be identical to the returned {@code ScheduledFuture}.
834 * For tasks submitted using {@link #execute execute}, the element
835 * will be a zero-delay {@code ScheduledFuture}.
836 *
837 * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
838 * tasks in the order in which they will execute.
839 *
840 * @return the task queue
841 */
842 public BlockingQueue<Runnable> getQueue() {
843 return super.getQueue();
844 }
845
846 /**
847 * Specialized delay queue. To mesh with TPE declarations, this
848 * class must be declared as a BlockingQueue<Runnable> even though
849 * it can only hold RunnableScheduledFutures.
850 */
851 static class DelayedWorkQueue extends AbstractQueue<Runnable>
852 implements BlockingQueue<Runnable> {
853
854 /*
855 * A DelayedWorkQueue is based on a heap-based data structure
856 * like those in DelayQueue and PriorityQueue, except that
857 * every ScheduledFutureTask also records its index into the
858 * heap array. This eliminates the need to find a task upon
859 * cancellation, greatly speeding up removal (down from O(n)
860 * to O(log n)), and reducing garbage retention that would
861 * otherwise occur by waiting for the element to rise to top
862 * before clearing. But because the queue may also hold
863 * RunnableScheduledFutures that are not ScheduledFutureTasks,
864 * we are not guaranteed to have such indices available, in
865 * which case we fall back to linear search. (We expect that
866 * most tasks will not be decorated, and that the faster cases
867 * will be much more common.)
868 *
869 * All heap operations must record index changes -- mainly
870 * within siftUp and siftDown. Upon removal, a task's
871 * heapIndex is set to -1. Note that ScheduledFutureTasks can
872 * appear at most once in the queue (this need not be true for
873 * other kinds of tasks or work queues), so are uniquely
874 * identified by heapIndex.
875 */
876
877 private static final int INITIAL_CAPACITY = 16;
878 private RunnableScheduledFuture<?>[] queue =
879 new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
880 private final ReentrantLock lock = new ReentrantLock();
881 private int size;
882
883 /**
884 * Thread designated to wait for the task at the head of the
885 * queue. This variant of the Leader-Follower pattern
886 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
887 * minimize unnecessary timed waiting. When a thread becomes
888 * the leader, it waits only for the next delay to elapse, but
889 * other threads await indefinitely. The leader thread must
890 * signal some other thread before returning from take() or
891 * poll(...), unless some other thread becomes leader in the
892 * interim. Whenever the head of the queue is replaced with a
893 * task with an earlier expiration time, the leader field is
894 * invalidated by being reset to null, and some waiting
895 * thread, but not necessarily the current leader, is
896 * signalled. So waiting threads must be prepared to acquire
897 * and lose leadership while waiting.
898 */
899 private Thread leader;
900
901 /**
902 * Condition signalled when a newer task becomes available at the
903 * head of the queue or a new thread may need to become leader.
904 */
905 private final Condition available = lock.newCondition();
906
907 /**
908 * Sets f's heapIndex if it is a ScheduledFutureTask.
909 */
910 private static void setIndex(RunnableScheduledFuture<?> f, int idx) {
911 if (f instanceof ScheduledFutureTask)
912 ((ScheduledFutureTask)f).heapIndex = idx;
913 }
914
915 /**
916 * Sifts element added at bottom up to its heap-ordered spot.
917 * Call only when holding lock.
918 */
919 private void siftUp(int k, RunnableScheduledFuture<?> key) {
920 while (k > 0) {
921 int parent = (k - 1) >>> 1;
922 RunnableScheduledFuture<?> e = queue[parent];
923 if (key.compareTo(e) >= 0)
924 break;
925 queue[k] = e;
926 setIndex(e, k);
927 k = parent;
928 }
929 queue[k] = key;
930 setIndex(key, k);
931 }
932
933 /**
934 * Sifts element added at top down to its heap-ordered spot.
935 * Call only when holding lock.
936 */
937 private void siftDown(int k, RunnableScheduledFuture<?> key) {
938 int half = size >>> 1;
939 while (k < half) {
940 int child = (k << 1) + 1;
941 RunnableScheduledFuture<?> c = queue[child];
942 int right = child + 1;
943 if (right < size && c.compareTo(queue[right]) > 0)
944 c = queue[child = right];
945 if (key.compareTo(c) <= 0)
946 break;
947 queue[k] = c;
948 setIndex(c, k);
949 k = child;
950 }
951 queue[k] = key;
952 setIndex(key, k);
953 }
954
955 /**
956 * Resizes the heap array. Call only when holding lock.
957 */
958 private void grow() {
959 int oldCapacity = queue.length;
960 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
961 if (newCapacity < 0) // overflow
962 newCapacity = Integer.MAX_VALUE;
963 queue = Arrays.copyOf(queue, newCapacity);
964 }
965
966 /**
967 * Finds index of given object, or -1 if absent.
968 */
969 private int indexOf(Object x) {
970 if (x != null) {
971 if (x instanceof ScheduledFutureTask) {
972 int i = ((ScheduledFutureTask) x).heapIndex;
973 // Sanity check; x could conceivably be a
974 // ScheduledFutureTask from some other pool.
975 if (i >= 0 && i < size && queue[i] == x)
976 return i;
977 } else {
978 for (int i = 0; i < size; i++)
979 if (x.equals(queue[i]))
980 return i;
981 }
982 }
983 return -1;
984 }
985
986 public boolean contains(Object x) {
987 final ReentrantLock lock = this.lock;
988 lock.lock();
989 try {
990 return indexOf(x) != -1;
991 } finally {
992 lock.unlock();
993 }
994 }
995
996 public boolean remove(Object x) {
997 final ReentrantLock lock = this.lock;
998 lock.lock();
999 try {
1000 int i = indexOf(x);
1001 if (i < 0)
1002 return false;
1003
1004 setIndex(queue[i], -1);
1005 int s = --size;
1006 RunnableScheduledFuture<?> replacement = queue[s];
1007 queue[s] = null;
1008 if (s != i) {
1009 siftDown(i, replacement);
1010 if (queue[i] == replacement)
1011 siftUp(i, replacement);
1012 }
1013 return true;
1014 } finally {
1015 lock.unlock();
1016 }
1017 }
1018
1019 public int size() {
1020 final ReentrantLock lock = this.lock;
1021 lock.lock();
1022 try {
1023 return size;
1024 } finally {
1025 lock.unlock();
1026 }
1027 }
1028
1029 public boolean isEmpty() {
1030 return size() == 0;
1031 }
1032
1033 public int remainingCapacity() {
1034 return Integer.MAX_VALUE;
1035 }
1036
1037 public RunnableScheduledFuture<?> peek() {
1038 final ReentrantLock lock = this.lock;
1039 lock.lock();
1040 try {
1041 return queue[0];
1042 } finally {
1043 lock.unlock();
1044 }
1045 }
1046
1047 public boolean offer(Runnable x) {
1048 if (x == null)
1049 throw new NullPointerException();
1050 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
1051 final ReentrantLock lock = this.lock;
1052 lock.lock();
1053 try {
1054 int i = size;
1055 if (i >= queue.length)
1056 grow();
1057 size = i + 1;
1058 if (i == 0) {
1059 queue[0] = e;
1060 setIndex(e, 0);
1061 } else {
1062 siftUp(i, e);
1063 }
1064 if (queue[0] == e) {
1065 leader = null;
1066 available.signal();
1067 }
1068 } finally {
1069 lock.unlock();
1070 }
1071 return true;
1072 }
1073
1074 public void put(Runnable e) {
1075 offer(e);
1076 }
1077
1078 public boolean add(Runnable e) {
1079 return offer(e);
1080 }
1081
1082 public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1083 return offer(e);
1084 }
1085
1086 /**
1087 * Performs common bookkeeping for poll and take: Replaces
1088 * first element with last and sifts it down. Call only when
1089 * holding lock.
1090 * @param f the task to remove and return
1091 */
1092 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1093 int s = --size;
1094 RunnableScheduledFuture<?> x = queue[s];
1095 queue[s] = null;
1096 if (s != 0)
1097 siftDown(0, x);
1098 setIndex(f, -1);
1099 return f;
1100 }
1101
1102 public RunnableScheduledFuture<?> poll() {
1103 final ReentrantLock lock = this.lock;
1104 lock.lock();
1105 try {
1106 RunnableScheduledFuture<?> first = queue[0];
1107 return (first == null || first.getDelay(NANOSECONDS) > 0)
1108 ? null
1109 : finishPoll(first);
1110 } finally {
1111 lock.unlock();
1112 }
1113 }
1114
1115 public RunnableScheduledFuture<?> take() throws InterruptedException {
1116 final ReentrantLock lock = this.lock;
1117 lock.lockInterruptibly();
1118 try {
1119 for (;;) {
1120 RunnableScheduledFuture<?> first = queue[0];
1121 if (first == null)
1122 available.await();
1123 else {
1124 long delay = first.getDelay(NANOSECONDS);
1125 if (delay <= 0L)
1126 return finishPoll(first);
1127 first = null; // don't retain ref while waiting
1128 if (leader != null)
1129 available.await();
1130 else {
1131 Thread thisThread = Thread.currentThread();
1132 leader = thisThread;
1133 try {
1134 available.awaitNanos(delay);
1135 } finally {
1136 if (leader == thisThread)
1137 leader = null;
1138 }
1139 }
1140 }
1141 }
1142 } finally {
1143 if (leader == null && queue[0] != null)
1144 available.signal();
1145 lock.unlock();
1146 }
1147 }
1148
1149 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1150 throws InterruptedException {
1151 long nanos = unit.toNanos(timeout);
1152 final ReentrantLock lock = this.lock;
1153 lock.lockInterruptibly();
1154 try {
1155 for (;;) {
1156 RunnableScheduledFuture<?> first = queue[0];
1157 if (first == null) {
1158 if (nanos <= 0L)
1159 return null;
1160 else
1161 nanos = available.awaitNanos(nanos);
1162 } else {
1163 long delay = first.getDelay(NANOSECONDS);
1164 if (delay <= 0L)
1165 return finishPoll(first);
1166 if (nanos <= 0L)
1167 return null;
1168 first = null; // don't retain ref while waiting
1169 if (nanos < delay || leader != null)
1170 nanos = available.awaitNanos(nanos);
1171 else {
1172 Thread thisThread = Thread.currentThread();
1173 leader = thisThread;
1174 try {
1175 long timeLeft = available.awaitNanos(delay);
1176 nanos -= delay - timeLeft;
1177 } finally {
1178 if (leader == thisThread)
1179 leader = null;
1180 }
1181 }
1182 }
1183 }
1184 } finally {
1185 if (leader == null && queue[0] != null)
1186 available.signal();
1187 lock.unlock();
1188 }
1189 }
1190
1191 public void clear() {
1192 final ReentrantLock lock = this.lock;
1193 lock.lock();
1194 try {
1195 for (int i = 0; i < size; i++) {
1196 RunnableScheduledFuture<?> t = queue[i];
1197 if (t != null) {
1198 queue[i] = null;
1199 setIndex(t, -1);
1200 }
1201 }
1202 size = 0;
1203 } finally {
1204 lock.unlock();
1205 }
1206 }
1207
1208 public int drainTo(Collection<? super Runnable> c) {
1209 return drainTo(c, Integer.MAX_VALUE);
1210 }
1211
1212 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1213 Objects.requireNonNull(c);
1214 if (c == this)
1215 throw new IllegalArgumentException();
1216 if (maxElements <= 0)
1217 return 0;
1218 final ReentrantLock lock = this.lock;
1219 lock.lock();
1220 try {
1221 int n = 0;
1222 for (RunnableScheduledFuture<?> first;
1223 n < maxElements
1224 && (first = queue[0]) != null
1225 && first.getDelay(NANOSECONDS) <= 0;) {
1226 c.add(first); // In this order, in case add() throws.
1227 finishPoll(first);
1228 ++n;
1229 }
1230 return n;
1231 } finally {
1232 lock.unlock();
1233 }
1234 }
1235
1236 public Object[] toArray() {
1237 final ReentrantLock lock = this.lock;
1238 lock.lock();
1239 try {
1240 return Arrays.copyOf(queue, size, Object[].class);
1241 } finally {
1242 lock.unlock();
1243 }
1244 }
1245
1246 @SuppressWarnings("unchecked")
1247 public <T> T[] toArray(T[] a) {
1248 final ReentrantLock lock = this.lock;
1249 lock.lock();
1250 try {
1251 if (a.length < size)
1252 return (T[]) Arrays.copyOf(queue, size, a.getClass());
1253 System.arraycopy(queue, 0, a, 0, size);
1254 if (a.length > size)
1255 a[size] = null;
1256 return a;
1257 } finally {
1258 lock.unlock();
1259 }
1260 }
1261
1262 public Iterator<Runnable> iterator() {
1263 final ReentrantLock lock = this.lock;
1264 lock.lock();
1265 try {
1266 return new Itr(Arrays.copyOf(queue, size));
1267 } finally {
1268 lock.unlock();
1269 }
1270 }
1271
1272 /**
1273 * Snapshot iterator that works off copy of underlying q array.
1274 */
1275 private class Itr implements Iterator<Runnable> {
1276 final RunnableScheduledFuture<?>[] array;
1277 int cursor; // index of next element to return; initially 0
1278 int lastRet = -1; // index of last element returned; -1 if no such
1279
1280 Itr(RunnableScheduledFuture<?>[] array) {
1281 this.array = array;
1282 }
1283
1284 public boolean hasNext() {
1285 return cursor < array.length;
1286 }
1287
1288 public Runnable next() {
1289 if (cursor >= array.length)
1290 throw new NoSuchElementException();
1291 return array[lastRet = cursor++];
1292 }
1293
1294 public void remove() {
1295 if (lastRet < 0)
1296 throw new IllegalStateException();
1297 DelayedWorkQueue.this.remove(array[lastRet]);
1298 lastRet = -1;
1299 }
1300 }
1301 }
1302 }