ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.105
Committed: Tue Mar 28 00:02:18 2017 UTC (7 years, 2 months ago) by jsr166
Branch: MAIN
Changes since 1.104: +7 -1 lines
Log Message:
DelayedWorkQueue.iterator must hold lock

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 import static java.util.concurrent.TimeUnit.NANOSECONDS;
11
12 import java.util.AbstractQueue;
13 import java.util.Arrays;
14 import java.util.Collection;
15 import java.util.Iterator;
16 import java.util.List;
17 import java.util.NoSuchElementException;
18 import java.util.concurrent.atomic.AtomicLong;
19 import java.util.concurrent.locks.Condition;
20 import java.util.concurrent.locks.ReentrantLock;
21
22 /**
23 * A {@link ThreadPoolExecutor} that can additionally schedule
24 * commands to run after a given delay, or to execute periodically.
25 * This class is preferable to {@link java.util.Timer} when multiple
26 * worker threads are needed, or when the additional flexibility or
27 * capabilities of {@link ThreadPoolExecutor} (which this class
28 * extends) are required.
29 *
30 * <p>Delayed tasks execute no sooner than they are enabled, but
31 * without any real-time guarantees about when, after they are
32 * enabled, they will commence. Tasks scheduled for exactly the same
33 * execution time are enabled in first-in-first-out (FIFO) order of
34 * submission.
35 *
36 * <p>When a submitted task is cancelled before it is run, execution
37 * is suppressed. By default, such a cancelled task is not
38 * automatically removed from the work queue until its delay elapses.
39 * While this enables further inspection and monitoring, it may also
40 * cause unbounded retention of cancelled tasks. To avoid this, use
41 * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately
42 * removed from the work queue at time of cancellation.
43 *
44 * <p>Successive executions of a periodic task scheduled via
45 * {@link #scheduleAtFixedRate scheduleAtFixedRate} or
46 * {@link #scheduleWithFixedDelay scheduleWithFixedDelay}
47 * do not overlap. While different executions may be performed by
48 * different threads, the effects of prior executions
49 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
50 * those of subsequent ones.
51 *
52 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
53 * of the inherited tuning methods are not useful for it. In
54 * particular, because it acts as a fixed-sized pool using
55 * {@code corePoolSize} threads and an unbounded queue, adjustments
56 * to {@code maximumPoolSize} have no useful effect. Additionally, it
57 * is almost never a good idea to set {@code corePoolSize} to zero or
58 * use {@code allowCoreThreadTimeOut} because this may leave the pool
59 * without threads to handle tasks once they become eligible to run.
60 *
61 * <p>As with {@code ThreadPoolExecutor}, if not otherwise specified,
62 * this class uses {@link Executors#defaultThreadFactory} as the
63 * default thread factory, and {@link ThreadPoolExecutor.AbortPolicy}
64 * as the default rejected execution handler.
65 *
66 * <p><b>Extension notes:</b> This class overrides the
67 * {@link ThreadPoolExecutor#execute(Runnable) execute} and
68 * {@link AbstractExecutorService#submit(Runnable) submit}
69 * methods to generate internal {@link ScheduledFuture} objects to
70 * control per-task delays and scheduling. To preserve
71 * functionality, any further overrides of these methods in
72 * subclasses must invoke superclass versions, which effectively
73 * disables additional task customization. However, this class
74 * provides alternative protected extension method
75 * {@code decorateTask} (one version each for {@code Runnable} and
76 * {@code Callable}) that can be used to customize the concrete task
77 * types used to execute commands entered via {@code execute},
78 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
79 * and {@code scheduleWithFixedDelay}. By default, a
80 * {@code ScheduledThreadPoolExecutor} uses a task type extending
81 * {@link FutureTask}. However, this may be modified or replaced using
82 * subclasses of the form:
83 *
84 * <pre> {@code
85 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
86 *
87 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
88 *
89 * protected <V> RunnableScheduledFuture<V> decorateTask(
90 * Runnable r, RunnableScheduledFuture<V> task) {
91 * return new CustomTask<V>(r, task);
92 * }
93 *
94 * protected <V> RunnableScheduledFuture<V> decorateTask(
95 * Callable<V> c, RunnableScheduledFuture<V> task) {
96 * return new CustomTask<V>(c, task);
97 * }
98 * // ... add constructors, etc.
99 * }}</pre>
100 *
101 * @since 1.5
102 * @author Doug Lea
103 */
104 public class ScheduledThreadPoolExecutor
105 extends ThreadPoolExecutor
106 implements ScheduledExecutorService {
107
108 /*
109 * This class specializes ThreadPoolExecutor implementation by
110 *
111 * 1. Using a custom task type ScheduledFutureTask, even for tasks
112 * that don't require scheduling because they are submitted
113 * using ExecutorService rather than ScheduledExecutorService
114 * methods, which are treated as tasks with a delay of zero.
115 *
116 * 2. Using a custom queue (DelayedWorkQueue), a variant of
117 * unbounded DelayQueue. The lack of capacity constraint and
118 * the fact that corePoolSize and maximumPoolSize are
119 * effectively identical simplifies some execution mechanics
120 * (see delayedExecute) compared to ThreadPoolExecutor.
121 *
122 * 3. Supporting optional run-after-shutdown parameters, which
123 * leads to overrides of shutdown methods to remove and cancel
124 * tasks that should NOT be run after shutdown, as well as
125 * different recheck logic when task (re)submission overlaps
126 * with a shutdown.
127 *
128 * 4. Task decoration methods to allow interception and
129 * instrumentation, which are needed because subclasses cannot
130 * otherwise override submit methods to get this effect. These
131 * don't have any impact on pool control logic though.
132 */
133
134 /**
135 * False if should cancel/suppress periodic tasks on shutdown.
136 */
137 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
138
139 /**
140 * False if should cancel non-periodic tasks on shutdown.
141 */
142 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
143
144 /**
145 * True if ScheduledFutureTask.cancel should remove from queue.
146 */
147 volatile boolean removeOnCancel;
148
149 /**
150 * Sequence number to break scheduling ties, and in turn to
151 * guarantee FIFO order among tied entries.
152 */
153 private static final AtomicLong sequencer = new AtomicLong();
154
155 private class ScheduledFutureTask<V>
156 extends FutureTask<V> implements RunnableScheduledFuture<V> {
157
158 /** Sequence number to break ties FIFO */
159 private final long sequenceNumber;
160
161 /** The nanoTime-based time when the task is enabled to execute. */
162 private volatile long time;
163
164 /**
165 * Period for repeating tasks, in nanoseconds.
166 * A positive value indicates fixed-rate execution.
167 * A negative value indicates fixed-delay execution.
168 * A value of 0 indicates a non-repeating (one-shot) task.
169 */
170 private final long period;
171
172 /** The actual task to be re-enqueued by reExecutePeriodic */
173 RunnableScheduledFuture<V> outerTask = this;
174
175 /**
176 * Index into delay queue, to support faster cancellation.
177 */
178 int heapIndex;
179
180 /**
181 * Creates a one-shot action with given nanoTime-based trigger time.
182 */
183 ScheduledFutureTask(Runnable r, V result, long triggerTime,
184 long sequenceNumber) {
185 super(r, result);
186 this.time = triggerTime;
187 this.period = 0;
188 this.sequenceNumber = sequenceNumber;
189 }
190
191 /**
192 * Creates a periodic action with given nanoTime-based initial
193 * trigger time and period.
194 */
195 ScheduledFutureTask(Runnable r, V result, long triggerTime,
196 long period, long sequenceNumber) {
197 super(r, result);
198 this.time = triggerTime;
199 this.period = period;
200 this.sequenceNumber = sequenceNumber;
201 }
202
203 /**
204 * Creates a one-shot action with given nanoTime-based trigger time.
205 */
206 ScheduledFutureTask(Callable<V> callable, long triggerTime,
207 long sequenceNumber) {
208 super(callable);
209 this.time = triggerTime;
210 this.period = 0;
211 this.sequenceNumber = sequenceNumber;
212 }
213
214 public long getDelay(TimeUnit unit) {
215 return unit.convert(time - System.nanoTime(), NANOSECONDS);
216 }
217
218 public int compareTo(Delayed other) {
219 if (other == this) // compare zero if same object
220 return 0;
221 if (other instanceof ScheduledFutureTask) {
222 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
223 long diff = time - x.time;
224 if (diff < 0)
225 return -1;
226 else if (diff > 0)
227 return 1;
228 else if (sequenceNumber < x.sequenceNumber)
229 return -1;
230 else
231 return 1;
232 }
233 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
234 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
235 }
236
237 /**
238 * Returns {@code true} if this is a periodic (not a one-shot) action.
239 *
240 * @return {@code true} if periodic
241 */
242 public boolean isPeriodic() {
243 return period != 0;
244 }
245
246 /**
247 * Sets the next time to run for a periodic task.
248 */
249 private void setNextRunTime() {
250 long p = period;
251 if (p > 0)
252 time += p;
253 else
254 time = triggerTime(-p);
255 }
256
257 public boolean cancel(boolean mayInterruptIfRunning) {
258 // The racy read of heapIndex below is benign:
259 // if heapIndex < 0, then OOTA guarantees that we have surely
260 // been removed; else we recheck under lock in remove()
261 boolean cancelled = super.cancel(mayInterruptIfRunning);
262 if (cancelled && removeOnCancel && heapIndex >= 0)
263 remove(this);
264 return cancelled;
265 }
266
267 /**
268 * Overrides FutureTask version so as to reset/requeue if periodic.
269 */
270 public void run() {
271 boolean periodic = isPeriodic();
272 if (!canRunInCurrentRunState(periodic))
273 cancel(false);
274 else if (!periodic)
275 super.run();
276 else if (super.runAndReset()) {
277 setNextRunTime();
278 reExecutePeriodic(outerTask);
279 }
280 }
281 }
282
283 /**
284 * Returns true if can run a task given current run state
285 * and run-after-shutdown parameters.
286 *
287 * @param periodic true if this task periodic, false if delayed
288 */
289 boolean canRunInCurrentRunState(boolean periodic) {
290 return isRunningOrShutdown(periodic ?
291 continueExistingPeriodicTasksAfterShutdown :
292 executeExistingDelayedTasksAfterShutdown);
293 }
294
295 /**
296 * Main execution method for delayed or periodic tasks. If pool
297 * is shut down, rejects the task. Otherwise adds task to queue
298 * and starts a thread, if necessary, to run it. (We cannot
299 * prestart the thread to run the task because the task (probably)
300 * shouldn't be run yet.) If the pool is shut down while the task
301 * is being added, cancel and remove it if required by state and
302 * run-after-shutdown parameters.
303 *
304 * @param task the task
305 */
306 private void delayedExecute(RunnableScheduledFuture<?> task) {
307 if (isShutdown())
308 reject(task);
309 else {
310 super.getQueue().add(task);
311 if (isShutdown() &&
312 !canRunInCurrentRunState(task.isPeriodic()) &&
313 remove(task))
314 task.cancel(false);
315 else
316 ensurePrestart();
317 }
318 }
319
320 /**
321 * Requeues a periodic task unless current run state precludes it.
322 * Same idea as delayedExecute except drops task rather than rejecting.
323 *
324 * @param task the task
325 */
326 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
327 if (canRunInCurrentRunState(true)) {
328 super.getQueue().add(task);
329 if (!canRunInCurrentRunState(true) && remove(task))
330 task.cancel(false);
331 else
332 ensurePrestart();
333 }
334 }
335
336 /**
337 * Cancels and clears the queue of all tasks that should not be run
338 * due to shutdown policy. Invoked within super.shutdown.
339 */
340 @Override void onShutdown() {
341 BlockingQueue<Runnable> q = super.getQueue();
342 boolean keepDelayed =
343 getExecuteExistingDelayedTasksAfterShutdownPolicy();
344 boolean keepPeriodic =
345 getContinueExistingPeriodicTasksAfterShutdownPolicy();
346 if (!keepDelayed && !keepPeriodic) {
347 for (Object e : q.toArray())
348 if (e instanceof RunnableScheduledFuture<?>)
349 ((RunnableScheduledFuture<?>) e).cancel(false);
350 q.clear();
351 }
352 else {
353 // Traverse snapshot to avoid iterator exceptions
354 for (Object e : q.toArray()) {
355 if (e instanceof RunnableScheduledFuture) {
356 RunnableScheduledFuture<?> t =
357 (RunnableScheduledFuture<?>)e;
358 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
359 t.isCancelled()) { // also remove if already cancelled
360 if (q.remove(t))
361 t.cancel(false);
362 }
363 }
364 }
365 }
366 tryTerminate();
367 }
368
369 /**
370 * Modifies or replaces the task used to execute a runnable.
371 * This method can be used to override the concrete
372 * class used for managing internal tasks.
373 * The default implementation simply returns the given task.
374 *
375 * @param runnable the submitted Runnable
376 * @param task the task created to execute the runnable
377 * @param <V> the type of the task's result
378 * @return a task that can execute the runnable
379 * @since 1.6
380 */
381 protected <V> RunnableScheduledFuture<V> decorateTask(
382 Runnable runnable, RunnableScheduledFuture<V> task) {
383 return task;
384 }
385
386 /**
387 * Modifies or replaces the task used to execute a callable.
388 * This method can be used to override the concrete
389 * class used for managing internal tasks.
390 * The default implementation simply returns the given task.
391 *
392 * @param callable the submitted Callable
393 * @param task the task created to execute the callable
394 * @param <V> the type of the task's result
395 * @return a task that can execute the callable
396 * @since 1.6
397 */
398 protected <V> RunnableScheduledFuture<V> decorateTask(
399 Callable<V> callable, RunnableScheduledFuture<V> task) {
400 return task;
401 }
402
403 /**
404 * The default keep-alive time for pool threads.
405 *
406 * Normally, this value is unused because all pool threads will be
407 * core threads, but if a user creates a pool with a corePoolSize
408 * of zero (against our advice), we keep a thread alive as long as
409 * there are queued tasks. If the keep alive time is zero (the
410 * historic value), we end up hot-spinning in getTask, wasting a
411 * CPU. But on the other hand, if we set the value too high, and
412 * users create a one-shot pool which they don't cleanly shutdown,
413 * the pool's non-daemon threads will prevent JVM termination. A
414 * small but non-zero value (relative to a JVM's lifetime) seems
415 * best.
416 */
417 private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
418
/**
 * Constructs a {@code ScheduledThreadPoolExecutor} with the given
 * core pool size, backed by a new {@code DelayedWorkQueue}.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue());
}
432
/**
 * Constructs a {@code ScheduledThreadPoolExecutor} with the given
 * core pool size and thread factory, backed by a new
 * {@code DelayedWorkQueue}.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code threadFactory} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue(),
          threadFactory);
}
450
/**
 * Constructs a {@code ScheduledThreadPoolExecutor} with the given
 * core pool size and rejected-execution handler, backed by a new
 * {@code DelayedWorkQueue}.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code handler} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue(),
          handler);
}
468
/**
 * Constructs a {@code ScheduledThreadPoolExecutor} with the given
 * core pool size, thread factory and rejected-execution handler,
 * backed by a new {@code DelayedWorkQueue}.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code threadFactory} or
 *         {@code handler} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue(),
          threadFactory,
          handler);
}
490
491 /**
492 * Returns the nanoTime-based trigger time of a delayed action.
493 */
494 private long triggerTime(long delay, TimeUnit unit) {
495 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
496 }
497
498 /**
499 * Returns the nanoTime-based trigger time of a delayed action.
500 */
501 long triggerTime(long delay) {
502 return System.nanoTime() +
503 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
504 }
505
506 /**
507 * Constrains the values of all delays in the queue to be within
508 * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
509 * This may occur if a task is eligible to be dequeued, but has
510 * not yet been, while some other task is added with a delay of
511 * Long.MAX_VALUE.
512 */
513 private long overflowFree(long delay) {
514 Delayed head = (Delayed) super.getQueue().peek();
515 if (head != null) {
516 long headDelay = head.getDelay(NANOSECONDS);
517 if (headDelay < 0 && (delay - headDelay < 0))
518 delay = Long.MAX_VALUE + headDelay;
519 }
520 return delay;
521 }
522
523 /**
524 * @throws RejectedExecutionException {@inheritDoc}
525 * @throws NullPointerException {@inheritDoc}
526 */
527 public ScheduledFuture<?> schedule(Runnable command,
528 long delay,
529 TimeUnit unit) {
530 if (command == null || unit == null)
531 throw new NullPointerException();
532 RunnableScheduledFuture<Void> t = decorateTask(command,
533 new ScheduledFutureTask<Void>(command, null,
534 triggerTime(delay, unit),
535 sequencer.getAndIncrement()));
536 delayedExecute(t);
537 return t;
538 }
539
540 /**
541 * @throws RejectedExecutionException {@inheritDoc}
542 * @throws NullPointerException {@inheritDoc}
543 */
544 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
545 long delay,
546 TimeUnit unit) {
547 if (callable == null || unit == null)
548 throw new NullPointerException();
549 RunnableScheduledFuture<V> t = decorateTask(callable,
550 new ScheduledFutureTask<V>(callable,
551 triggerTime(delay, unit),
552 sequencer.getAndIncrement()));
553 delayedExecute(t);
554 return t;
555 }
556
557 /**
558 * @throws RejectedExecutionException {@inheritDoc}
559 * @throws NullPointerException {@inheritDoc}
560 * @throws IllegalArgumentException {@inheritDoc}
561 */
562 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
563 long initialDelay,
564 long period,
565 TimeUnit unit) {
566 if (command == null || unit == null)
567 throw new NullPointerException();
568 if (period <= 0L)
569 throw new IllegalArgumentException();
570 ScheduledFutureTask<Void> sft =
571 new ScheduledFutureTask<Void>(command,
572 null,
573 triggerTime(initialDelay, unit),
574 unit.toNanos(period),
575 sequencer.getAndIncrement());
576 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
577 sft.outerTask = t;
578 delayedExecute(t);
579 return t;
580 }
581
582 /**
583 * @throws RejectedExecutionException {@inheritDoc}
584 * @throws NullPointerException {@inheritDoc}
585 * @throws IllegalArgumentException {@inheritDoc}
586 */
587 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
588 long initialDelay,
589 long delay,
590 TimeUnit unit) {
591 if (command == null || unit == null)
592 throw new NullPointerException();
593 if (delay <= 0L)
594 throw new IllegalArgumentException();
595 ScheduledFutureTask<Void> sft =
596 new ScheduledFutureTask<Void>(command,
597 null,
598 triggerTime(initialDelay, unit),
599 -unit.toNanos(delay),
600 sequencer.getAndIncrement());
601 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
602 sft.outerTask = t;
603 delayedExecute(t);
604 return t;
605 }
606
607 /**
608 * Executes {@code command} with zero required delay.
609 * This has effect equivalent to
610 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
611 * Note that inspections of the queue and of the list returned by
612 * {@code shutdownNow} will access the zero-delayed
613 * {@link ScheduledFuture}, not the {@code command} itself.
614 *
615 * <p>A consequence of the use of {@code ScheduledFuture} objects is
616 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
617 * called with a null second {@code Throwable} argument, even if the
618 * {@code command} terminated abruptly. Instead, the {@code Throwable}
619 * thrown by such a task can be obtained via {@link Future#get}.
620 *
621 * @throws RejectedExecutionException at discretion of
622 * {@code RejectedExecutionHandler}, if the task
623 * cannot be accepted for execution because the
624 * executor has been shut down
625 * @throws NullPointerException {@inheritDoc}
626 */
627 public void execute(Runnable command) {
628 schedule(command, 0, NANOSECONDS);
629 }
630
631 // Override AbstractExecutorService methods
632
633 /**
634 * @throws RejectedExecutionException {@inheritDoc}
635 * @throws NullPointerException {@inheritDoc}
636 */
637 public Future<?> submit(Runnable task) {
638 return schedule(task, 0, NANOSECONDS);
639 }
640
641 /**
642 * @throws RejectedExecutionException {@inheritDoc}
643 * @throws NullPointerException {@inheritDoc}
644 */
645 public <T> Future<T> submit(Runnable task, T result) {
646 return schedule(Executors.callable(task, result), 0, NANOSECONDS);
647 }
648
649 /**
650 * @throws RejectedExecutionException {@inheritDoc}
651 * @throws NullPointerException {@inheritDoc}
652 */
653 public <T> Future<T> submit(Callable<T> task) {
654 return schedule(task, 0, NANOSECONDS);
655 }
656
657 /**
658 * Sets the policy on whether to continue executing existing
659 * periodic tasks even when this executor has been {@code shutdown}.
660 * In this case, these tasks will only terminate upon
661 * {@code shutdownNow} or after setting the policy to
662 * {@code false} when already shutdown.
663 * This value is by default {@code false}.
664 *
665 * @param value if {@code true}, continue after shutdown, else don't
666 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
667 */
668 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
669 continueExistingPeriodicTasksAfterShutdown = value;
670 if (!value && isShutdown())
671 onShutdown();
672 }
673
674 /**
675 * Gets the policy on whether to continue executing existing
676 * periodic tasks even when this executor has been {@code shutdown}.
677 * In this case, these tasks will only terminate upon
678 * {@code shutdownNow} or after setting the policy to
679 * {@code false} when already shutdown.
680 * This value is by default {@code false}.
681 *
682 * @return {@code true} if will continue after shutdown
683 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
684 */
685 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
686 return continueExistingPeriodicTasksAfterShutdown;
687 }
688
689 /**
690 * Sets the policy on whether to execute existing delayed
691 * tasks even when this executor has been {@code shutdown}.
692 * In this case, these tasks will only terminate upon
693 * {@code shutdownNow}, or after setting the policy to
694 * {@code false} when already shutdown.
695 * This value is by default {@code true}.
696 *
697 * @param value if {@code true}, execute after shutdown, else don't
698 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
699 */
700 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
701 executeExistingDelayedTasksAfterShutdown = value;
702 if (!value && isShutdown())
703 onShutdown();
704 }
705
706 /**
707 * Gets the policy on whether to execute existing delayed
708 * tasks even when this executor has been {@code shutdown}.
709 * In this case, these tasks will only terminate upon
710 * {@code shutdownNow}, or after setting the policy to
711 * {@code false} when already shutdown.
712 * This value is by default {@code true}.
713 *
714 * @return {@code true} if will execute after shutdown
715 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
716 */
717 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
718 return executeExistingDelayedTasksAfterShutdown;
719 }
720
721 /**
722 * Sets the policy on whether cancelled tasks should be immediately
723 * removed from the work queue at time of cancellation. This value is
724 * by default {@code false}.
725 *
726 * @param value if {@code true}, remove on cancellation, else don't
727 * @see #getRemoveOnCancelPolicy
728 * @since 1.7
729 */
730 public void setRemoveOnCancelPolicy(boolean value) {
731 removeOnCancel = value;
732 }
733
734 /**
735 * Gets the policy on whether cancelled tasks should be immediately
736 * removed from the work queue at time of cancellation. This value is
737 * by default {@code false}.
738 *
739 * @return {@code true} if cancelled tasks are immediately removed
740 * from the queue
741 * @see #setRemoveOnCancelPolicy
742 * @since 1.7
743 */
744 public boolean getRemoveOnCancelPolicy() {
745 return removeOnCancel;
746 }
747
748 /**
749 * Initiates an orderly shutdown in which previously submitted
750 * tasks are executed, but no new tasks will be accepted.
751 * Invocation has no additional effect if already shut down.
752 *
753 * <p>This method does not wait for previously submitted tasks to
754 * complete execution. Use {@link #awaitTermination awaitTermination}
755 * to do that.
756 *
757 * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
758 * has been set {@code false}, existing delayed tasks whose delays
759 * have not yet elapsed are cancelled. And unless the {@code
760 * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
761 * {@code true}, future executions of existing periodic tasks will
762 * be cancelled.
763 *
764 * @throws SecurityException {@inheritDoc}
765 */
766 public void shutdown() {
767 super.shutdown();
768 }
769
770 /**
771 * Attempts to stop all actively executing tasks, halts the
772 * processing of waiting tasks, and returns a list of the tasks
773 * that were awaiting execution. These tasks are drained (removed)
774 * from the task queue upon return from this method.
775 *
776 * <p>This method does not wait for actively executing tasks to
777 * terminate. Use {@link #awaitTermination awaitTermination} to
778 * do that.
779 *
780 * <p>There are no guarantees beyond best-effort attempts to stop
781 * processing actively executing tasks. This implementation
782 * interrupts tasks via {@link Thread#interrupt}; any task that
783 * fails to respond to interrupts may never terminate.
784 *
785 * @return list of tasks that never commenced execution.
786 * Each element of this list is a {@link ScheduledFuture}.
787 * For tasks submitted via one of the {@code schedule}
788 * methods, the element will be identical to the returned
789 * {@code ScheduledFuture}. For tasks submitted using
790 * {@link #execute execute}, the element will be a
791 * zero-delay {@code ScheduledFuture}.
792 * @throws SecurityException {@inheritDoc}
793 */
794 public List<Runnable> shutdownNow() {
795 return super.shutdownNow();
796 }
797
798 /**
799 * Returns the task queue used by this executor. Access to the
800 * task queue is intended primarily for debugging and monitoring.
801 * This queue may be in active use. Retrieving the task queue
802 * does not prevent queued tasks from executing.
803 *
804 * <p>Each element of this queue is a {@link ScheduledFuture}.
805 * For tasks submitted via one of the {@code schedule} methods, the
806 * element will be identical to the returned {@code ScheduledFuture}.
807 * For tasks submitted using {@link #execute execute}, the element
808 * will be a zero-delay {@code ScheduledFuture}.
809 *
810 * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
811 * tasks in the order in which they will execute.
812 *
813 * @return the task queue
814 */
815 public BlockingQueue<Runnable> getQueue() {
816 return super.getQueue();
817 }
818
819 /**
820 * Specialized delay queue. To mesh with TPE declarations, this
821 * class must be declared as a BlockingQueue<Runnable> even though
822 * it can only hold RunnableScheduledFutures.
823 */
824 static class DelayedWorkQueue extends AbstractQueue<Runnable>
825 implements BlockingQueue<Runnable> {
826
827 /*
828 * A DelayedWorkQueue is based on a heap-based data structure
829 * like those in DelayQueue and PriorityQueue, except that
830 * every ScheduledFutureTask also records its index into the
831 * heap array. This eliminates the need to find a task upon
832 * cancellation, greatly speeding up removal (down from O(n)
833 * to O(log n)), and reducing garbage retention that would
834 * otherwise occur by waiting for the element to rise to top
835 * before clearing. But because the queue may also hold
836 * RunnableScheduledFutures that are not ScheduledFutureTasks,
837 * we are not guaranteed to have such indices available, in
838 * which case we fall back to linear search. (We expect that
839 * most tasks will not be decorated, and that the faster cases
840 * will be much more common.)
841 *
842 * All heap operations must record index changes -- mainly
843 * within siftUp and siftDown. Upon removal, a task's
844 * heapIndex is set to -1. Note that ScheduledFutureTasks can
845 * appear at most once in the queue (this need not be true for
846 * other kinds of tasks or work queues), so are uniquely
847 * identified by heapIndex.
848 */
849
850 private static final int INITIAL_CAPACITY = 16;
851 private RunnableScheduledFuture<?>[] queue =
852 new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
853 private final ReentrantLock lock = new ReentrantLock();
854 private int size;
855
856 /**
857 * Thread designated to wait for the task at the head of the
858 * queue. This variant of the Leader-Follower pattern
859 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
860 * minimize unnecessary timed waiting. When a thread becomes
861 * the leader, it waits only for the next delay to elapse, but
862 * other threads await indefinitely. The leader thread must
863 * signal some other thread before returning from take() or
864 * poll(...), unless some other thread becomes leader in the
865 * interim. Whenever the head of the queue is replaced with a
866 * task with an earlier expiration time, the leader field is
867 * invalidated by being reset to null, and some waiting
868 * thread, but not necessarily the current leader, is
869 * signalled. So waiting threads must be prepared to acquire
870 * and lose leadership while waiting.
871 */
872 private Thread leader;
873
874 /**
875 * Condition signalled when a newer task becomes available at the
876 * head of the queue or a new thread may need to become leader.
877 */
878 private final Condition available = lock.newCondition();
879
880 /**
881 * Sets f's heapIndex if it is a ScheduledFutureTask.
882 */
883 private static void setIndex(RunnableScheduledFuture<?> f, int idx) {
884 if (f instanceof ScheduledFutureTask)
885 ((ScheduledFutureTask)f).heapIndex = idx;
886 }
887
888 /**
889 * Sifts element added at bottom up to its heap-ordered spot.
890 * Call only when holding lock.
891 */
892 private void siftUp(int k, RunnableScheduledFuture<?> key) {
893 while (k > 0) {
894 int parent = (k - 1) >>> 1;
895 RunnableScheduledFuture<?> e = queue[parent];
896 if (key.compareTo(e) >= 0)
897 break;
898 queue[k] = e;
899 setIndex(e, k);
900 k = parent;
901 }
902 queue[k] = key;
903 setIndex(key, k);
904 }
905
906 /**
907 * Sifts element added at top down to its heap-ordered spot.
908 * Call only when holding lock.
909 */
910 private void siftDown(int k, RunnableScheduledFuture<?> key) {
911 int half = size >>> 1;
912 while (k < half) {
913 int child = (k << 1) + 1;
914 RunnableScheduledFuture<?> c = queue[child];
915 int right = child + 1;
916 if (right < size && c.compareTo(queue[right]) > 0)
917 c = queue[child = right];
918 if (key.compareTo(c) <= 0)
919 break;
920 queue[k] = c;
921 setIndex(c, k);
922 k = child;
923 }
924 queue[k] = key;
925 setIndex(key, k);
926 }
927
928 /**
929 * Resizes the heap array. Call only when holding lock.
930 */
931 private void grow() {
932 int oldCapacity = queue.length;
933 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
934 if (newCapacity < 0) // overflow
935 newCapacity = Integer.MAX_VALUE;
936 queue = Arrays.copyOf(queue, newCapacity);
937 }
938
939 /**
940 * Finds index of given object, or -1 if absent.
941 */
942 private int indexOf(Object x) {
943 if (x != null) {
944 if (x instanceof ScheduledFutureTask) {
945 int i = ((ScheduledFutureTask) x).heapIndex;
946 // Sanity check; x could conceivably be a
947 // ScheduledFutureTask from some other pool.
948 if (i >= 0 && i < size && queue[i] == x)
949 return i;
950 } else {
951 for (int i = 0; i < size; i++)
952 if (x.equals(queue[i]))
953 return i;
954 }
955 }
956 return -1;
957 }
958
959 public boolean contains(Object x) {
960 final ReentrantLock lock = this.lock;
961 lock.lock();
962 try {
963 return indexOf(x) != -1;
964 } finally {
965 lock.unlock();
966 }
967 }
968
969 public boolean remove(Object x) {
970 final ReentrantLock lock = this.lock;
971 lock.lock();
972 try {
973 int i = indexOf(x);
974 if (i < 0)
975 return false;
976
977 setIndex(queue[i], -1);
978 int s = --size;
979 RunnableScheduledFuture<?> replacement = queue[s];
980 queue[s] = null;
981 if (s != i) {
982 siftDown(i, replacement);
983 if (queue[i] == replacement)
984 siftUp(i, replacement);
985 }
986 return true;
987 } finally {
988 lock.unlock();
989 }
990 }
991
992 public int size() {
993 final ReentrantLock lock = this.lock;
994 lock.lock();
995 try {
996 return size;
997 } finally {
998 lock.unlock();
999 }
1000 }
1001
1002 public boolean isEmpty() {
1003 return size() == 0;
1004 }
1005
1006 public int remainingCapacity() {
1007 return Integer.MAX_VALUE;
1008 }
1009
1010 public RunnableScheduledFuture<?> peek() {
1011 final ReentrantLock lock = this.lock;
1012 lock.lock();
1013 try {
1014 return queue[0];
1015 } finally {
1016 lock.unlock();
1017 }
1018 }
1019
1020 public boolean offer(Runnable x) {
1021 if (x == null)
1022 throw new NullPointerException();
1023 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
1024 final ReentrantLock lock = this.lock;
1025 lock.lock();
1026 try {
1027 int i = size;
1028 if (i >= queue.length)
1029 grow();
1030 size = i + 1;
1031 if (i == 0) {
1032 queue[0] = e;
1033 setIndex(e, 0);
1034 } else {
1035 siftUp(i, e);
1036 }
1037 if (queue[0] == e) {
1038 leader = null;
1039 available.signal();
1040 }
1041 } finally {
1042 lock.unlock();
1043 }
1044 return true;
1045 }
1046
1047 public void put(Runnable e) {
1048 offer(e);
1049 }
1050
1051 public boolean add(Runnable e) {
1052 return offer(e);
1053 }
1054
1055 public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1056 return offer(e);
1057 }
1058
1059 /**
1060 * Performs common bookkeeping for poll and take: Replaces
1061 * first element with last and sifts it down. Call only when
1062 * holding lock.
1063 * @param f the task to remove and return
1064 */
1065 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1066 int s = --size;
1067 RunnableScheduledFuture<?> x = queue[s];
1068 queue[s] = null;
1069 if (s != 0)
1070 siftDown(0, x);
1071 setIndex(f, -1);
1072 return f;
1073 }
1074
1075 public RunnableScheduledFuture<?> poll() {
1076 final ReentrantLock lock = this.lock;
1077 lock.lock();
1078 try {
1079 RunnableScheduledFuture<?> first = queue[0];
1080 return (first == null || first.getDelay(NANOSECONDS) > 0)
1081 ? null
1082 : finishPoll(first);
1083 } finally {
1084 lock.unlock();
1085 }
1086 }
1087
1088 public RunnableScheduledFuture<?> take() throws InterruptedException {
1089 final ReentrantLock lock = this.lock;
1090 lock.lockInterruptibly();
1091 try {
1092 for (;;) {
1093 RunnableScheduledFuture<?> first = queue[0];
1094 if (first == null)
1095 available.await();
1096 else {
1097 long delay = first.getDelay(NANOSECONDS);
1098 if (delay <= 0L)
1099 return finishPoll(first);
1100 first = null; // don't retain ref while waiting
1101 if (leader != null)
1102 available.await();
1103 else {
1104 Thread thisThread = Thread.currentThread();
1105 leader = thisThread;
1106 try {
1107 available.awaitNanos(delay);
1108 } finally {
1109 if (leader == thisThread)
1110 leader = null;
1111 }
1112 }
1113 }
1114 }
1115 } finally {
1116 if (leader == null && queue[0] != null)
1117 available.signal();
1118 lock.unlock();
1119 }
1120 }
1121
1122 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1123 throws InterruptedException {
1124 long nanos = unit.toNanos(timeout);
1125 final ReentrantLock lock = this.lock;
1126 lock.lockInterruptibly();
1127 try {
1128 for (;;) {
1129 RunnableScheduledFuture<?> first = queue[0];
1130 if (first == null) {
1131 if (nanos <= 0L)
1132 return null;
1133 else
1134 nanos = available.awaitNanos(nanos);
1135 } else {
1136 long delay = first.getDelay(NANOSECONDS);
1137 if (delay <= 0L)
1138 return finishPoll(first);
1139 if (nanos <= 0L)
1140 return null;
1141 first = null; // don't retain ref while waiting
1142 if (nanos < delay || leader != null)
1143 nanos = available.awaitNanos(nanos);
1144 else {
1145 Thread thisThread = Thread.currentThread();
1146 leader = thisThread;
1147 try {
1148 long timeLeft = available.awaitNanos(delay);
1149 nanos -= delay - timeLeft;
1150 } finally {
1151 if (leader == thisThread)
1152 leader = null;
1153 }
1154 }
1155 }
1156 }
1157 } finally {
1158 if (leader == null && queue[0] != null)
1159 available.signal();
1160 lock.unlock();
1161 }
1162 }
1163
1164 public void clear() {
1165 final ReentrantLock lock = this.lock;
1166 lock.lock();
1167 try {
1168 for (int i = 0; i < size; i++) {
1169 RunnableScheduledFuture<?> t = queue[i];
1170 if (t != null) {
1171 queue[i] = null;
1172 setIndex(t, -1);
1173 }
1174 }
1175 size = 0;
1176 } finally {
1177 lock.unlock();
1178 }
1179 }
1180
1181 public int drainTo(Collection<? super Runnable> c) {
1182 return drainTo(c, Integer.MAX_VALUE);
1183 }
1184
1185 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1186 if (c == null)
1187 throw new NullPointerException();
1188 if (c == this)
1189 throw new IllegalArgumentException();
1190 if (maxElements <= 0)
1191 return 0;
1192 final ReentrantLock lock = this.lock;
1193 lock.lock();
1194 try {
1195 RunnableScheduledFuture<?> first;
1196 int n = 0;
1197 while (n < maxElements
1198 && (first = queue[0]) != null
1199 && first.getDelay(NANOSECONDS) <= 0) {
1200 c.add(first); // In this order, in case add() throws.
1201 finishPoll(first);
1202 ++n;
1203 }
1204 return n;
1205 } finally {
1206 lock.unlock();
1207 }
1208 }
1209
1210 public Object[] toArray() {
1211 final ReentrantLock lock = this.lock;
1212 lock.lock();
1213 try {
1214 return Arrays.copyOf(queue, size, Object[].class);
1215 } finally {
1216 lock.unlock();
1217 }
1218 }
1219
1220 @SuppressWarnings("unchecked")
1221 public <T> T[] toArray(T[] a) {
1222 final ReentrantLock lock = this.lock;
1223 lock.lock();
1224 try {
1225 if (a.length < size)
1226 return (T[]) Arrays.copyOf(queue, size, a.getClass());
1227 System.arraycopy(queue, 0, a, 0, size);
1228 if (a.length > size)
1229 a[size] = null;
1230 return a;
1231 } finally {
1232 lock.unlock();
1233 }
1234 }
1235
1236 public Iterator<Runnable> iterator() {
1237 final ReentrantLock lock = this.lock;
1238 lock.lock();
1239 try {
1240 return new Itr(Arrays.copyOf(queue, size));
1241 } finally {
1242 lock.unlock();
1243 }
1244 }
1245
1246 /**
1247 * Snapshot iterator that works off copy of underlying q array.
1248 */
1249 private class Itr implements Iterator<Runnable> {
1250 final RunnableScheduledFuture<?>[] array;
1251 int cursor; // index of next element to return; initially 0
1252 int lastRet = -1; // index of last element returned; -1 if no such
1253
1254 Itr(RunnableScheduledFuture<?>[] array) {
1255 this.array = array;
1256 }
1257
1258 public boolean hasNext() {
1259 return cursor < array.length;
1260 }
1261
1262 public Runnable next() {
1263 if (cursor >= array.length)
1264 throw new NoSuchElementException();
1265 return array[lastRet = cursor++];
1266 }
1267
1268 public void remove() {
1269 if (lastRet < 0)
1270 throw new IllegalStateException();
1271 DelayedWorkQueue.this.remove(array[lastRet]);
1272 lastRet = -1;
1273 }
1274 }
1275 }
1276 }