ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.101
Committed: Wed Nov 30 03:46:37 2016 UTC (7 years, 6 months ago) by jsr166
Branch: MAIN
Changes since 1.100: +1 -2 lines
Log Message:
use idiom: lastRet = cursor++

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 import static java.util.concurrent.TimeUnit.NANOSECONDS;
11
12 import java.util.AbstractQueue;
13 import java.util.Arrays;
14 import java.util.Collection;
15 import java.util.Iterator;
16 import java.util.List;
17 import java.util.NoSuchElementException;
18 import java.util.concurrent.atomic.AtomicLong;
19 import java.util.concurrent.locks.Condition;
20 import java.util.concurrent.locks.ReentrantLock;
21
22 /**
23 * A {@link ThreadPoolExecutor} that can additionally schedule
24 * commands to run after a given delay, or to execute periodically.
25 * This class is preferable to {@link java.util.Timer} when multiple
26 * worker threads are needed, or when the additional flexibility or
27 * capabilities of {@link ThreadPoolExecutor} (which this class
28 * extends) are required.
29 *
30 * <p>Delayed tasks execute no sooner than they are enabled, but
31 * without any real-time guarantees about when, after they are
32 * enabled, they will commence. Tasks scheduled for exactly the same
33 * execution time are enabled in first-in-first-out (FIFO) order of
34 * submission.
35 *
36 * <p>When a submitted task is cancelled before it is run, execution
37 * is suppressed. By default, such a cancelled task is not
38 * automatically removed from the work queue until its delay elapses.
39 * While this enables further inspection and monitoring, it may also
40 * cause unbounded retention of cancelled tasks. To avoid this, use
41 * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately
42 * removed from the work queue at time of cancellation.
43 *
44 * <p>Successive executions of a periodic task scheduled via
45 * {@link #scheduleAtFixedRate scheduleAtFixedRate} or
46 * {@link #scheduleWithFixedDelay scheduleWithFixedDelay}
47 * do not overlap. While different executions may be performed by
48 * different threads, the effects of prior executions
49 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
50 * those of subsequent ones.
51 *
52 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
53 * of the inherited tuning methods are not useful for it. In
54 * particular, because it acts as a fixed-sized pool using
55 * {@code corePoolSize} threads and an unbounded queue, adjustments
56 * to {@code maximumPoolSize} have no useful effect. Additionally, it
57 * is almost never a good idea to set {@code corePoolSize} to zero or
58 * use {@code allowCoreThreadTimeOut} because this may leave the pool
59 * without threads to handle tasks once they become eligible to run.
60 *
61 * <p><b>Extension notes:</b> This class overrides the
62 * {@link ThreadPoolExecutor#execute(Runnable) execute} and
63 * {@link AbstractExecutorService#submit(Runnable) submit}
64 * methods to generate internal {@link ScheduledFuture} objects to
65 * control per-task delays and scheduling. To preserve
66 * functionality, any further overrides of these methods in
67 * subclasses must invoke superclass versions, which effectively
68 * disables additional task customization. However, this class
69 * provides alternative protected extension method
70 * {@code decorateTask} (one version each for {@code Runnable} and
71 * {@code Callable}) that can be used to customize the concrete task
72 * types used to execute commands entered via {@code execute},
73 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
74 * and {@code scheduleWithFixedDelay}. By default, a
75 * {@code ScheduledThreadPoolExecutor} uses a task type extending
76 * {@link FutureTask}. However, this may be modified or replaced using
77 * subclasses of the form:
78 *
79 * <pre> {@code
80 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
81 *
82 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
83 *
84 * protected <V> RunnableScheduledFuture<V> decorateTask(
85 * Runnable r, RunnableScheduledFuture<V> task) {
86 * return new CustomTask<V>(r, task);
87 * }
88 *
89 * protected <V> RunnableScheduledFuture<V> decorateTask(
90 * Callable<V> c, RunnableScheduledFuture<V> task) {
91 * return new CustomTask<V>(c, task);
92 * }
93 * // ... add constructors, etc.
94 * }}</pre>
95 *
96 * @since 1.5
97 * @author Doug Lea
98 */
99 public class ScheduledThreadPoolExecutor
100 extends ThreadPoolExecutor
101 implements ScheduledExecutorService {
102
103 /*
104 * This class specializes ThreadPoolExecutor implementation by
105 *
106 * 1. Using a custom task type ScheduledFutureTask, even for tasks
107 * that don't require scheduling because they are submitted
108 * using ExecutorService rather than ScheduledExecutorService
109 * methods, which are treated as tasks with a delay of zero.
110 *
111 * 2. Using a custom queue (DelayedWorkQueue), a variant of
112 * unbounded DelayQueue. The lack of capacity constraint and
113 * the fact that corePoolSize and maximumPoolSize are
114 * effectively identical simplifies some execution mechanics
115 * (see delayedExecute) compared to ThreadPoolExecutor.
116 *
117 * 3. Supporting optional run-after-shutdown parameters, which
118 * leads to overrides of shutdown methods to remove and cancel
119 * tasks that should NOT be run after shutdown, as well as
120 * different recheck logic when task (re)submission overlaps
121 * with a shutdown.
122 *
123 * 4. Task decoration methods to allow interception and
124 * instrumentation, which are needed because subclasses cannot
125 * otherwise override submit methods to get this effect. These
126 * don't have any impact on pool control logic though.
127 */
128
129 /**
130 * False if should cancel/suppress periodic tasks on shutdown.
131 */
132 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
133
134 /**
135 * False if should cancel non-periodic tasks on shutdown.
136 */
137 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
138
139 /**
140 * True if ScheduledFutureTask.cancel should remove from queue.
141 */
142 volatile boolean removeOnCancel;
143
144 /**
145 * Sequence number to break scheduling ties, and in turn to
146 * guarantee FIFO order among tied entries.
147 */
148 private static final AtomicLong sequencer = new AtomicLong();
149
150 private class ScheduledFutureTask<V>
151 extends FutureTask<V> implements RunnableScheduledFuture<V> {
152
153 /** Sequence number to break ties FIFO */
154 private final long sequenceNumber;
155
156 /** The nanoTime-based time when the task is enabled to execute. */
157 private volatile long time;
158
159 /**
160 * Period for repeating tasks, in nanoseconds.
161 * A positive value indicates fixed-rate execution.
162 * A negative value indicates fixed-delay execution.
163 * A value of 0 indicates a non-repeating (one-shot) task.
164 */
165 private final long period;
166
167 /** The actual task to be re-enqueued by reExecutePeriodic */
168 RunnableScheduledFuture<V> outerTask = this;
169
170 /**
171 * Index into delay queue, to support faster cancellation.
172 */
173 int heapIndex;
174
175 /**
176 * Creates a one-shot action with given nanoTime-based trigger time.
177 */
178 ScheduledFutureTask(Runnable r, V result, long triggerTime,
179 long sequenceNumber) {
180 super(r, result);
181 this.time = triggerTime;
182 this.period = 0;
183 this.sequenceNumber = sequenceNumber;
184 }
185
186 /**
187 * Creates a periodic action with given nanoTime-based initial
188 * trigger time and period.
189 */
190 ScheduledFutureTask(Runnable r, V result, long triggerTime,
191 long period, long sequenceNumber) {
192 super(r, result);
193 this.time = triggerTime;
194 this.period = period;
195 this.sequenceNumber = sequenceNumber;
196 }
197
198 /**
199 * Creates a one-shot action with given nanoTime-based trigger time.
200 */
201 ScheduledFutureTask(Callable<V> callable, long triggerTime,
202 long sequenceNumber) {
203 super(callable);
204 this.time = triggerTime;
205 this.period = 0;
206 this.sequenceNumber = sequenceNumber;
207 }
208
209 public long getDelay(TimeUnit unit) {
210 return unit.convert(time - System.nanoTime(), NANOSECONDS);
211 }
212
213 public int compareTo(Delayed other) {
214 if (other == this) // compare zero if same object
215 return 0;
216 if (other instanceof ScheduledFutureTask) {
217 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
218 long diff = time - x.time;
219 if (diff < 0)
220 return -1;
221 else if (diff > 0)
222 return 1;
223 else if (sequenceNumber < x.sequenceNumber)
224 return -1;
225 else
226 return 1;
227 }
228 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
229 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
230 }
231
232 /**
233 * Returns {@code true} if this is a periodic (not a one-shot) action.
234 *
235 * @return {@code true} if periodic
236 */
237 public boolean isPeriodic() {
238 return period != 0;
239 }
240
241 /**
242 * Sets the next time to run for a periodic task.
243 */
244 private void setNextRunTime() {
245 long p = period;
246 if (p > 0)
247 time += p;
248 else
249 time = triggerTime(-p);
250 }
251
252 public boolean cancel(boolean mayInterruptIfRunning) {
253 // The racy read of heapIndex below is benign:
254 // if heapIndex < 0, then OOTA guarantees that we have surely
255 // been removed; else we recheck under lock in remove()
256 boolean cancelled = super.cancel(mayInterruptIfRunning);
257 if (cancelled && removeOnCancel && heapIndex >= 0)
258 remove(this);
259 return cancelled;
260 }
261
262 /**
263 * Overrides FutureTask version so as to reset/requeue if periodic.
264 */
265 public void run() {
266 boolean periodic = isPeriodic();
267 if (!canRunInCurrentRunState(periodic))
268 cancel(false);
269 else if (!periodic)
270 super.run();
271 else if (super.runAndReset()) {
272 setNextRunTime();
273 reExecutePeriodic(outerTask);
274 }
275 }
276 }
277
278 /**
279 * Returns true if can run a task given current run state
280 * and run-after-shutdown parameters.
281 *
282 * @param periodic true if this task periodic, false if delayed
283 */
284 boolean canRunInCurrentRunState(boolean periodic) {
285 return isRunningOrShutdown(periodic ?
286 continueExistingPeriodicTasksAfterShutdown :
287 executeExistingDelayedTasksAfterShutdown);
288 }
289
290 /**
291 * Main execution method for delayed or periodic tasks. If pool
292 * is shut down, rejects the task. Otherwise adds task to queue
293 * and starts a thread, if necessary, to run it. (We cannot
294 * prestart the thread to run the task because the task (probably)
295 * shouldn't be run yet.) If the pool is shut down while the task
296 * is being added, cancel and remove it if required by state and
297 * run-after-shutdown parameters.
298 *
299 * @param task the task
300 */
301 private void delayedExecute(RunnableScheduledFuture<?> task) {
302 if (isShutdown())
303 reject(task);
304 else {
305 super.getQueue().add(task);
306 if (isShutdown() &&
307 !canRunInCurrentRunState(task.isPeriodic()) &&
308 remove(task))
309 task.cancel(false);
310 else
311 ensurePrestart();
312 }
313 }
314
315 /**
316 * Requeues a periodic task unless current run state precludes it.
317 * Same idea as delayedExecute except drops task rather than rejecting.
318 *
319 * @param task the task
320 */
321 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
322 if (canRunInCurrentRunState(true)) {
323 super.getQueue().add(task);
324 if (!canRunInCurrentRunState(true) && remove(task))
325 task.cancel(false);
326 else
327 ensurePrestart();
328 }
329 }
330
331 /**
332 * Cancels and clears the queue of all tasks that should not be run
333 * due to shutdown policy. Invoked within super.shutdown.
334 */
335 @Override void onShutdown() {
336 BlockingQueue<Runnable> q = super.getQueue();
337 boolean keepDelayed =
338 getExecuteExistingDelayedTasksAfterShutdownPolicy();
339 boolean keepPeriodic =
340 getContinueExistingPeriodicTasksAfterShutdownPolicy();
341 if (!keepDelayed && !keepPeriodic) {
342 for (Object e : q.toArray())
343 if (e instanceof RunnableScheduledFuture<?>)
344 ((RunnableScheduledFuture<?>) e).cancel(false);
345 q.clear();
346 }
347 else {
348 // Traverse snapshot to avoid iterator exceptions
349 for (Object e : q.toArray()) {
350 if (e instanceof RunnableScheduledFuture) {
351 RunnableScheduledFuture<?> t =
352 (RunnableScheduledFuture<?>)e;
353 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
354 t.isCancelled()) { // also remove if already cancelled
355 if (q.remove(t))
356 t.cancel(false);
357 }
358 }
359 }
360 }
361 tryTerminate();
362 }
363
364 /**
365 * Modifies or replaces the task used to execute a runnable.
366 * This method can be used to override the concrete
367 * class used for managing internal tasks.
368 * The default implementation simply returns the given task.
369 *
370 * @param runnable the submitted Runnable
371 * @param task the task created to execute the runnable
372 * @param <V> the type of the task's result
373 * @return a task that can execute the runnable
374 * @since 1.6
375 */
376 protected <V> RunnableScheduledFuture<V> decorateTask(
377 Runnable runnable, RunnableScheduledFuture<V> task) {
378 return task;
379 }
380
381 /**
382 * Modifies or replaces the task used to execute a callable.
383 * This method can be used to override the concrete
384 * class used for managing internal tasks.
385 * The default implementation simply returns the given task.
386 *
387 * @param callable the submitted Callable
388 * @param task the task created to execute the callable
389 * @param <V> the type of the task's result
390 * @return a task that can execute the callable
391 * @since 1.6
392 */
393 protected <V> RunnableScheduledFuture<V> decorateTask(
394 Callable<V> callable, RunnableScheduledFuture<V> task) {
395 return task;
396 }
397
398 /**
399 * The default keep-alive time for pool threads.
400 *
401 * Normally, this value is unused because all pool threads will be
402 * core threads, but if a user creates a pool with a corePoolSize
403 * of zero (against our advice), we keep a thread alive as long as
404 * there are queued tasks. If the keep alive time is zero (the
405 * historic value), we end up hot-spinning in getTask, wasting a
406 * CPU. But on the other hand, if we set the value too high, and
407 * users create a one-shot pool which they don't cleanly shutdown,
408 * the pool's non-daemon threads will prevent JVM termination. A
409 * small but non-zero value (relative to a JVM's lifetime) seems
410 * best.
411 */
412 private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
413
414 /**
415 * Creates a new {@code ScheduledThreadPoolExecutor} with the
416 * given core pool size.
417 *
418 * @param corePoolSize the number of threads to keep in the pool, even
419 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
420 * @throws IllegalArgumentException if {@code corePoolSize < 0}
421 */
422 public ScheduledThreadPoolExecutor(int corePoolSize) {
423 super(corePoolSize, Integer.MAX_VALUE,
424 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
425 new DelayedWorkQueue());
426 }
427
428 /**
429 * Creates a new {@code ScheduledThreadPoolExecutor} with the
430 * given initial parameters.
431 *
432 * @param corePoolSize the number of threads to keep in the pool, even
433 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
434 * @param threadFactory the factory to use when the executor
435 * creates a new thread
436 * @throws IllegalArgumentException if {@code corePoolSize < 0}
437 * @throws NullPointerException if {@code threadFactory} is null
438 */
439 public ScheduledThreadPoolExecutor(int corePoolSize,
440 ThreadFactory threadFactory) {
441 super(corePoolSize, Integer.MAX_VALUE,
442 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
443 new DelayedWorkQueue(), threadFactory);
444 }
445
446 /**
447 * Creates a new {@code ScheduledThreadPoolExecutor} with the
448 * given initial parameters.
449 *
450 * @param corePoolSize the number of threads to keep in the pool, even
451 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
452 * @param handler the handler to use when execution is blocked
453 * because the thread bounds and queue capacities are reached
454 * @throws IllegalArgumentException if {@code corePoolSize < 0}
455 * @throws NullPointerException if {@code handler} is null
456 */
457 public ScheduledThreadPoolExecutor(int corePoolSize,
458 RejectedExecutionHandler handler) {
459 super(corePoolSize, Integer.MAX_VALUE,
460 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
461 new DelayedWorkQueue(), handler);
462 }
463
464 /**
465 * Creates a new {@code ScheduledThreadPoolExecutor} with the
466 * given initial parameters.
467 *
468 * @param corePoolSize the number of threads to keep in the pool, even
469 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
470 * @param threadFactory the factory to use when the executor
471 * creates a new thread
472 * @param handler the handler to use when execution is blocked
473 * because the thread bounds and queue capacities are reached
474 * @throws IllegalArgumentException if {@code corePoolSize < 0}
475 * @throws NullPointerException if {@code threadFactory} or
476 * {@code handler} is null
477 */
478 public ScheduledThreadPoolExecutor(int corePoolSize,
479 ThreadFactory threadFactory,
480 RejectedExecutionHandler handler) {
481 super(corePoolSize, Integer.MAX_VALUE,
482 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
483 new DelayedWorkQueue(), threadFactory, handler);
484 }
485
486 /**
487 * Returns the nanoTime-based trigger time of a delayed action.
488 */
489 private long triggerTime(long delay, TimeUnit unit) {
490 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
491 }
492
493 /**
494 * Returns the nanoTime-based trigger time of a delayed action.
495 */
496 long triggerTime(long delay) {
497 return System.nanoTime() +
498 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
499 }
500
501 /**
502 * Constrains the values of all delays in the queue to be within
503 * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
504 * This may occur if a task is eligible to be dequeued, but has
505 * not yet been, while some other task is added with a delay of
506 * Long.MAX_VALUE.
507 */
508 private long overflowFree(long delay) {
509 Delayed head = (Delayed) super.getQueue().peek();
510 if (head != null) {
511 long headDelay = head.getDelay(NANOSECONDS);
512 if (headDelay < 0 && (delay - headDelay < 0))
513 delay = Long.MAX_VALUE + headDelay;
514 }
515 return delay;
516 }
517
518 /**
519 * @throws RejectedExecutionException {@inheritDoc}
520 * @throws NullPointerException {@inheritDoc}
521 */
522 public ScheduledFuture<?> schedule(Runnable command,
523 long delay,
524 TimeUnit unit) {
525 if (command == null || unit == null)
526 throw new NullPointerException();
527 RunnableScheduledFuture<Void> t = decorateTask(command,
528 new ScheduledFutureTask<Void>(command, null,
529 triggerTime(delay, unit),
530 sequencer.getAndIncrement()));
531 delayedExecute(t);
532 return t;
533 }
534
535 /**
536 * @throws RejectedExecutionException {@inheritDoc}
537 * @throws NullPointerException {@inheritDoc}
538 */
539 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
540 long delay,
541 TimeUnit unit) {
542 if (callable == null || unit == null)
543 throw new NullPointerException();
544 RunnableScheduledFuture<V> t = decorateTask(callable,
545 new ScheduledFutureTask<V>(callable,
546 triggerTime(delay, unit),
547 sequencer.getAndIncrement()));
548 delayedExecute(t);
549 return t;
550 }
551
552 /**
553 * @throws RejectedExecutionException {@inheritDoc}
554 * @throws NullPointerException {@inheritDoc}
555 * @throws IllegalArgumentException {@inheritDoc}
556 */
557 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
558 long initialDelay,
559 long period,
560 TimeUnit unit) {
561 if (command == null || unit == null)
562 throw new NullPointerException();
563 if (period <= 0L)
564 throw new IllegalArgumentException();
565 ScheduledFutureTask<Void> sft =
566 new ScheduledFutureTask<Void>(command,
567 null,
568 triggerTime(initialDelay, unit),
569 unit.toNanos(period),
570 sequencer.getAndIncrement());
571 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
572 sft.outerTask = t;
573 delayedExecute(t);
574 return t;
575 }
576
577 /**
578 * @throws RejectedExecutionException {@inheritDoc}
579 * @throws NullPointerException {@inheritDoc}
580 * @throws IllegalArgumentException {@inheritDoc}
581 */
582 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
583 long initialDelay,
584 long delay,
585 TimeUnit unit) {
586 if (command == null || unit == null)
587 throw new NullPointerException();
588 if (delay <= 0L)
589 throw new IllegalArgumentException();
590 ScheduledFutureTask<Void> sft =
591 new ScheduledFutureTask<Void>(command,
592 null,
593 triggerTime(initialDelay, unit),
594 -unit.toNanos(delay),
595 sequencer.getAndIncrement());
596 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
597 sft.outerTask = t;
598 delayedExecute(t);
599 return t;
600 }
601
602 /**
603 * Executes {@code command} with zero required delay.
604 * This has effect equivalent to
605 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
606 * Note that inspections of the queue and of the list returned by
607 * {@code shutdownNow} will access the zero-delayed
608 * {@link ScheduledFuture}, not the {@code command} itself.
609 *
610 * <p>A consequence of the use of {@code ScheduledFuture} objects is
611 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
612 * called with a null second {@code Throwable} argument, even if the
613 * {@code command} terminated abruptly. Instead, the {@code Throwable}
614 * thrown by such a task can be obtained via {@link Future#get}.
615 *
616 * @throws RejectedExecutionException at discretion of
617 * {@code RejectedExecutionHandler}, if the task
618 * cannot be accepted for execution because the
619 * executor has been shut down
620 * @throws NullPointerException {@inheritDoc}
621 */
622 public void execute(Runnable command) {
623 schedule(command, 0, NANOSECONDS);
624 }
625
626 // Override AbstractExecutorService methods
627
628 /**
629 * @throws RejectedExecutionException {@inheritDoc}
630 * @throws NullPointerException {@inheritDoc}
631 */
632 public Future<?> submit(Runnable task) {
633 return schedule(task, 0, NANOSECONDS);
634 }
635
636 /**
637 * @throws RejectedExecutionException {@inheritDoc}
638 * @throws NullPointerException {@inheritDoc}
639 */
640 public <T> Future<T> submit(Runnable task, T result) {
641 return schedule(Executors.callable(task, result), 0, NANOSECONDS);
642 }
643
644 /**
645 * @throws RejectedExecutionException {@inheritDoc}
646 * @throws NullPointerException {@inheritDoc}
647 */
648 public <T> Future<T> submit(Callable<T> task) {
649 return schedule(task, 0, NANOSECONDS);
650 }
651
652 /**
653 * Sets the policy on whether to continue executing existing
654 * periodic tasks even when this executor has been {@code shutdown}.
655 * In this case, these tasks will only terminate upon
656 * {@code shutdownNow} or after setting the policy to
657 * {@code false} when already shutdown.
658 * This value is by default {@code false}.
659 *
660 * @param value if {@code true}, continue after shutdown, else don't
661 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
662 */
663 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
664 continueExistingPeriodicTasksAfterShutdown = value;
665 if (!value && isShutdown())
666 onShutdown();
667 }
668
669 /**
670 * Gets the policy on whether to continue executing existing
671 * periodic tasks even when this executor has been {@code shutdown}.
672 * In this case, these tasks will only terminate upon
673 * {@code shutdownNow} or after setting the policy to
674 * {@code false} when already shutdown.
675 * This value is by default {@code false}.
676 *
677 * @return {@code true} if will continue after shutdown
678 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
679 */
680 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
681 return continueExistingPeriodicTasksAfterShutdown;
682 }
683
684 /**
685 * Sets the policy on whether to execute existing delayed
686 * tasks even when this executor has been {@code shutdown}.
687 * In this case, these tasks will only terminate upon
688 * {@code shutdownNow}, or after setting the policy to
689 * {@code false} when already shutdown.
690 * This value is by default {@code true}.
691 *
692 * @param value if {@code true}, execute after shutdown, else don't
693 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
694 */
695 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
696 executeExistingDelayedTasksAfterShutdown = value;
697 if (!value && isShutdown())
698 onShutdown();
699 }
700
701 /**
702 * Gets the policy on whether to execute existing delayed
703 * tasks even when this executor has been {@code shutdown}.
704 * In this case, these tasks will only terminate upon
705 * {@code shutdownNow}, or after setting the policy to
706 * {@code false} when already shutdown.
707 * This value is by default {@code true}.
708 *
709 * @return {@code true} if will execute after shutdown
710 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
711 */
712 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
713 return executeExistingDelayedTasksAfterShutdown;
714 }
715
716 /**
717 * Sets the policy on whether cancelled tasks should be immediately
718 * removed from the work queue at time of cancellation. This value is
719 * by default {@code false}.
720 *
721 * @param value if {@code true}, remove on cancellation, else don't
722 * @see #getRemoveOnCancelPolicy
723 * @since 1.7
724 */
725 public void setRemoveOnCancelPolicy(boolean value) {
726 removeOnCancel = value;
727 }
728
729 /**
730 * Gets the policy on whether cancelled tasks should be immediately
731 * removed from the work queue at time of cancellation. This value is
732 * by default {@code false}.
733 *
734 * @return {@code true} if cancelled tasks are immediately removed
735 * from the queue
736 * @see #setRemoveOnCancelPolicy
737 * @since 1.7
738 */
739 public boolean getRemoveOnCancelPolicy() {
740 return removeOnCancel;
741 }
742
743 /**
744 * Initiates an orderly shutdown in which previously submitted
745 * tasks are executed, but no new tasks will be accepted.
746 * Invocation has no additional effect if already shut down.
747 *
748 * <p>This method does not wait for previously submitted tasks to
749 * complete execution. Use {@link #awaitTermination awaitTermination}
750 * to do that.
751 *
752 * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
753 * has been set {@code false}, existing delayed tasks whose delays
754 * have not yet elapsed are cancelled. And unless the {@code
755 * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
756 * {@code true}, future executions of existing periodic tasks will
757 * be cancelled.
758 *
759 * @throws SecurityException {@inheritDoc}
760 */
761 public void shutdown() {
762 super.shutdown();
763 }
764
765 /**
766 * Attempts to stop all actively executing tasks, halts the
767 * processing of waiting tasks, and returns a list of the tasks
768 * that were awaiting execution. These tasks are drained (removed)
769 * from the task queue upon return from this method.
770 *
771 * <p>This method does not wait for actively executing tasks to
772 * terminate. Use {@link #awaitTermination awaitTermination} to
773 * do that.
774 *
775 * <p>There are no guarantees beyond best-effort attempts to stop
776 * processing actively executing tasks. This implementation
777 * interrupts tasks via {@link Thread#interrupt}; any task that
778 * fails to respond to interrupts may never terminate.
779 *
780 * @return list of tasks that never commenced execution.
781 * Each element of this list is a {@link ScheduledFuture}.
782 * For tasks submitted via one of the {@code schedule}
783 * methods, the element will be identical to the returned
784 * {@code ScheduledFuture}. For tasks submitted using
785 * {@link #execute execute}, the element will be a
786 * zero-delay {@code ScheduledFuture}.
787 * @throws SecurityException {@inheritDoc}
788 */
789 public List<Runnable> shutdownNow() {
790 return super.shutdownNow();
791 }
792
793 /**
794 * Returns the task queue used by this executor. Access to the
795 * task queue is intended primarily for debugging and monitoring.
796 * This queue may be in active use. Retrieving the task queue
797 * does not prevent queued tasks from executing.
798 *
799 * <p>Each element of this queue is a {@link ScheduledFuture}.
800 * For tasks submitted via one of the {@code schedule} methods, the
801 * element will be identical to the returned {@code ScheduledFuture}.
802 * For tasks submitted using {@link #execute execute}, the element
803 * will be a zero-delay {@code ScheduledFuture}.
804 *
805 * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
806 * tasks in the order in which they will execute.
807 *
808 * @return the task queue
809 */
810 public BlockingQueue<Runnable> getQueue() {
811 return super.getQueue();
812 }
813
814 /**
815 * Specialized delay queue. To mesh with TPE declarations, this
816 * class must be declared as a BlockingQueue<Runnable> even though
817 * it can only hold RunnableScheduledFutures.
818 */
819 static class DelayedWorkQueue extends AbstractQueue<Runnable>
820 implements BlockingQueue<Runnable> {
821
822 /*
823 * A DelayedWorkQueue is based on a heap-based data structure
824 * like those in DelayQueue and PriorityQueue, except that
825 * every ScheduledFutureTask also records its index into the
826 * heap array. This eliminates the need to find a task upon
827 * cancellation, greatly speeding up removal (down from O(n)
828 * to O(log n)), and reducing garbage retention that would
829 * otherwise occur by waiting for the element to rise to top
830 * before clearing. But because the queue may also hold
831 * RunnableScheduledFutures that are not ScheduledFutureTasks,
832 * we are not guaranteed to have such indices available, in
833 * which case we fall back to linear search. (We expect that
834 * most tasks will not be decorated, and that the faster cases
835 * will be much more common.)
836 *
837 * All heap operations must record index changes -- mainly
838 * within siftUp and siftDown. Upon removal, a task's
839 * heapIndex is set to -1. Note that ScheduledFutureTasks can
840 * appear at most once in the queue (this need not be true for
841 * other kinds of tasks or work queues), so are uniquely
842 * identified by heapIndex.
843 */
844
845 private static final int INITIAL_CAPACITY = 16;
846 private RunnableScheduledFuture<?>[] queue =
847 new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
848 private final ReentrantLock lock = new ReentrantLock();
849 private int size;
850
851 /**
852 * Thread designated to wait for the task at the head of the
853 * queue. This variant of the Leader-Follower pattern
854 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
855 * minimize unnecessary timed waiting. When a thread becomes
856 * the leader, it waits only for the next delay to elapse, but
857 * other threads await indefinitely. The leader thread must
858 * signal some other thread before returning from take() or
859 * poll(...), unless some other thread becomes leader in the
860 * interim. Whenever the head of the queue is replaced with a
861 * task with an earlier expiration time, the leader field is
862 * invalidated by being reset to null, and some waiting
863 * thread, but not necessarily the current leader, is
864 * signalled. So waiting threads must be prepared to acquire
865 * and lose leadership while waiting.
866 */
867 private Thread leader;
868
869 /**
870 * Condition signalled when a newer task becomes available at the
871 * head of the queue or a new thread may need to become leader.
872 */
873 private final Condition available = lock.newCondition();
874
875 /**
876 * Sets f's heapIndex if it is a ScheduledFutureTask.
877 */
878 private void setIndex(RunnableScheduledFuture<?> f, int idx) {
879 if (f instanceof ScheduledFutureTask)
880 ((ScheduledFutureTask)f).heapIndex = idx;
881 }
882
883 /**
884 * Sifts element added at bottom up to its heap-ordered spot.
885 * Call only when holding lock.
886 */
887 private void siftUp(int k, RunnableScheduledFuture<?> key) {
888 while (k > 0) {
889 int parent = (k - 1) >>> 1;
890 RunnableScheduledFuture<?> e = queue[parent];
891 if (key.compareTo(e) >= 0)
892 break;
893 queue[k] = e;
894 setIndex(e, k);
895 k = parent;
896 }
897 queue[k] = key;
898 setIndex(key, k);
899 }
900
901 /**
902 * Sifts element added at top down to its heap-ordered spot.
903 * Call only when holding lock.
904 */
905 private void siftDown(int k, RunnableScheduledFuture<?> key) {
906 int half = size >>> 1;
907 while (k < half) {
908 int child = (k << 1) + 1;
909 RunnableScheduledFuture<?> c = queue[child];
910 int right = child + 1;
911 if (right < size && c.compareTo(queue[right]) > 0)
912 c = queue[child = right];
913 if (key.compareTo(c) <= 0)
914 break;
915 queue[k] = c;
916 setIndex(c, k);
917 k = child;
918 }
919 queue[k] = key;
920 setIndex(key, k);
921 }
922
923 /**
924 * Resizes the heap array. Call only when holding lock.
925 */
926 private void grow() {
927 int oldCapacity = queue.length;
928 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
929 if (newCapacity < 0) // overflow
930 newCapacity = Integer.MAX_VALUE;
931 queue = Arrays.copyOf(queue, newCapacity);
932 }
933
934 /**
935 * Finds index of given object, or -1 if absent.
936 */
937 private int indexOf(Object x) {
938 if (x != null) {
939 if (x instanceof ScheduledFutureTask) {
940 int i = ((ScheduledFutureTask) x).heapIndex;
941 // Sanity check; x could conceivably be a
942 // ScheduledFutureTask from some other pool.
943 if (i >= 0 && i < size && queue[i] == x)
944 return i;
945 } else {
946 for (int i = 0; i < size; i++)
947 if (x.equals(queue[i]))
948 return i;
949 }
950 }
951 return -1;
952 }
953
954 public boolean contains(Object x) {
955 final ReentrantLock lock = this.lock;
956 lock.lock();
957 try {
958 return indexOf(x) != -1;
959 } finally {
960 lock.unlock();
961 }
962 }
963
964 public boolean remove(Object x) {
965 final ReentrantLock lock = this.lock;
966 lock.lock();
967 try {
968 int i = indexOf(x);
969 if (i < 0)
970 return false;
971
972 setIndex(queue[i], -1);
973 int s = --size;
974 RunnableScheduledFuture<?> replacement = queue[s];
975 queue[s] = null;
976 if (s != i) {
977 siftDown(i, replacement);
978 if (queue[i] == replacement)
979 siftUp(i, replacement);
980 }
981 return true;
982 } finally {
983 lock.unlock();
984 }
985 }
986
987 public int size() {
988 final ReentrantLock lock = this.lock;
989 lock.lock();
990 try {
991 return size;
992 } finally {
993 lock.unlock();
994 }
995 }
996
997 public boolean isEmpty() {
998 return size() == 0;
999 }
1000
1001 public int remainingCapacity() {
1002 return Integer.MAX_VALUE;
1003 }
1004
1005 public RunnableScheduledFuture<?> peek() {
1006 final ReentrantLock lock = this.lock;
1007 lock.lock();
1008 try {
1009 return queue[0];
1010 } finally {
1011 lock.unlock();
1012 }
1013 }
1014
1015 public boolean offer(Runnable x) {
1016 if (x == null)
1017 throw new NullPointerException();
1018 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
1019 final ReentrantLock lock = this.lock;
1020 lock.lock();
1021 try {
1022 int i = size;
1023 if (i >= queue.length)
1024 grow();
1025 size = i + 1;
1026 if (i == 0) {
1027 queue[0] = e;
1028 setIndex(e, 0);
1029 } else {
1030 siftUp(i, e);
1031 }
1032 if (queue[0] == e) {
1033 leader = null;
1034 available.signal();
1035 }
1036 } finally {
1037 lock.unlock();
1038 }
1039 return true;
1040 }
1041
1042 public void put(Runnable e) {
1043 offer(e);
1044 }
1045
1046 public boolean add(Runnable e) {
1047 return offer(e);
1048 }
1049
1050 public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1051 return offer(e);
1052 }
1053
1054 /**
1055 * Performs common bookkeeping for poll and take: Replaces
1056 * first element with last and sifts it down. Call only when
1057 * holding lock.
1058 * @param f the task to remove and return
1059 */
1060 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1061 int s = --size;
1062 RunnableScheduledFuture<?> x = queue[s];
1063 queue[s] = null;
1064 if (s != 0)
1065 siftDown(0, x);
1066 setIndex(f, -1);
1067 return f;
1068 }
1069
1070 public RunnableScheduledFuture<?> poll() {
1071 final ReentrantLock lock = this.lock;
1072 lock.lock();
1073 try {
1074 RunnableScheduledFuture<?> first = queue[0];
1075 return (first == null || first.getDelay(NANOSECONDS) > 0)
1076 ? null
1077 : finishPoll(first);
1078 } finally {
1079 lock.unlock();
1080 }
1081 }
1082
1083 public RunnableScheduledFuture<?> take() throws InterruptedException {
1084 final ReentrantLock lock = this.lock;
1085 lock.lockInterruptibly();
1086 try {
1087 for (;;) {
1088 RunnableScheduledFuture<?> first = queue[0];
1089 if (first == null)
1090 available.await();
1091 else {
1092 long delay = first.getDelay(NANOSECONDS);
1093 if (delay <= 0L)
1094 return finishPoll(first);
1095 first = null; // don't retain ref while waiting
1096 if (leader != null)
1097 available.await();
1098 else {
1099 Thread thisThread = Thread.currentThread();
1100 leader = thisThread;
1101 try {
1102 available.awaitNanos(delay);
1103 } finally {
1104 if (leader == thisThread)
1105 leader = null;
1106 }
1107 }
1108 }
1109 }
1110 } finally {
1111 if (leader == null && queue[0] != null)
1112 available.signal();
1113 lock.unlock();
1114 }
1115 }
1116
1117 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1118 throws InterruptedException {
1119 long nanos = unit.toNanos(timeout);
1120 final ReentrantLock lock = this.lock;
1121 lock.lockInterruptibly();
1122 try {
1123 for (;;) {
1124 RunnableScheduledFuture<?> first = queue[0];
1125 if (first == null) {
1126 if (nanos <= 0L)
1127 return null;
1128 else
1129 nanos = available.awaitNanos(nanos);
1130 } else {
1131 long delay = first.getDelay(NANOSECONDS);
1132 if (delay <= 0L)
1133 return finishPoll(first);
1134 if (nanos <= 0L)
1135 return null;
1136 first = null; // don't retain ref while waiting
1137 if (nanos < delay || leader != null)
1138 nanos = available.awaitNanos(nanos);
1139 else {
1140 Thread thisThread = Thread.currentThread();
1141 leader = thisThread;
1142 try {
1143 long timeLeft = available.awaitNanos(delay);
1144 nanos -= delay - timeLeft;
1145 } finally {
1146 if (leader == thisThread)
1147 leader = null;
1148 }
1149 }
1150 }
1151 }
1152 } finally {
1153 if (leader == null && queue[0] != null)
1154 available.signal();
1155 lock.unlock();
1156 }
1157 }
1158
1159 public void clear() {
1160 final ReentrantLock lock = this.lock;
1161 lock.lock();
1162 try {
1163 for (int i = 0; i < size; i++) {
1164 RunnableScheduledFuture<?> t = queue[i];
1165 if (t != null) {
1166 queue[i] = null;
1167 setIndex(t, -1);
1168 }
1169 }
1170 size = 0;
1171 } finally {
1172 lock.unlock();
1173 }
1174 }
1175
1176 /**
1177 * Returns first element only if it is expired.
1178 * Used only by drainTo. Call only when holding lock.
1179 */
1180 private RunnableScheduledFuture<?> peekExpired() {
1181 // assert lock.isHeldByCurrentThread();
1182 RunnableScheduledFuture<?> first = queue[0];
1183 return (first == null || first.getDelay(NANOSECONDS) > 0) ?
1184 null : first;
1185 }
1186
1187 public int drainTo(Collection<? super Runnable> c) {
1188 if (c == null)
1189 throw new NullPointerException();
1190 if (c == this)
1191 throw new IllegalArgumentException();
1192 final ReentrantLock lock = this.lock;
1193 lock.lock();
1194 try {
1195 RunnableScheduledFuture<?> first;
1196 int n = 0;
1197 while ((first = peekExpired()) != null) {
1198 c.add(first); // In this order, in case add() throws.
1199 finishPoll(first);
1200 ++n;
1201 }
1202 return n;
1203 } finally {
1204 lock.unlock();
1205 }
1206 }
1207
1208 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1209 if (c == null)
1210 throw new NullPointerException();
1211 if (c == this)
1212 throw new IllegalArgumentException();
1213 if (maxElements <= 0)
1214 return 0;
1215 final ReentrantLock lock = this.lock;
1216 lock.lock();
1217 try {
1218 RunnableScheduledFuture<?> first;
1219 int n = 0;
1220 while (n < maxElements && (first = peekExpired()) != null) {
1221 c.add(first); // In this order, in case add() throws.
1222 finishPoll(first);
1223 ++n;
1224 }
1225 return n;
1226 } finally {
1227 lock.unlock();
1228 }
1229 }
1230
1231 public Object[] toArray() {
1232 final ReentrantLock lock = this.lock;
1233 lock.lock();
1234 try {
1235 return Arrays.copyOf(queue, size, Object[].class);
1236 } finally {
1237 lock.unlock();
1238 }
1239 }
1240
1241 @SuppressWarnings("unchecked")
1242 public <T> T[] toArray(T[] a) {
1243 final ReentrantLock lock = this.lock;
1244 lock.lock();
1245 try {
1246 if (a.length < size)
1247 return (T[]) Arrays.copyOf(queue, size, a.getClass());
1248 System.arraycopy(queue, 0, a, 0, size);
1249 if (a.length > size)
1250 a[size] = null;
1251 return a;
1252 } finally {
1253 lock.unlock();
1254 }
1255 }
1256
1257 public Iterator<Runnable> iterator() {
1258 return new Itr(Arrays.copyOf(queue, size));
1259 }
1260
1261 /**
1262 * Snapshot iterator that works off copy of underlying q array.
1263 */
1264 private class Itr implements Iterator<Runnable> {
1265 final RunnableScheduledFuture<?>[] array;
1266 int cursor; // index of next element to return; initially 0
1267 int lastRet = -1; // index of last element returned; -1 if no such
1268
1269 Itr(RunnableScheduledFuture<?>[] array) {
1270 this.array = array;
1271 }
1272
1273 public boolean hasNext() {
1274 return cursor < array.length;
1275 }
1276
1277 public Runnable next() {
1278 if (cursor >= array.length)
1279 throw new NoSuchElementException();
1280 return array[lastRet = cursor++];
1281 }
1282
1283 public void remove() {
1284 if (lastRet < 0)
1285 throw new IllegalStateException();
1286 DelayedWorkQueue.this.remove(array[lastRet]);
1287 lastRet = -1;
1288 }
1289 }
1290 }
1291 }