ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.106
Committed: Tue Mar 28 00:41:44 2017 UTC (7 years, 5 months ago) by jsr166
Branch: MAIN
Changes since 1.105: +6 -6 lines
Log Message:
align drainTo implementations

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 import static java.util.concurrent.TimeUnit.NANOSECONDS;
11
12 import java.util.AbstractQueue;
13 import java.util.Arrays;
14 import java.util.Collection;
15 import java.util.Iterator;
16 import java.util.List;
17 import java.util.NoSuchElementException;
18 import java.util.Objects;
19 import java.util.concurrent.atomic.AtomicLong;
20 import java.util.concurrent.locks.Condition;
21 import java.util.concurrent.locks.ReentrantLock;
22
23 /**
24 * A {@link ThreadPoolExecutor} that can additionally schedule
25 * commands to run after a given delay, or to execute periodically.
26 * This class is preferable to {@link java.util.Timer} when multiple
27 * worker threads are needed, or when the additional flexibility or
28 * capabilities of {@link ThreadPoolExecutor} (which this class
29 * extends) are required.
30 *
31 * <p>Delayed tasks execute no sooner than they are enabled, but
32 * without any real-time guarantees about when, after they are
33 * enabled, they will commence. Tasks scheduled for exactly the same
34 * execution time are enabled in first-in-first-out (FIFO) order of
35 * submission.
36 *
37 * <p>When a submitted task is cancelled before it is run, execution
38 * is suppressed. By default, such a cancelled task is not
39 * automatically removed from the work queue until its delay elapses.
40 * While this enables further inspection and monitoring, it may also
41 * cause unbounded retention of cancelled tasks. To avoid this, use
42 * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately
43 * removed from the work queue at time of cancellation.
44 *
45 * <p>Successive executions of a periodic task scheduled via
46 * {@link #scheduleAtFixedRate scheduleAtFixedRate} or
47 * {@link #scheduleWithFixedDelay scheduleWithFixedDelay}
48 * do not overlap. While different executions may be performed by
49 * different threads, the effects of prior executions
50 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
51 * those of subsequent ones.
52 *
53 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
54 * of the inherited tuning methods are not useful for it. In
55 * particular, because it acts as a fixed-sized pool using
56 * {@code corePoolSize} threads and an unbounded queue, adjustments
57 * to {@code maximumPoolSize} have no useful effect. Additionally, it
58 * is almost never a good idea to set {@code corePoolSize} to zero or
59 * use {@code allowCoreThreadTimeOut} because this may leave the pool
60 * without threads to handle tasks once they become eligible to run.
61 *
62 * <p>As with {@code ThreadPoolExecutor}, if not otherwise specified,
63 * this class uses {@link Executors#defaultThreadFactory} as the
64 * default thread factory, and {@link ThreadPoolExecutor.AbortPolicy}
65 * as the default rejected execution handler.
66 *
67 * <p><b>Extension notes:</b> This class overrides the
68 * {@link ThreadPoolExecutor#execute(Runnable) execute} and
69 * {@link AbstractExecutorService#submit(Runnable) submit}
70 * methods to generate internal {@link ScheduledFuture} objects to
71 * control per-task delays and scheduling. To preserve
72 * functionality, any further overrides of these methods in
73 * subclasses must invoke superclass versions, which effectively
74 * disables additional task customization. However, this class
75 * provides an alternative protected extension method
76 * {@code decorateTask} (one version each for {@code Runnable} and
77 * {@code Callable}) that can be used to customize the concrete task
78 * types used to execute commands entered via {@code execute},
79 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
80 * and {@code scheduleWithFixedDelay}. By default, a
81 * {@code ScheduledThreadPoolExecutor} uses a task type extending
82 * {@link FutureTask}. However, this may be modified or replaced using
83 * subclasses of the form:
84 *
85 * <pre> {@code
86 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
87 *
88 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
89 *
90 * protected <V> RunnableScheduledFuture<V> decorateTask(
91 * Runnable r, RunnableScheduledFuture<V> task) {
92 * return new CustomTask<V>(r, task);
93 * }
94 *
95 * protected <V> RunnableScheduledFuture<V> decorateTask(
96 * Callable<V> c, RunnableScheduledFuture<V> task) {
97 * return new CustomTask<V>(c, task);
98 * }
99 * // ... add constructors, etc.
100 * }}</pre>
101 *
102 * @since 1.5
103 * @author Doug Lea
104 */
105 public class ScheduledThreadPoolExecutor
106 extends ThreadPoolExecutor
107 implements ScheduledExecutorService {
108
109 /*
110 * This class specializes ThreadPoolExecutor implementation by
111 *
112 * 1. Using a custom task type ScheduledFutureTask, even for tasks
113 * that don't require scheduling because they are submitted
114 * using ExecutorService rather than ScheduledExecutorService
115 * methods, which are treated as tasks with a delay of zero.
116 *
117 * 2. Using a custom queue (DelayedWorkQueue), a variant of
118 * unbounded DelayQueue. The lack of capacity constraint and
119 * the fact that corePoolSize and maximumPoolSize are
120 * effectively identical simplifies some execution mechanics
121 * (see delayedExecute) compared to ThreadPoolExecutor.
122 *
123 * 3. Supporting optional run-after-shutdown parameters, which
124 * leads to overrides of shutdown methods to remove and cancel
125 * tasks that should NOT be run after shutdown, as well as
126 * different recheck logic when task (re)submission overlaps
127 * with a shutdown.
128 *
129 * 4. Task decoration methods to allow interception and
130 * instrumentation, which are needed because subclasses cannot
131 * otherwise override submit methods to get this effect. These
132 * don't have any impact on pool control logic though.
133 */
134
135 /**
136 * False if periodic tasks should be cancelled/suppressed on shutdown.
137 */
138 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
139
140 /**
141 * False if non-periodic tasks should be cancelled on shutdown.
142 */
143 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
144
145 /**
146 * True if ScheduledFutureTask.cancel should remove from queue.
147 */
148 volatile boolean removeOnCancel;
149
150 /**
151 * Sequence number to break scheduling ties, and in turn to
152 * guarantee FIFO order among tied entries.
153 */
154 private static final AtomicLong sequencer = new AtomicLong();
155
156 private class ScheduledFutureTask<V>
157 extends FutureTask<V> implements RunnableScheduledFuture<V> {
158
159 /** Sequence number to break ties FIFO */
160 private final long sequenceNumber;
161
162 /** The nanoTime-based time when the task is enabled to execute. */
163 private volatile long time;
164
165 /**
166 * Period for repeating tasks, in nanoseconds.
167 * A positive value indicates fixed-rate execution.
168 * A negative value indicates fixed-delay execution.
169 * A value of 0 indicates a non-repeating (one-shot) task.
170 */
171 private final long period;
172
173 /** The actual task to be re-enqueued by reExecutePeriodic */
174 RunnableScheduledFuture<V> outerTask = this;
175
176 /**
177 * Index into delay queue, to support faster cancellation.
178 */
179 int heapIndex;
180
181 /**
182 * Creates a one-shot action with given nanoTime-based trigger time.
183 */
184 ScheduledFutureTask(Runnable r, V result, long triggerTime,
185 long sequenceNumber) {
186 super(r, result);
187 this.time = triggerTime;
188 this.period = 0;
189 this.sequenceNumber = sequenceNumber;
190 }
191
192 /**
193 * Creates a periodic action with given nanoTime-based initial
194 * trigger time and period.
195 */
196 ScheduledFutureTask(Runnable r, V result, long triggerTime,
197 long period, long sequenceNumber) {
198 super(r, result);
199 this.time = triggerTime;
200 this.period = period;
201 this.sequenceNumber = sequenceNumber;
202 }
203
204 /**
205 * Creates a one-shot action with given nanoTime-based trigger time.
206 */
207 ScheduledFutureTask(Callable<V> callable, long triggerTime,
208 long sequenceNumber) {
209 super(callable);
210 this.time = triggerTime;
211 this.period = 0;
212 this.sequenceNumber = sequenceNumber;
213 }
214
215 public long getDelay(TimeUnit unit) {
216 return unit.convert(time - System.nanoTime(), NANOSECONDS);
217 }
218
219 public int compareTo(Delayed other) {
220 if (other == this) // compare zero if same object
221 return 0;
222 if (other instanceof ScheduledFutureTask) {
223 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
224 long diff = time - x.time;
225 if (diff < 0)
226 return -1;
227 else if (diff > 0)
228 return 1;
229 else if (sequenceNumber < x.sequenceNumber)
230 return -1;
231 else
232 return 1;
233 }
234 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
235 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
236 }
237
238 /**
239 * Returns {@code true} if this is a periodic (not a one-shot) action.
240 *
241 * @return {@code true} if periodic
242 */
243 public boolean isPeriodic() {
244 return period != 0;
245 }
246
247 /**
248 * Sets the next time to run for a periodic task.
249 */
250 private void setNextRunTime() {
251 long p = period;
252 if (p > 0)
253 time += p;
254 else
255 time = triggerTime(-p);
256 }
257
258 public boolean cancel(boolean mayInterruptIfRunning) {
259 // The racy read of heapIndex below is benign:
260 // if heapIndex < 0, then OOTA guarantees that we have surely
261 // been removed; else we recheck under lock in remove()
262 boolean cancelled = super.cancel(mayInterruptIfRunning);
263 if (cancelled && removeOnCancel && heapIndex >= 0)
264 remove(this);
265 return cancelled;
266 }
267
268 /**
269 * Overrides FutureTask version so as to reset/requeue if periodic.
270 */
271 public void run() {
272 boolean periodic = isPeriodic();
273 if (!canRunInCurrentRunState(periodic))
274 cancel(false);
275 else if (!periodic)
276 super.run();
277 else if (super.runAndReset()) {
278 setNextRunTime();
279 reExecutePeriodic(outerTask);
280 }
281 }
282 }
283
284 /**
285 * Returns true if can run a task given current run state
286 * and run-after-shutdown parameters.
287 *
288 * @param periodic true if this task periodic, false if delayed
289 */
290 boolean canRunInCurrentRunState(boolean periodic) {
291 return isRunningOrShutdown(periodic ?
292 continueExistingPeriodicTasksAfterShutdown :
293 executeExistingDelayedTasksAfterShutdown);
294 }
295
296 /**
297 * Main execution method for delayed or periodic tasks. If pool
298 * is shut down, rejects the task. Otherwise adds task to queue
299 * and starts a thread, if necessary, to run it. (We cannot
300 * prestart the thread to run the task because the task (probably)
301 * shouldn't be run yet.) If the pool is shut down while the task
302 * is being added, cancel and remove it if required by state and
303 * run-after-shutdown parameters.
304 *
305 * @param task the task
306 */
307 private void delayedExecute(RunnableScheduledFuture<?> task) {
308 if (isShutdown())
309 reject(task);
310 else {
311 super.getQueue().add(task);
312 if (isShutdown() &&
313 !canRunInCurrentRunState(task.isPeriodic()) &&
314 remove(task))
315 task.cancel(false);
316 else
317 ensurePrestart();
318 }
319 }
320
321 /**
322 * Requeues a periodic task unless current run state precludes it.
323 * Same idea as delayedExecute except drops task rather than rejecting.
324 *
325 * @param task the task
326 */
327 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
328 if (canRunInCurrentRunState(true)) {
329 super.getQueue().add(task);
330 if (!canRunInCurrentRunState(true) && remove(task))
331 task.cancel(false);
332 else
333 ensurePrestart();
334 }
335 }
336
337 /**
338 * Cancels and clears the queue of all tasks that should not be run
339 * due to shutdown policy. Invoked within super.shutdown.
340 */
341 @Override void onShutdown() {
342 BlockingQueue<Runnable> q = super.getQueue();
343 boolean keepDelayed =
344 getExecuteExistingDelayedTasksAfterShutdownPolicy();
345 boolean keepPeriodic =
346 getContinueExistingPeriodicTasksAfterShutdownPolicy();
347 if (!keepDelayed && !keepPeriodic) {
348 for (Object e : q.toArray())
349 if (e instanceof RunnableScheduledFuture<?>)
350 ((RunnableScheduledFuture<?>) e).cancel(false);
351 q.clear();
352 }
353 else {
354 // Traverse snapshot to avoid iterator exceptions
355 for (Object e : q.toArray()) {
356 if (e instanceof RunnableScheduledFuture) {
357 RunnableScheduledFuture<?> t =
358 (RunnableScheduledFuture<?>)e;
359 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
360 t.isCancelled()) { // also remove if already cancelled
361 if (q.remove(t))
362 t.cancel(false);
363 }
364 }
365 }
366 }
367 tryTerminate();
368 }
369
370 /**
371 * Modifies or replaces the task used to execute a runnable.
372 * This method can be used to override the concrete
373 * class used for managing internal tasks.
374 * The default implementation simply returns the given task.
375 *
376 * @param runnable the submitted Runnable
377 * @param task the task created to execute the runnable
378 * @param <V> the type of the task's result
379 * @return a task that can execute the runnable
380 * @since 1.6
381 */
382 protected <V> RunnableScheduledFuture<V> decorateTask(
383 Runnable runnable, RunnableScheduledFuture<V> task) {
384 return task;
385 }
386
387 /**
388 * Modifies or replaces the task used to execute a callable.
389 * This method can be used to override the concrete
390 * class used for managing internal tasks.
391 * The default implementation simply returns the given task.
392 *
393 * @param callable the submitted Callable
394 * @param task the task created to execute the callable
395 * @param <V> the type of the task's result
396 * @return a task that can execute the callable
397 * @since 1.6
398 */
399 protected <V> RunnableScheduledFuture<V> decorateTask(
400 Callable<V> callable, RunnableScheduledFuture<V> task) {
401 return task;
402 }
403
404 /**
405 * The default keep-alive time for pool threads.
406 *
407 * Normally, this value is unused because all pool threads will be
408 * core threads, but if a user creates a pool with a corePoolSize
409 * of zero (against our advice), we keep a thread alive as long as
410 * there are queued tasks. If the keep alive time is zero (the
411 * historic value), we end up hot-spinning in getTask, wasting a
412 * CPU. But on the other hand, if we set the value too high, and
413 * users create a one-shot pool which they don't cleanly shutdown,
414 * the pool's non-daemon threads will prevent JVM termination. A
415 * small but non-zero value (relative to a JVM's lifetime) seems
416 * best.
417 */
418 private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
419
/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the given
 * core pool size. The pool is backed by a fresh
 * {@code DelayedWorkQueue} and uses the default keep-alive time.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue());
}
433
/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the given
 * core pool size and thread factory. The pool is backed by a fresh
 * {@code DelayedWorkQueue} and uses the default keep-alive time.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code threadFactory} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue(),
          threadFactory);
}
451
/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the given
 * core pool size and rejection handler. The pool is backed by a fresh
 * {@code DelayedWorkQueue} and uses the default keep-alive time.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code handler} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue(),
          handler);
}
469
/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the given
 * core pool size, thread factory and rejection handler. The pool is
 * backed by a fresh {@code DelayedWorkQueue} and uses the default
 * keep-alive time.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code threadFactory} or
 *         {@code handler} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue(),
          threadFactory,
          handler);
}
491
492 /**
493 * Returns the nanoTime-based trigger time of a delayed action.
494 */
495 private long triggerTime(long delay, TimeUnit unit) {
496 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
497 }
498
499 /**
500 * Returns the nanoTime-based trigger time of a delayed action.
501 */
502 long triggerTime(long delay) {
503 return System.nanoTime() +
504 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
505 }
506
507 /**
508 * Constrains the values of all delays in the queue to be within
509 * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
510 * This may occur if a task is eligible to be dequeued, but has
511 * not yet been, while some other task is added with a delay of
512 * Long.MAX_VALUE.
513 */
514 private long overflowFree(long delay) {
515 Delayed head = (Delayed) super.getQueue().peek();
516 if (head != null) {
517 long headDelay = head.getDelay(NANOSECONDS);
518 if (headDelay < 0 && (delay - headDelay < 0))
519 delay = Long.MAX_VALUE + headDelay;
520 }
521 return delay;
522 }
523
524 /**
525 * @throws RejectedExecutionException {@inheritDoc}
526 * @throws NullPointerException {@inheritDoc}
527 */
528 public ScheduledFuture<?> schedule(Runnable command,
529 long delay,
530 TimeUnit unit) {
531 if (command == null || unit == null)
532 throw new NullPointerException();
533 RunnableScheduledFuture<Void> t = decorateTask(command,
534 new ScheduledFutureTask<Void>(command, null,
535 triggerTime(delay, unit),
536 sequencer.getAndIncrement()));
537 delayedExecute(t);
538 return t;
539 }
540
541 /**
542 * @throws RejectedExecutionException {@inheritDoc}
543 * @throws NullPointerException {@inheritDoc}
544 */
545 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
546 long delay,
547 TimeUnit unit) {
548 if (callable == null || unit == null)
549 throw new NullPointerException();
550 RunnableScheduledFuture<V> t = decorateTask(callable,
551 new ScheduledFutureTask<V>(callable,
552 triggerTime(delay, unit),
553 sequencer.getAndIncrement()));
554 delayedExecute(t);
555 return t;
556 }
557
558 /**
559 * @throws RejectedExecutionException {@inheritDoc}
560 * @throws NullPointerException {@inheritDoc}
561 * @throws IllegalArgumentException {@inheritDoc}
562 */
563 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
564 long initialDelay,
565 long period,
566 TimeUnit unit) {
567 if (command == null || unit == null)
568 throw new NullPointerException();
569 if (period <= 0L)
570 throw new IllegalArgumentException();
571 ScheduledFutureTask<Void> sft =
572 new ScheduledFutureTask<Void>(command,
573 null,
574 triggerTime(initialDelay, unit),
575 unit.toNanos(period),
576 sequencer.getAndIncrement());
577 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
578 sft.outerTask = t;
579 delayedExecute(t);
580 return t;
581 }
582
583 /**
584 * @throws RejectedExecutionException {@inheritDoc}
585 * @throws NullPointerException {@inheritDoc}
586 * @throws IllegalArgumentException {@inheritDoc}
587 */
588 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
589 long initialDelay,
590 long delay,
591 TimeUnit unit) {
592 if (command == null || unit == null)
593 throw new NullPointerException();
594 if (delay <= 0L)
595 throw new IllegalArgumentException();
596 ScheduledFutureTask<Void> sft =
597 new ScheduledFutureTask<Void>(command,
598 null,
599 triggerTime(initialDelay, unit),
600 -unit.toNanos(delay),
601 sequencer.getAndIncrement());
602 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
603 sft.outerTask = t;
604 delayedExecute(t);
605 return t;
606 }
607
608 /**
609 * Executes {@code command} with zero required delay.
610 * This has effect equivalent to
611 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
612 * Note that inspections of the queue and of the list returned by
613 * {@code shutdownNow} will access the zero-delayed
614 * {@link ScheduledFuture}, not the {@code command} itself.
615 *
616 * <p>A consequence of the use of {@code ScheduledFuture} objects is
617 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
618 * called with a null second {@code Throwable} argument, even if the
619 * {@code command} terminated abruptly. Instead, the {@code Throwable}
620 * thrown by such a task can be obtained via {@link Future#get}.
621 *
622 * @throws RejectedExecutionException at discretion of
623 * {@code RejectedExecutionHandler}, if the task
624 * cannot be accepted for execution because the
625 * executor has been shut down
626 * @throws NullPointerException {@inheritDoc}
627 */
628 public void execute(Runnable command) {
629 schedule(command, 0, NANOSECONDS);
630 }
631
632 // Override AbstractExecutorService methods
633
634 /**
635 * @throws RejectedExecutionException {@inheritDoc}
636 * @throws NullPointerException {@inheritDoc}
637 */
638 public Future<?> submit(Runnable task) {
639 return schedule(task, 0, NANOSECONDS);
640 }
641
642 /**
643 * @throws RejectedExecutionException {@inheritDoc}
644 * @throws NullPointerException {@inheritDoc}
645 */
646 public <T> Future<T> submit(Runnable task, T result) {
647 return schedule(Executors.callable(task, result), 0, NANOSECONDS);
648 }
649
650 /**
651 * @throws RejectedExecutionException {@inheritDoc}
652 * @throws NullPointerException {@inheritDoc}
653 */
654 public <T> Future<T> submit(Callable<T> task) {
655 return schedule(task, 0, NANOSECONDS);
656 }
657
658 /**
659 * Sets the policy on whether to continue executing existing
660 * periodic tasks even when this executor has been {@code shutdown}.
661 * In this case, these tasks will only terminate upon
662 * {@code shutdownNow} or after setting the policy to
663 * {@code false} when already shutdown.
664 * This value is by default {@code false}.
665 *
666 * @param value if {@code true}, continue after shutdown, else don't
667 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
668 */
669 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
670 continueExistingPeriodicTasksAfterShutdown = value;
671 if (!value && isShutdown())
672 onShutdown();
673 }
674
675 /**
676 * Gets the policy on whether to continue executing existing
677 * periodic tasks even when this executor has been {@code shutdown}.
678 * In this case, these tasks will only terminate upon
679 * {@code shutdownNow} or after setting the policy to
680 * {@code false} when already shutdown.
681 * This value is by default {@code false}.
682 *
683 * @return {@code true} if will continue after shutdown
684 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
685 */
686 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
687 return continueExistingPeriodicTasksAfterShutdown;
688 }
689
690 /**
691 * Sets the policy on whether to execute existing delayed
692 * tasks even when this executor has been {@code shutdown}.
693 * In this case, these tasks will only terminate upon
694 * {@code shutdownNow}, or after setting the policy to
695 * {@code false} when already shutdown.
696 * This value is by default {@code true}.
697 *
698 * @param value if {@code true}, execute after shutdown, else don't
699 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
700 */
701 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
702 executeExistingDelayedTasksAfterShutdown = value;
703 if (!value && isShutdown())
704 onShutdown();
705 }
706
707 /**
708 * Gets the policy on whether to execute existing delayed
709 * tasks even when this executor has been {@code shutdown}.
710 * In this case, these tasks will only terminate upon
711 * {@code shutdownNow}, or after setting the policy to
712 * {@code false} when already shutdown.
713 * This value is by default {@code true}.
714 *
715 * @return {@code true} if will execute after shutdown
716 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
717 */
718 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
719 return executeExistingDelayedTasksAfterShutdown;
720 }
721
722 /**
723 * Sets the policy on whether cancelled tasks should be immediately
724 * removed from the work queue at time of cancellation. This value is
725 * by default {@code false}.
726 *
727 * @param value if {@code true}, remove on cancellation, else don't
728 * @see #getRemoveOnCancelPolicy
729 * @since 1.7
730 */
731 public void setRemoveOnCancelPolicy(boolean value) {
732 removeOnCancel = value;
733 }
734
735 /**
736 * Gets the policy on whether cancelled tasks should be immediately
737 * removed from the work queue at time of cancellation. This value is
738 * by default {@code false}.
739 *
740 * @return {@code true} if cancelled tasks are immediately removed
741 * from the queue
742 * @see #setRemoveOnCancelPolicy
743 * @since 1.7
744 */
745 public boolean getRemoveOnCancelPolicy() {
746 return removeOnCancel;
747 }
748
749 /**
750 * Initiates an orderly shutdown in which previously submitted
751 * tasks are executed, but no new tasks will be accepted.
752 * Invocation has no additional effect if already shut down.
753 *
754 * <p>This method does not wait for previously submitted tasks to
755 * complete execution. Use {@link #awaitTermination awaitTermination}
756 * to do that.
757 *
758 * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
759 * has been set {@code false}, existing delayed tasks whose delays
760 * have not yet elapsed are cancelled. And unless the {@code
761 * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
762 * {@code true}, future executions of existing periodic tasks will
763 * be cancelled.
764 *
765 * @throws SecurityException {@inheritDoc}
766 */
767 public void shutdown() {
768 super.shutdown();
769 }
770
771 /**
772 * Attempts to stop all actively executing tasks, halts the
773 * processing of waiting tasks, and returns a list of the tasks
774 * that were awaiting execution. These tasks are drained (removed)
775 * from the task queue upon return from this method.
776 *
777 * <p>This method does not wait for actively executing tasks to
778 * terminate. Use {@link #awaitTermination awaitTermination} to
779 * do that.
780 *
781 * <p>There are no guarantees beyond best-effort attempts to stop
782 * processing actively executing tasks. This implementation
783 * interrupts tasks via {@link Thread#interrupt}; any task that
784 * fails to respond to interrupts may never terminate.
785 *
786 * @return list of tasks that never commenced execution.
787 * Each element of this list is a {@link ScheduledFuture}.
788 * For tasks submitted via one of the {@code schedule}
789 * methods, the element will be identical to the returned
790 * {@code ScheduledFuture}. For tasks submitted using
791 * {@link #execute execute}, the element will be a
792 * zero-delay {@code ScheduledFuture}.
793 * @throws SecurityException {@inheritDoc}
794 */
795 public List<Runnable> shutdownNow() {
796 return super.shutdownNow();
797 }
798
799 /**
800 * Returns the task queue used by this executor. Access to the
801 * task queue is intended primarily for debugging and monitoring.
802 * This queue may be in active use. Retrieving the task queue
803 * does not prevent queued tasks from executing.
804 *
805 * <p>Each element of this queue is a {@link ScheduledFuture}.
806 * For tasks submitted via one of the {@code schedule} methods, the
807 * element will be identical to the returned {@code ScheduledFuture}.
808 * For tasks submitted using {@link #execute execute}, the element
809 * will be a zero-delay {@code ScheduledFuture}.
810 *
811 * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
812 * tasks in the order in which they will execute.
813 *
814 * @return the task queue
815 */
816 public BlockingQueue<Runnable> getQueue() {
817 return super.getQueue();
818 }
819
820 /**
821 * Specialized delay queue. To mesh with TPE declarations, this
822 * class must be declared as a BlockingQueue<Runnable> even though
823 * it can only hold RunnableScheduledFutures.
824 */
825 static class DelayedWorkQueue extends AbstractQueue<Runnable>
826 implements BlockingQueue<Runnable> {
827
828 /*
829 * A DelayedWorkQueue is based on a heap-based data structure
830 * like those in DelayQueue and PriorityQueue, except that
831 * every ScheduledFutureTask also records its index into the
832 * heap array. This eliminates the need to find a task upon
833 * cancellation, greatly speeding up removal (down from O(n)
834 * to O(log n)), and reducing garbage retention that would
835 * otherwise occur by waiting for the element to rise to top
836 * before clearing. But because the queue may also hold
837 * RunnableScheduledFutures that are not ScheduledFutureTasks,
838 * we are not guaranteed to have such indices available, in
839 * which case we fall back to linear search. (We expect that
840 * most tasks will not be decorated, and that the faster cases
841 * will be much more common.)
842 *
843 * All heap operations must record index changes -- mainly
844 * within siftUp and siftDown. Upon removal, a task's
845 * heapIndex is set to -1. Note that ScheduledFutureTasks can
846 * appear at most once in the queue (this need not be true for
847 * other kinds of tasks or work queues), so are uniquely
848 * identified by heapIndex.
849 */
850
851 private static final int INITIAL_CAPACITY = 16;
852 private RunnableScheduledFuture<?>[] queue =
853 new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
854 private final ReentrantLock lock = new ReentrantLock();
855 private int size;
856
857 /**
858 * Thread designated to wait for the task at the head of the
859 * queue. This variant of the Leader-Follower pattern
860 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
861 * minimize unnecessary timed waiting. When a thread becomes
862 * the leader, it waits only for the next delay to elapse, but
863 * other threads await indefinitely. The leader thread must
864 * signal some other thread before returning from take() or
865 * poll(...), unless some other thread becomes leader in the
866 * interim. Whenever the head of the queue is replaced with a
867 * task with an earlier expiration time, the leader field is
868 * invalidated by being reset to null, and some waiting
869 * thread, but not necessarily the current leader, is
870 * signalled. So waiting threads must be prepared to acquire
871 * and lose leadership while waiting.
872 */
873 private Thread leader;
874
875 /**
876 * Condition signalled when a newer task becomes available at the
877 * head of the queue or a new thread may need to become leader.
878 */
879 private final Condition available = lock.newCondition();
880
881 /**
882 * Sets f's heapIndex if it is a ScheduledFutureTask.
883 */
884 private static void setIndex(RunnableScheduledFuture<?> f, int idx) {
885 if (f instanceof ScheduledFutureTask)
886 ((ScheduledFutureTask)f).heapIndex = idx;
887 }
888
889 /**
890 * Sifts element added at bottom up to its heap-ordered spot.
891 * Call only when holding lock.
892 */
893 private void siftUp(int k, RunnableScheduledFuture<?> key) {
894 while (k > 0) {
895 int parent = (k - 1) >>> 1;
896 RunnableScheduledFuture<?> e = queue[parent];
897 if (key.compareTo(e) >= 0)
898 break;
899 queue[k] = e;
900 setIndex(e, k);
901 k = parent;
902 }
903 queue[k] = key;
904 setIndex(key, k);
905 }
906
907 /**
908 * Sifts element added at top down to its heap-ordered spot.
909 * Call only when holding lock.
910 */
911 private void siftDown(int k, RunnableScheduledFuture<?> key) {
912 int half = size >>> 1;
913 while (k < half) {
914 int child = (k << 1) + 1;
915 RunnableScheduledFuture<?> c = queue[child];
916 int right = child + 1;
917 if (right < size && c.compareTo(queue[right]) > 0)
918 c = queue[child = right];
919 if (key.compareTo(c) <= 0)
920 break;
921 queue[k] = c;
922 setIndex(c, k);
923 k = child;
924 }
925 queue[k] = key;
926 setIndex(key, k);
927 }
928
929 /**
930 * Resizes the heap array. Call only when holding lock.
931 */
932 private void grow() {
933 int oldCapacity = queue.length;
934 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
935 if (newCapacity < 0) // overflow
936 newCapacity = Integer.MAX_VALUE;
937 queue = Arrays.copyOf(queue, newCapacity);
938 }
939
940 /**
941 * Finds index of given object, or -1 if absent.
942 */
943 private int indexOf(Object x) {
944 if (x != null) {
945 if (x instanceof ScheduledFutureTask) {
946 int i = ((ScheduledFutureTask) x).heapIndex;
947 // Sanity check; x could conceivably be a
948 // ScheduledFutureTask from some other pool.
949 if (i >= 0 && i < size && queue[i] == x)
950 return i;
951 } else {
952 for (int i = 0; i < size; i++)
953 if (x.equals(queue[i]))
954 return i;
955 }
956 }
957 return -1;
958 }
959
960 public boolean contains(Object x) {
961 final ReentrantLock lock = this.lock;
962 lock.lock();
963 try {
964 return indexOf(x) != -1;
965 } finally {
966 lock.unlock();
967 }
968 }
969
970 public boolean remove(Object x) {
971 final ReentrantLock lock = this.lock;
972 lock.lock();
973 try {
974 int i = indexOf(x);
975 if (i < 0)
976 return false;
977
978 setIndex(queue[i], -1);
979 int s = --size;
980 RunnableScheduledFuture<?> replacement = queue[s];
981 queue[s] = null;
982 if (s != i) {
983 siftDown(i, replacement);
984 if (queue[i] == replacement)
985 siftUp(i, replacement);
986 }
987 return true;
988 } finally {
989 lock.unlock();
990 }
991 }
992
993 public int size() {
994 final ReentrantLock lock = this.lock;
995 lock.lock();
996 try {
997 return size;
998 } finally {
999 lock.unlock();
1000 }
1001 }
1002
1003 public boolean isEmpty() {
1004 return size() == 0;
1005 }
1006
1007 public int remainingCapacity() {
1008 return Integer.MAX_VALUE;
1009 }
1010
1011 public RunnableScheduledFuture<?> peek() {
1012 final ReentrantLock lock = this.lock;
1013 lock.lock();
1014 try {
1015 return queue[0];
1016 } finally {
1017 lock.unlock();
1018 }
1019 }
1020
1021 public boolean offer(Runnable x) {
1022 if (x == null)
1023 throw new NullPointerException();
1024 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
1025 final ReentrantLock lock = this.lock;
1026 lock.lock();
1027 try {
1028 int i = size;
1029 if (i >= queue.length)
1030 grow();
1031 size = i + 1;
1032 if (i == 0) {
1033 queue[0] = e;
1034 setIndex(e, 0);
1035 } else {
1036 siftUp(i, e);
1037 }
1038 if (queue[0] == e) {
1039 leader = null;
1040 available.signal();
1041 }
1042 } finally {
1043 lock.unlock();
1044 }
1045 return true;
1046 }
1047
1048 public void put(Runnable e) {
1049 offer(e);
1050 }
1051
1052 public boolean add(Runnable e) {
1053 return offer(e);
1054 }
1055
1056 public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1057 return offer(e);
1058 }
1059
1060 /**
1061 * Performs common bookkeeping for poll and take: Replaces
1062 * first element with last and sifts it down. Call only when
1063 * holding lock.
1064 * @param f the task to remove and return
1065 */
1066 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1067 int s = --size;
1068 RunnableScheduledFuture<?> x = queue[s];
1069 queue[s] = null;
1070 if (s != 0)
1071 siftDown(0, x);
1072 setIndex(f, -1);
1073 return f;
1074 }
1075
1076 public RunnableScheduledFuture<?> poll() {
1077 final ReentrantLock lock = this.lock;
1078 lock.lock();
1079 try {
1080 RunnableScheduledFuture<?> first = queue[0];
1081 return (first == null || first.getDelay(NANOSECONDS) > 0)
1082 ? null
1083 : finishPoll(first);
1084 } finally {
1085 lock.unlock();
1086 }
1087 }
1088
1089 public RunnableScheduledFuture<?> take() throws InterruptedException {
1090 final ReentrantLock lock = this.lock;
1091 lock.lockInterruptibly();
1092 try {
1093 for (;;) {
1094 RunnableScheduledFuture<?> first = queue[0];
1095 if (first == null)
1096 available.await();
1097 else {
1098 long delay = first.getDelay(NANOSECONDS);
1099 if (delay <= 0L)
1100 return finishPoll(first);
1101 first = null; // don't retain ref while waiting
1102 if (leader != null)
1103 available.await();
1104 else {
1105 Thread thisThread = Thread.currentThread();
1106 leader = thisThread;
1107 try {
1108 available.awaitNanos(delay);
1109 } finally {
1110 if (leader == thisThread)
1111 leader = null;
1112 }
1113 }
1114 }
1115 }
1116 } finally {
1117 if (leader == null && queue[0] != null)
1118 available.signal();
1119 lock.unlock();
1120 }
1121 }
1122
1123 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1124 throws InterruptedException {
1125 long nanos = unit.toNanos(timeout);
1126 final ReentrantLock lock = this.lock;
1127 lock.lockInterruptibly();
1128 try {
1129 for (;;) {
1130 RunnableScheduledFuture<?> first = queue[0];
1131 if (first == null) {
1132 if (nanos <= 0L)
1133 return null;
1134 else
1135 nanos = available.awaitNanos(nanos);
1136 } else {
1137 long delay = first.getDelay(NANOSECONDS);
1138 if (delay <= 0L)
1139 return finishPoll(first);
1140 if (nanos <= 0L)
1141 return null;
1142 first = null; // don't retain ref while waiting
1143 if (nanos < delay || leader != null)
1144 nanos = available.awaitNanos(nanos);
1145 else {
1146 Thread thisThread = Thread.currentThread();
1147 leader = thisThread;
1148 try {
1149 long timeLeft = available.awaitNanos(delay);
1150 nanos -= delay - timeLeft;
1151 } finally {
1152 if (leader == thisThread)
1153 leader = null;
1154 }
1155 }
1156 }
1157 }
1158 } finally {
1159 if (leader == null && queue[0] != null)
1160 available.signal();
1161 lock.unlock();
1162 }
1163 }
1164
1165 public void clear() {
1166 final ReentrantLock lock = this.lock;
1167 lock.lock();
1168 try {
1169 for (int i = 0; i < size; i++) {
1170 RunnableScheduledFuture<?> t = queue[i];
1171 if (t != null) {
1172 queue[i] = null;
1173 setIndex(t, -1);
1174 }
1175 }
1176 size = 0;
1177 } finally {
1178 lock.unlock();
1179 }
1180 }
1181
1182 public int drainTo(Collection<? super Runnable> c) {
1183 return drainTo(c, Integer.MAX_VALUE);
1184 }
1185
1186 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1187 Objects.requireNonNull(c);
1188 if (c == this)
1189 throw new IllegalArgumentException();
1190 if (maxElements <= 0)
1191 return 0;
1192 final ReentrantLock lock = this.lock;
1193 lock.lock();
1194 try {
1195 int n = 0;
1196 for (RunnableScheduledFuture<?> first;
1197 n < maxElements
1198 && (first = queue[0]) != null
1199 && first.getDelay(NANOSECONDS) <= 0;) {
1200 c.add(first); // In this order, in case add() throws.
1201 finishPoll(first);
1202 ++n;
1203 }
1204 return n;
1205 } finally {
1206 lock.unlock();
1207 }
1208 }
1209
1210 public Object[] toArray() {
1211 final ReentrantLock lock = this.lock;
1212 lock.lock();
1213 try {
1214 return Arrays.copyOf(queue, size, Object[].class);
1215 } finally {
1216 lock.unlock();
1217 }
1218 }
1219
1220 @SuppressWarnings("unchecked")
1221 public <T> T[] toArray(T[] a) {
1222 final ReentrantLock lock = this.lock;
1223 lock.lock();
1224 try {
1225 if (a.length < size)
1226 return (T[]) Arrays.copyOf(queue, size, a.getClass());
1227 System.arraycopy(queue, 0, a, 0, size);
1228 if (a.length > size)
1229 a[size] = null;
1230 return a;
1231 } finally {
1232 lock.unlock();
1233 }
1234 }
1235
1236 public Iterator<Runnable> iterator() {
1237 final ReentrantLock lock = this.lock;
1238 lock.lock();
1239 try {
1240 return new Itr(Arrays.copyOf(queue, size));
1241 } finally {
1242 lock.unlock();
1243 }
1244 }
1245
1246 /**
1247 * Snapshot iterator that works off copy of underlying q array.
1248 */
1249 private class Itr implements Iterator<Runnable> {
1250 final RunnableScheduledFuture<?>[] array;
1251 int cursor; // index of next element to return; initially 0
1252 int lastRet = -1; // index of last element returned; -1 if no such
1253
1254 Itr(RunnableScheduledFuture<?>[] array) {
1255 this.array = array;
1256 }
1257
1258 public boolean hasNext() {
1259 return cursor < array.length;
1260 }
1261
1262 public Runnable next() {
1263 if (cursor >= array.length)
1264 throw new NoSuchElementException();
1265 return array[lastRet = cursor++];
1266 }
1267
1268 public void remove() {
1269 if (lastRet < 0)
1270 throw new IllegalStateException();
1271 DelayedWorkQueue.this.remove(array[lastRet]);
1272 lastRet = -1;
1273 }
1274 }
1275 }
1276 }