ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.82
Committed: Wed Dec 3 21:55:44 2014 UTC (9 years, 6 months ago) by jsr166
Branch: MAIN
Changes since 1.81: +7 -1 lines
Log Message:
never use wildcard imports

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import static java.util.concurrent.TimeUnit.NANOSECONDS;
10 import static java.util.concurrent.TimeUnit.MILLISECONDS;
11 import java.util.concurrent.atomic.AtomicLong;
12 import java.util.concurrent.locks.Condition;
13 import java.util.concurrent.locks.ReentrantLock;
14 import java.util.AbstractQueue;
15 import java.util.Arrays;
16 import java.util.Collection;
17 import java.util.Iterator;
18 import java.util.List;
19 import java.util.NoSuchElementException;
20
21 /**
22 * A {@link ThreadPoolExecutor} that can additionally schedule
23 * commands to run after a given delay, or to execute periodically.
24 * This class is preferable to {@link java.util.Timer} when multiple
25 * worker threads are needed, or when the additional flexibility or
26 * capabilities of {@link ThreadPoolExecutor} (which this class
27 * extends) are required.
28 *
29 * <p>Delayed tasks execute no sooner than they are enabled, but
30 * without any real-time guarantees about when, after they are
31 * enabled, they will commence. Tasks scheduled for exactly the same
32 * execution time are enabled in first-in-first-out (FIFO) order of
33 * submission.
34 *
35 * <p>When a submitted task is cancelled before it is run, execution
36 * is suppressed. By default, such a cancelled task is not
37 * automatically removed from the work queue until its delay elapses.
38 * While this enables further inspection and monitoring, it may also
39 * cause unbounded retention of cancelled tasks. To avoid this, use
40 * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately
41 * removed from the work queue at time of cancellation.
42 *
43 * <p>Successive executions of a periodic task scheduled via
44 * {@link #scheduleAtFixedRate} or
45 * {@link #scheduleWithFixedDelay} do not overlap. While different
46 * executions may be performed by different threads, the effects of
47 * prior executions <a
48 * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
49 * those of subsequent ones.
50 *
51 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
52 * of the inherited tuning methods are not useful for it. In
53 * particular, because it acts as a fixed-sized pool using
54 * {@code corePoolSize} threads and an unbounded queue, adjustments
55 * to {@code maximumPoolSize} have no useful effect. Additionally, it
56 * is almost never a good idea to set {@code corePoolSize} to zero or
57 * use {@code allowCoreThreadTimeOut} because this may leave the pool
58 * without threads to handle tasks once they become eligible to run.
59 *
60 * <p><b>Extension notes:</b> This class overrides the
61 * {@link ThreadPoolExecutor#execute(Runnable) execute} and
62 * {@link AbstractExecutorService#submit(Runnable) submit}
63 * methods to generate internal {@link ScheduledFuture} objects to
64 * control per-task delays and scheduling. To preserve
65 * functionality, any further overrides of these methods in
66 * subclasses must invoke superclass versions, which effectively
67 * disables additional task customization. However, this class
68 * provides alternative protected extension method
69 * {@code decorateTask} (one version each for {@code Runnable} and
70 * {@code Callable}) that can be used to customize the concrete task
71 * types used to execute commands entered via {@code execute},
72 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
73 * and {@code scheduleWithFixedDelay}. By default, a
74 * {@code ScheduledThreadPoolExecutor} uses a task type extending
75 * {@link FutureTask}. However, this may be modified or replaced using
76 * subclasses of the form:
77 *
78 * <pre> {@code
79 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
80 *
81 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
82 *
83 * protected <V> RunnableScheduledFuture<V> decorateTask(
84 * Runnable r, RunnableScheduledFuture<V> task) {
85 * return new CustomTask<V>(r, task);
86 * }
87 *
88 * protected <V> RunnableScheduledFuture<V> decorateTask(
89 * Callable<V> c, RunnableScheduledFuture<V> task) {
90 * return new CustomTask<V>(c, task);
91 * }
92 * // ... add constructors, etc.
93 * }}</pre>
94 *
95 * @since 1.5
96 * @author Doug Lea
97 */
98 public class ScheduledThreadPoolExecutor
99 extends ThreadPoolExecutor
100 implements ScheduledExecutorService {
101
102 /*
103 * This class specializes ThreadPoolExecutor implementation by
104 *
105 * 1. Using a custom task type, ScheduledFutureTask for
106 * tasks, even those that don't require scheduling (i.e.,
107 * those submitted using ExecutorService execute, not
108 * ScheduledExecutorService methods) which are treated as
109 * delayed tasks with a delay of zero.
110 *
111 * 2. Using a custom queue (DelayedWorkQueue), a variant of
112 * unbounded DelayQueue. The lack of capacity constraint and
113 * the fact that corePoolSize and maximumPoolSize are
114 * effectively identical simplifies some execution mechanics
115 * (see delayedExecute) compared to ThreadPoolExecutor.
116 *
117 * 3. Supporting optional run-after-shutdown parameters, which
118 * leads to overrides of shutdown methods to remove and cancel
119 * tasks that should NOT be run after shutdown, as well as
120 * different recheck logic when task (re)submission overlaps
121 * with a shutdown.
122 *
123 * 4. Task decoration methods to allow interception and
124 * instrumentation, which are needed because subclasses cannot
125 * otherwise override submit methods to get this effect. These
126 * don't have any impact on pool control logic though.
127 */
128
129 /**
130 * False if should cancel/suppress periodic tasks on shutdown.
131 */
132 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
133
134 /**
135 * False if should cancel non-periodic tasks on shutdown.
136 */
137 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
138
139 /**
140 * True if ScheduledFutureTask.cancel should remove from queue.
141 */
142 private volatile boolean removeOnCancel = false;
143
144 /**
145 * Sequence number to break scheduling ties, and in turn to
146 * guarantee FIFO order among tied entries.
147 */
148 private static final AtomicLong sequencer = new AtomicLong();
149
150 /**
151 * Returns current nanosecond time.
152 */
153 static final long now() {
154 return System.nanoTime();
155 }
156
157 private class ScheduledFutureTask<V>
158 extends FutureTask<V> implements RunnableScheduledFuture<V> {
159
160 /** Sequence number to break ties FIFO */
161 private final long sequenceNumber;
162
163 /** The time the task is enabled to execute in nanoTime units */
164 private long time;
165
166 /**
167 * Period in nanoseconds for repeating tasks.
168 * A positive value indicates fixed-rate execution.
169 * A negative value indicates fixed-delay execution.
170 * A value of 0 indicates a non-repeating (one-shot) task.
171 */
172 private final long period;
173
174 /** The actual task to be re-enqueued by reExecutePeriodic */
175 RunnableScheduledFuture<V> outerTask = this;
176
177 /**
178 * Index into delay queue, to support faster cancellation.
179 */
180 int heapIndex;
181
182 /**
183 * Creates a one-shot action with given nanoTime-based trigger time.
184 */
185 ScheduledFutureTask(Runnable r, V result, long triggerTime) {
186 super(r, result);
187 this.time = triggerTime;
188 this.period = 0;
189 this.sequenceNumber = sequencer.getAndIncrement();
190 }
191
192 /**
193 * Creates a periodic action with given nanoTime-based initial
194 * trigger time and period.
195 */
196 ScheduledFutureTask(Runnable r, V result, long triggerTime,
197 long period) {
198 super(r, result);
199 this.time = triggerTime;
200 this.period = period;
201 this.sequenceNumber = sequencer.getAndIncrement();
202 }
203
204 /**
205 * Creates a one-shot action with given nanoTime-based trigger time.
206 */
207 ScheduledFutureTask(Callable<V> callable, long triggerTime) {
208 super(callable);
209 this.time = triggerTime;
210 this.period = 0;
211 this.sequenceNumber = sequencer.getAndIncrement();
212 }
213
214 public long getDelay(TimeUnit unit) {
215 return unit.convert(time - now(), NANOSECONDS);
216 }
217
218 public int compareTo(Delayed other) {
219 if (other == this) // compare zero if same object
220 return 0;
221 if (other instanceof ScheduledFutureTask) {
222 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
223 long diff = time - x.time;
224 if (diff < 0)
225 return -1;
226 else if (diff > 0)
227 return 1;
228 else if (sequenceNumber < x.sequenceNumber)
229 return -1;
230 else
231 return 1;
232 }
233 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
234 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
235 }
236
237 /**
238 * Returns {@code true} if this is a periodic (not a one-shot) action.
239 *
240 * @return {@code true} if periodic
241 */
242 public boolean isPeriodic() {
243 return period != 0;
244 }
245
246 /**
247 * Sets the next time to run for a periodic task.
248 */
249 private void setNextRunTime() {
250 long p = period;
251 if (p > 0)
252 time += p;
253 else
254 time = triggerTime(-p);
255 }
256
257 public boolean cancel(boolean mayInterruptIfRunning) {
258 boolean cancelled = super.cancel(mayInterruptIfRunning);
259 if (cancelled && removeOnCancel && heapIndex >= 0)
260 remove(this);
261 return cancelled;
262 }
263
264 /**
265 * Overrides FutureTask version so as to reset/requeue if periodic.
266 */
267 public void run() {
268 boolean periodic = isPeriodic();
269 if (!canRunInCurrentRunState(periodic))
270 cancel(false);
271 else if (!periodic)
272 ScheduledFutureTask.super.run();
273 else if (ScheduledFutureTask.super.runAndReset()) {
274 setNextRunTime();
275 reExecutePeriodic(outerTask);
276 }
277 }
278 }
279
280 /**
281 * Returns true if can run a task given current run state
282 * and run-after-shutdown parameters.
283 *
284 * @param periodic true if this task periodic, false if delayed
285 */
286 boolean canRunInCurrentRunState(boolean periodic) {
287 return isRunningOrShutdown(periodic ?
288 continueExistingPeriodicTasksAfterShutdown :
289 executeExistingDelayedTasksAfterShutdown);
290 }
291
292 /**
293 * Main execution method for delayed or periodic tasks. If pool
294 * is shut down, rejects the task. Otherwise adds task to queue
295 * and starts a thread, if necessary, to run it. (We cannot
296 * prestart the thread to run the task because the task (probably)
297 * shouldn't be run yet.) If the pool is shut down while the task
298 * is being added, cancel and remove it if required by state and
299 * run-after-shutdown parameters.
300 *
301 * @param task the task
302 */
303 private void delayedExecute(RunnableScheduledFuture<?> task) {
304 if (isShutdown())
305 reject(task);
306 else {
307 super.getQueue().add(task);
308 if (isShutdown() &&
309 !canRunInCurrentRunState(task.isPeriodic()) &&
310 remove(task))
311 task.cancel(false);
312 else
313 ensurePrestart();
314 }
315 }
316
317 /**
318 * Requeues a periodic task unless current run state precludes it.
319 * Same idea as delayedExecute except drops task rather than rejecting.
320 *
321 * @param task the task
322 */
323 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
324 if (canRunInCurrentRunState(true)) {
325 super.getQueue().add(task);
326 if (!canRunInCurrentRunState(true) && remove(task))
327 task.cancel(false);
328 else
329 ensurePrestart();
330 }
331 }
332
333 /**
334 * Cancels and clears the queue of all tasks that should not be run
335 * due to shutdown policy. Invoked within super.shutdown.
336 */
337 @Override void onShutdown() {
338 BlockingQueue<Runnable> q = super.getQueue();
339 boolean keepDelayed =
340 getExecuteExistingDelayedTasksAfterShutdownPolicy();
341 boolean keepPeriodic =
342 getContinueExistingPeriodicTasksAfterShutdownPolicy();
343 if (!keepDelayed && !keepPeriodic) {
344 for (Object e : q.toArray())
345 if (e instanceof RunnableScheduledFuture<?>)
346 ((RunnableScheduledFuture<?>) e).cancel(false);
347 q.clear();
348 }
349 else {
350 // Traverse snapshot to avoid iterator exceptions
351 for (Object e : q.toArray()) {
352 if (e instanceof RunnableScheduledFuture) {
353 RunnableScheduledFuture<?> t =
354 (RunnableScheduledFuture<?>)e;
355 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
356 t.isCancelled()) { // also remove if already cancelled
357 if (q.remove(t))
358 t.cancel(false);
359 }
360 }
361 }
362 }
363 tryTerminate();
364 }
365
366 /**
367 * Modifies or replaces the task used to execute a runnable.
368 * This method can be used to override the concrete
369 * class used for managing internal tasks.
370 * The default implementation simply returns the given task.
371 *
372 * @param runnable the submitted Runnable
373 * @param task the task created to execute the runnable
374 * @param <V> the type of the task's result
375 * @return a task that can execute the runnable
376 * @since 1.6
377 */
378 protected <V> RunnableScheduledFuture<V> decorateTask(
379 Runnable runnable, RunnableScheduledFuture<V> task) {
380 return task;
381 }
382
383 /**
384 * Modifies or replaces the task used to execute a callable.
385 * This method can be used to override the concrete
386 * class used for managing internal tasks.
387 * The default implementation simply returns the given task.
388 *
389 * @param callable the submitted Callable
390 * @param task the task created to execute the callable
391 * @param <V> the type of the task's result
392 * @return a task that can execute the callable
393 * @since 1.6
394 */
395 protected <V> RunnableScheduledFuture<V> decorateTask(
396 Callable<V> callable, RunnableScheduledFuture<V> task) {
397 return task;
398 }
399
400 /**
401 * The default keep-alive time for pool threads.
402 *
403 * Normally, this value is unused because all pool threads will be
404 * core threads, but if a user creates a pool with a corePoolSize
405 * of zero (against our advice), we keep a thread alive as long as
406 * there are queued tasks. If the keep alive time is zero (the
407 * historic value), we end up hot-spinning in getTask, wasting a
408 * CPU. But on the other hand, if we set the value too high, and
409 * users create a one-shot pool which they don't cleanly shutdown,
410 * the pool's non-daemon threads will prevent JVM termination. A
411 * small but non-zero value (relative to a JVM's lifetime) seems
412 * best.
413 */
414 private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
415
416 /**
417 * Creates a new {@code ScheduledThreadPoolExecutor} with the
418 * given core pool size.
419 *
420 * @param corePoolSize the number of threads to keep in the pool, even
421 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
422 * @throws IllegalArgumentException if {@code corePoolSize < 0}
423 */
424 public ScheduledThreadPoolExecutor(int corePoolSize) {
425 super(corePoolSize, Integer.MAX_VALUE,
426 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
427 new DelayedWorkQueue());
428 }
429
430 /**
431 * Creates a new {@code ScheduledThreadPoolExecutor} with the
432 * given initial parameters.
433 *
434 * @param corePoolSize the number of threads to keep in the pool, even
435 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
436 * @param threadFactory the factory to use when the executor
437 * creates a new thread
438 * @throws IllegalArgumentException if {@code corePoolSize < 0}
439 * @throws NullPointerException if {@code threadFactory} is null
440 */
441 public ScheduledThreadPoolExecutor(int corePoolSize,
442 ThreadFactory threadFactory) {
443 super(corePoolSize, Integer.MAX_VALUE,
444 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
445 new DelayedWorkQueue(), threadFactory);
446 }
447
448 /**
449 * Creates a new {@code ScheduledThreadPoolExecutor} with the
450 * given initial parameters.
451 *
452 * @param corePoolSize the number of threads to keep in the pool, even
453 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
454 * @param handler the handler to use when execution is blocked
455 * because the thread bounds and queue capacities are reached
456 * @throws IllegalArgumentException if {@code corePoolSize < 0}
457 * @throws NullPointerException if {@code handler} is null
458 */
459 public ScheduledThreadPoolExecutor(int corePoolSize,
460 RejectedExecutionHandler handler) {
461 super(corePoolSize, Integer.MAX_VALUE,
462 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
463 new DelayedWorkQueue(), handler);
464 }
465
466 /**
467 * Creates a new {@code ScheduledThreadPoolExecutor} with the
468 * given initial parameters.
469 *
470 * @param corePoolSize the number of threads to keep in the pool, even
471 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
472 * @param threadFactory the factory to use when the executor
473 * creates a new thread
474 * @param handler the handler to use when execution is blocked
475 * because the thread bounds and queue capacities are reached
476 * @throws IllegalArgumentException if {@code corePoolSize < 0}
477 * @throws NullPointerException if {@code threadFactory} or
478 * {@code handler} is null
479 */
480 public ScheduledThreadPoolExecutor(int corePoolSize,
481 ThreadFactory threadFactory,
482 RejectedExecutionHandler handler) {
483 super(corePoolSize, Integer.MAX_VALUE,
484 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
485 new DelayedWorkQueue(), threadFactory, handler);
486 }
487
488 /**
489 * Returns the nanoTime-based trigger time of a delayed action.
490 */
491 private long triggerTime(long delay, TimeUnit unit) {
492 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
493 }
494
495 /**
496 * Returns the nanoTime-based trigger time of a delayed action.
497 */
498 long triggerTime(long delay) {
499 return now() +
500 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
501 }
502
503 /**
504 * Constrains the values of all delays in the queue to be within
505 * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
506 * This may occur if a task is eligible to be dequeued, but has
507 * not yet been, while some other task is added with a delay of
508 * Long.MAX_VALUE.
509 */
510 private long overflowFree(long delay) {
511 Delayed head = (Delayed) super.getQueue().peek();
512 if (head != null) {
513 long headDelay = head.getDelay(NANOSECONDS);
514 if (headDelay < 0 && (delay - headDelay < 0))
515 delay = Long.MAX_VALUE + headDelay;
516 }
517 return delay;
518 }
519
520 /**
521 * @throws RejectedExecutionException {@inheritDoc}
522 * @throws NullPointerException {@inheritDoc}
523 */
524 public ScheduledFuture<?> schedule(Runnable command,
525 long delay,
526 TimeUnit unit) {
527 if (command == null || unit == null)
528 throw new NullPointerException();
529 RunnableScheduledFuture<Void> t = decorateTask(command,
530 new ScheduledFutureTask<Void>(command, null,
531 triggerTime(delay, unit)));
532 delayedExecute(t);
533 return t;
534 }
535
536 /**
537 * @throws RejectedExecutionException {@inheritDoc}
538 * @throws NullPointerException {@inheritDoc}
539 */
540 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
541 long delay,
542 TimeUnit unit) {
543 if (callable == null || unit == null)
544 throw new NullPointerException();
545 RunnableScheduledFuture<V> t = decorateTask(callable,
546 new ScheduledFutureTask<V>(callable,
547 triggerTime(delay, unit)));
548 delayedExecute(t);
549 return t;
550 }
551
552 /**
553 * @throws RejectedExecutionException {@inheritDoc}
554 * @throws NullPointerException {@inheritDoc}
555 * @throws IllegalArgumentException {@inheritDoc}
556 */
557 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
558 long initialDelay,
559 long period,
560 TimeUnit unit) {
561 if (command == null || unit == null)
562 throw new NullPointerException();
563 if (period <= 0)
564 throw new IllegalArgumentException();
565 ScheduledFutureTask<Void> sft =
566 new ScheduledFutureTask<Void>(command,
567 null,
568 triggerTime(initialDelay, unit),
569 unit.toNanos(period));
570 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
571 sft.outerTask = t;
572 delayedExecute(t);
573 return t;
574 }
575
576 /**
577 * @throws RejectedExecutionException {@inheritDoc}
578 * @throws NullPointerException {@inheritDoc}
579 * @throws IllegalArgumentException {@inheritDoc}
580 */
581 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
582 long initialDelay,
583 long delay,
584 TimeUnit unit) {
585 if (command == null || unit == null)
586 throw new NullPointerException();
587 if (delay <= 0)
588 throw new IllegalArgumentException();
589 ScheduledFutureTask<Void> sft =
590 new ScheduledFutureTask<Void>(command,
591 null,
592 triggerTime(initialDelay, unit),
593 unit.toNanos(-delay));
594 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
595 sft.outerTask = t;
596 delayedExecute(t);
597 return t;
598 }
599
600 /**
601 * Executes {@code command} with zero required delay.
602 * This has effect equivalent to
603 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
604 * Note that inspections of the queue and of the list returned by
605 * {@code shutdownNow} will access the zero-delayed
606 * {@link ScheduledFuture}, not the {@code command} itself.
607 *
608 * <p>A consequence of the use of {@code ScheduledFuture} objects is
609 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
610 * called with a null second {@code Throwable} argument, even if the
611 * {@code command} terminated abruptly. Instead, the {@code Throwable}
612 * thrown by such a task can be obtained via {@link Future#get}.
613 *
614 * @throws RejectedExecutionException at discretion of
615 * {@code RejectedExecutionHandler}, if the task
616 * cannot be accepted for execution because the
617 * executor has been shut down
618 * @throws NullPointerException {@inheritDoc}
619 */
620 public void execute(Runnable command) {
621 schedule(command, 0, NANOSECONDS);
622 }
623
624 // Override AbstractExecutorService methods
625
626 /**
627 * @throws RejectedExecutionException {@inheritDoc}
628 * @throws NullPointerException {@inheritDoc}
629 */
630 public Future<?> submit(Runnable task) {
631 return schedule(task, 0, NANOSECONDS);
632 }
633
634 /**
635 * @throws RejectedExecutionException {@inheritDoc}
636 * @throws NullPointerException {@inheritDoc}
637 */
638 public <T> Future<T> submit(Runnable task, T result) {
639 return schedule(Executors.callable(task, result), 0, NANOSECONDS);
640 }
641
642 /**
643 * @throws RejectedExecutionException {@inheritDoc}
644 * @throws NullPointerException {@inheritDoc}
645 */
646 public <T> Future<T> submit(Callable<T> task) {
647 return schedule(task, 0, NANOSECONDS);
648 }
649
650 /**
651 * Sets the policy on whether to continue executing existing
652 * periodic tasks even when this executor has been {@code shutdown}.
653 * In this case, these tasks will only terminate upon
654 * {@code shutdownNow} or after setting the policy to
655 * {@code false} when already shutdown.
656 * This value is by default {@code false}.
657 *
658 * @param value if {@code true}, continue after shutdown, else don't
659 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
660 */
661 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
662 continueExistingPeriodicTasksAfterShutdown = value;
663 if (!value && isShutdown())
664 onShutdown();
665 }
666
667 /**
668 * Gets the policy on whether to continue executing existing
669 * periodic tasks even when this executor has been {@code shutdown}.
670 * In this case, these tasks will only terminate upon
671 * {@code shutdownNow} or after setting the policy to
672 * {@code false} when already shutdown.
673 * This value is by default {@code false}.
674 *
675 * @return {@code true} if will continue after shutdown
676 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
677 */
678 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
679 return continueExistingPeriodicTasksAfterShutdown;
680 }
681
682 /**
683 * Sets the policy on whether to execute existing delayed
684 * tasks even when this executor has been {@code shutdown}.
685 * In this case, these tasks will only terminate upon
686 * {@code shutdownNow}, or after setting the policy to
687 * {@code false} when already shutdown.
688 * This value is by default {@code true}.
689 *
690 * @param value if {@code true}, execute after shutdown, else don't
691 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
692 */
693 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
694 executeExistingDelayedTasksAfterShutdown = value;
695 if (!value && isShutdown())
696 onShutdown();
697 }
698
699 /**
700 * Gets the policy on whether to execute existing delayed
701 * tasks even when this executor has been {@code shutdown}.
702 * In this case, these tasks will only terminate upon
703 * {@code shutdownNow}, or after setting the policy to
704 * {@code false} when already shutdown.
705 * This value is by default {@code true}.
706 *
707 * @return {@code true} if will execute after shutdown
708 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
709 */
710 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
711 return executeExistingDelayedTasksAfterShutdown;
712 }
713
714 /**
715 * Sets the policy on whether cancelled tasks should be immediately
716 * removed from the work queue at time of cancellation. This value is
717 * by default {@code false}.
718 *
719 * @param value if {@code true}, remove on cancellation, else don't
720 * @see #getRemoveOnCancelPolicy
721 * @since 1.7
722 */
723 public void setRemoveOnCancelPolicy(boolean value) {
724 removeOnCancel = value;
725 }
726
727 /**
728 * Gets the policy on whether cancelled tasks should be immediately
729 * removed from the work queue at time of cancellation. This value is
730 * by default {@code false}.
731 *
732 * @return {@code true} if cancelled tasks are immediately removed
733 * from the queue
734 * @see #setRemoveOnCancelPolicy
735 * @since 1.7
736 */
737 public boolean getRemoveOnCancelPolicy() {
738 return removeOnCancel;
739 }
740
741 /**
742 * Initiates an orderly shutdown in which previously submitted
743 * tasks are executed, but no new tasks will be accepted.
744 * Invocation has no additional effect if already shut down.
745 *
746 * <p>This method does not wait for previously submitted tasks to
747 * complete execution. Use {@link #awaitTermination awaitTermination}
748 * to do that.
749 *
750 * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
751 * has been set {@code false}, existing delayed tasks whose delays
752 * have not yet elapsed are cancelled. And unless the {@code
753 * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
754 * {@code true}, future executions of existing periodic tasks will
755 * be cancelled.
756 *
757 * @throws SecurityException {@inheritDoc}
758 */
759 public void shutdown() {
760 super.shutdown();
761 }
762
763 /**
764 * Attempts to stop all actively executing tasks, halts the
765 * processing of waiting tasks, and returns a list of the tasks
766 * that were awaiting execution.
767 *
768 * <p>This method does not wait for actively executing tasks to
769 * terminate. Use {@link #awaitTermination awaitTermination} to
770 * do that.
771 *
772 * <p>There are no guarantees beyond best-effort attempts to stop
773 * processing actively executing tasks. This implementation
774 * cancels tasks via {@link Thread#interrupt}, so any task that
775 * fails to respond to interrupts may never terminate.
776 *
777 * @return list of tasks that never commenced execution.
778 * Each element of this list is a {@link ScheduledFuture}.
779 * For tasks submitted via one of the {@code schedule}
780 * methods, the element will be identical to the returned
781 * {@code ScheduledFuture}. For tasks submitted using
782 * {@link #execute}, the element will be a zero-delay {@code
783 * ScheduledFuture}.
784 * @throws SecurityException {@inheritDoc}
785 */
786 public List<Runnable> shutdownNow() {
787 return super.shutdownNow();
788 }
789
790 /**
791 * Returns the task queue used by this executor.
792 * Each element of this list is a {@link ScheduledFuture}.
793 * For tasks submitted via one of the {@code schedule} methods, the
794 * element will be identical to the returned {@code ScheduledFuture}.
795 * For tasks submitted using {@link #execute}, the element will be a
796 * zero-delay {@code ScheduledFuture}.
797 *
798 * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
799 * tasks in the order in which they will execute.
800 *
801 * @return the task queue
802 */
803 public BlockingQueue<Runnable> getQueue() {
804 return super.getQueue();
805 }
806
807 /**
808 * Specialized delay queue. To mesh with TPE declarations, this
809 * class must be declared as a BlockingQueue<Runnable> even though
810 * it can only hold RunnableScheduledFutures.
811 */
812 static class DelayedWorkQueue extends AbstractQueue<Runnable>
813 implements BlockingQueue<Runnable> {
814
815 /*
816 * A DelayedWorkQueue is based on a heap-based data structure
817 * like those in DelayQueue and PriorityQueue, except that
818 * every ScheduledFutureTask also records its index into the
819 * heap array. This eliminates the need to find a task upon
820 * cancellation, greatly speeding up removal (down from O(n)
821 * to O(log n)), and reducing garbage retention that would
822 * otherwise occur by waiting for the element to rise to top
823 * before clearing. But because the queue may also hold
824 * RunnableScheduledFutures that are not ScheduledFutureTasks,
825 * we are not guaranteed to have such indices available, in
826 * which case we fall back to linear search. (We expect that
827 * most tasks will not be decorated, and that the faster cases
828 * will be much more common.)
829 *
830 * All heap operations must record index changes -- mainly
831 * within siftUp and siftDown. Upon removal, a task's
832 * heapIndex is set to -1. Note that ScheduledFutureTasks can
833 * appear at most once in the queue (this need not be true for
834 * other kinds of tasks or work queues), so are uniquely
835 * identified by heapIndex.
836 */
837
838 private static final int INITIAL_CAPACITY = 16;
839 private RunnableScheduledFuture<?>[] queue =
840 new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
841 private final ReentrantLock lock = new ReentrantLock();
842 private int size;
843
844 /**
845 * Thread designated to wait for the task at the head of the
846 * queue. This variant of the Leader-Follower pattern
847 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
848 * minimize unnecessary timed waiting. When a thread becomes
849 * the leader, it waits only for the next delay to elapse, but
850 * other threads await indefinitely. The leader thread must
851 * signal some other thread before returning from take() or
852 * poll(...), unless some other thread becomes leader in the
853 * interim. Whenever the head of the queue is replaced with a
854 * task with an earlier expiration time, the leader field is
855 * invalidated by being reset to null, and some waiting
856 * thread, but not necessarily the current leader, is
857 * signalled. So waiting threads must be prepared to acquire
858 * and lose leadership while waiting.
859 */
860 private Thread leader;
861
862 /**
863 * Condition signalled when a newer task becomes available at the
864 * head of the queue or a new thread may need to become leader.
865 */
866 private final Condition available = lock.newCondition();
867
868 /**
869 * Sets f's heapIndex if it is a ScheduledFutureTask.
870 */
871 private void setIndex(RunnableScheduledFuture<?> f, int idx) {
872 if (f instanceof ScheduledFutureTask)
873 ((ScheduledFutureTask)f).heapIndex = idx;
874 }
875
876 /**
877 * Sifts element added at bottom up to its heap-ordered spot.
878 * Call only when holding lock.
879 */
880 private void siftUp(int k, RunnableScheduledFuture<?> key) {
881 while (k > 0) {
882 int parent = (k - 1) >>> 1;
883 RunnableScheduledFuture<?> e = queue[parent];
884 if (key.compareTo(e) >= 0)
885 break;
886 queue[k] = e;
887 setIndex(e, k);
888 k = parent;
889 }
890 queue[k] = key;
891 setIndex(key, k);
892 }
893
894 /**
895 * Sifts element added at top down to its heap-ordered spot.
896 * Call only when holding lock.
897 */
898 private void siftDown(int k, RunnableScheduledFuture<?> key) {
899 int half = size >>> 1;
900 while (k < half) {
901 int child = (k << 1) + 1;
902 RunnableScheduledFuture<?> c = queue[child];
903 int right = child + 1;
904 if (right < size && c.compareTo(queue[right]) > 0)
905 c = queue[child = right];
906 if (key.compareTo(c) <= 0)
907 break;
908 queue[k] = c;
909 setIndex(c, k);
910 k = child;
911 }
912 queue[k] = key;
913 setIndex(key, k);
914 }
915
916 /**
917 * Resizes the heap array. Call only when holding lock.
918 */
919 private void grow() {
920 int oldCapacity = queue.length;
921 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
922 if (newCapacity < 0) // overflow
923 newCapacity = Integer.MAX_VALUE;
924 queue = Arrays.copyOf(queue, newCapacity);
925 }
926
927 /**
928 * Finds index of given object, or -1 if absent.
929 */
930 private int indexOf(Object x) {
931 if (x != null) {
932 if (x instanceof ScheduledFutureTask) {
933 int i = ((ScheduledFutureTask) x).heapIndex;
934 // Sanity check; x could conceivably be a
935 // ScheduledFutureTask from some other pool.
936 if (i >= 0 && i < size && queue[i] == x)
937 return i;
938 } else {
939 for (int i = 0; i < size; i++)
940 if (x.equals(queue[i]))
941 return i;
942 }
943 }
944 return -1;
945 }
946
947 public boolean contains(Object x) {
948 final ReentrantLock lock = this.lock;
949 lock.lock();
950 try {
951 return indexOf(x) != -1;
952 } finally {
953 lock.unlock();
954 }
955 }
956
957 public boolean remove(Object x) {
958 final ReentrantLock lock = this.lock;
959 lock.lock();
960 try {
961 int i = indexOf(x);
962 if (i < 0)
963 return false;
964
965 setIndex(queue[i], -1);
966 int s = --size;
967 RunnableScheduledFuture<?> replacement = queue[s];
968 queue[s] = null;
969 if (s != i) {
970 siftDown(i, replacement);
971 if (queue[i] == replacement)
972 siftUp(i, replacement);
973 }
974 return true;
975 } finally {
976 lock.unlock();
977 }
978 }
979
980 public int size() {
981 final ReentrantLock lock = this.lock;
982 lock.lock();
983 try {
984 return size;
985 } finally {
986 lock.unlock();
987 }
988 }
989
990 public boolean isEmpty() {
991 return size() == 0;
992 }
993
994 public int remainingCapacity() {
995 return Integer.MAX_VALUE;
996 }
997
998 public RunnableScheduledFuture<?> peek() {
999 final ReentrantLock lock = this.lock;
1000 lock.lock();
1001 try {
1002 return queue[0];
1003 } finally {
1004 lock.unlock();
1005 }
1006 }
1007
1008 public boolean offer(Runnable x) {
1009 if (x == null)
1010 throw new NullPointerException();
1011 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
1012 final ReentrantLock lock = this.lock;
1013 lock.lock();
1014 try {
1015 int i = size;
1016 if (i >= queue.length)
1017 grow();
1018 size = i + 1;
1019 if (i == 0) {
1020 queue[0] = e;
1021 setIndex(e, 0);
1022 } else {
1023 siftUp(i, e);
1024 }
1025 if (queue[0] == e) {
1026 leader = null;
1027 available.signal();
1028 }
1029 } finally {
1030 lock.unlock();
1031 }
1032 return true;
1033 }
1034
1035 public void put(Runnable e) {
1036 offer(e);
1037 }
1038
1039 public boolean add(Runnable e) {
1040 return offer(e);
1041 }
1042
1043 public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1044 return offer(e);
1045 }
1046
1047 /**
1048 * Performs common bookkeeping for poll and take: Replaces
1049 * first element with last and sifts it down. Call only when
1050 * holding lock.
1051 * @param f the task to remove and return
1052 */
1053 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1054 int s = --size;
1055 RunnableScheduledFuture<?> x = queue[s];
1056 queue[s] = null;
1057 if (s != 0)
1058 siftDown(0, x);
1059 setIndex(f, -1);
1060 return f;
1061 }
1062
1063 public RunnableScheduledFuture<?> poll() {
1064 final ReentrantLock lock = this.lock;
1065 lock.lock();
1066 try {
1067 RunnableScheduledFuture<?> first = queue[0];
1068 if (first == null || first.getDelay(NANOSECONDS) > 0)
1069 return null;
1070 else
1071 return finishPoll(first);
1072 } finally {
1073 lock.unlock();
1074 }
1075 }
1076
1077 public RunnableScheduledFuture<?> take() throws InterruptedException {
1078 final ReentrantLock lock = this.lock;
1079 lock.lockInterruptibly();
1080 try {
1081 for (;;) {
1082 RunnableScheduledFuture<?> first = queue[0];
1083 if (first == null)
1084 available.await();
1085 else {
1086 long delay = first.getDelay(NANOSECONDS);
1087 if (delay <= 0)
1088 return finishPoll(first);
1089 first = null; // don't retain ref while waiting
1090 if (leader != null)
1091 available.await();
1092 else {
1093 Thread thisThread = Thread.currentThread();
1094 leader = thisThread;
1095 try {
1096 available.awaitNanos(delay);
1097 } finally {
1098 if (leader == thisThread)
1099 leader = null;
1100 }
1101 }
1102 }
1103 }
1104 } finally {
1105 if (leader == null && queue[0] != null)
1106 available.signal();
1107 lock.unlock();
1108 }
1109 }
1110
1111 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1112 throws InterruptedException {
1113 long nanos = unit.toNanos(timeout);
1114 final ReentrantLock lock = this.lock;
1115 lock.lockInterruptibly();
1116 try {
1117 for (;;) {
1118 RunnableScheduledFuture<?> first = queue[0];
1119 if (first == null) {
1120 if (nanos <= 0)
1121 return null;
1122 else
1123 nanos = available.awaitNanos(nanos);
1124 } else {
1125 long delay = first.getDelay(NANOSECONDS);
1126 if (delay <= 0)
1127 return finishPoll(first);
1128 if (nanos <= 0)
1129 return null;
1130 first = null; // don't retain ref while waiting
1131 if (nanos < delay || leader != null)
1132 nanos = available.awaitNanos(nanos);
1133 else {
1134 Thread thisThread = Thread.currentThread();
1135 leader = thisThread;
1136 try {
1137 long timeLeft = available.awaitNanos(delay);
1138 nanos -= delay - timeLeft;
1139 } finally {
1140 if (leader == thisThread)
1141 leader = null;
1142 }
1143 }
1144 }
1145 }
1146 } finally {
1147 if (leader == null && queue[0] != null)
1148 available.signal();
1149 lock.unlock();
1150 }
1151 }
1152
1153 public void clear() {
1154 final ReentrantLock lock = this.lock;
1155 lock.lock();
1156 try {
1157 for (int i = 0; i < size; i++) {
1158 RunnableScheduledFuture<?> t = queue[i];
1159 if (t != null) {
1160 queue[i] = null;
1161 setIndex(t, -1);
1162 }
1163 }
1164 size = 0;
1165 } finally {
1166 lock.unlock();
1167 }
1168 }
1169
1170 /**
1171 * Returns first element only if it is expired.
1172 * Used only by drainTo. Call only when holding lock.
1173 */
1174 private RunnableScheduledFuture<?> peekExpired() {
1175 // assert lock.isHeldByCurrentThread();
1176 RunnableScheduledFuture<?> first = queue[0];
1177 return (first == null || first.getDelay(NANOSECONDS) > 0) ?
1178 null : first;
1179 }
1180
1181 public int drainTo(Collection<? super Runnable> c) {
1182 if (c == null)
1183 throw new NullPointerException();
1184 if (c == this)
1185 throw new IllegalArgumentException();
1186 final ReentrantLock lock = this.lock;
1187 lock.lock();
1188 try {
1189 RunnableScheduledFuture<?> first;
1190 int n = 0;
1191 while ((first = peekExpired()) != null) {
1192 c.add(first); // In this order, in case add() throws.
1193 finishPoll(first);
1194 ++n;
1195 }
1196 return n;
1197 } finally {
1198 lock.unlock();
1199 }
1200 }
1201
1202 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1203 if (c == null)
1204 throw new NullPointerException();
1205 if (c == this)
1206 throw new IllegalArgumentException();
1207 if (maxElements <= 0)
1208 return 0;
1209 final ReentrantLock lock = this.lock;
1210 lock.lock();
1211 try {
1212 RunnableScheduledFuture<?> first;
1213 int n = 0;
1214 while (n < maxElements && (first = peekExpired()) != null) {
1215 c.add(first); // In this order, in case add() throws.
1216 finishPoll(first);
1217 ++n;
1218 }
1219 return n;
1220 } finally {
1221 lock.unlock();
1222 }
1223 }
1224
1225 public Object[] toArray() {
1226 final ReentrantLock lock = this.lock;
1227 lock.lock();
1228 try {
1229 return Arrays.copyOf(queue, size, Object[].class);
1230 } finally {
1231 lock.unlock();
1232 }
1233 }
1234
1235 @SuppressWarnings("unchecked")
1236 public <T> T[] toArray(T[] a) {
1237 final ReentrantLock lock = this.lock;
1238 lock.lock();
1239 try {
1240 if (a.length < size)
1241 return (T[]) Arrays.copyOf(queue, size, a.getClass());
1242 System.arraycopy(queue, 0, a, 0, size);
1243 if (a.length > size)
1244 a[size] = null;
1245 return a;
1246 } finally {
1247 lock.unlock();
1248 }
1249 }
1250
1251 public Iterator<Runnable> iterator() {
1252 return new Itr(Arrays.copyOf(queue, size));
1253 }
1254
1255 /**
1256 * Snapshot iterator that works off copy of underlying q array.
1257 */
1258 private class Itr implements Iterator<Runnable> {
1259 final RunnableScheduledFuture<?>[] array;
1260 int cursor = 0; // index of next element to return
1261 int lastRet = -1; // index of last element, or -1 if no such
1262
1263 Itr(RunnableScheduledFuture<?>[] array) {
1264 this.array = array;
1265 }
1266
1267 public boolean hasNext() {
1268 return cursor < array.length;
1269 }
1270
1271 public Runnable next() {
1272 if (cursor >= array.length)
1273 throw new NoSuchElementException();
1274 lastRet = cursor;
1275 return array[cursor++];
1276 }
1277
1278 public void remove() {
1279 if (lastRet < 0)
1280 throw new IllegalStateException();
1281 DelayedWorkQueue.this.remove(array[lastRet]);
1282 lastRet = -1;
1283 }
1284 }
1285 }
1286 }