ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.99
Committed: Fri Nov 20 19:54:03 2015 UTC (8 years, 6 months ago) by jsr166
Branch: MAIN
Changes since 1.98: +6 -7 lines
Log Message:
improve some comments

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 import static java.util.concurrent.TimeUnit.NANOSECONDS;
11
12 import java.util.AbstractQueue;
13 import java.util.Arrays;
14 import java.util.Collection;
15 import java.util.Iterator;
16 import java.util.List;
17 import java.util.NoSuchElementException;
18 import java.util.concurrent.atomic.AtomicLong;
19 import java.util.concurrent.locks.Condition;
20 import java.util.concurrent.locks.ReentrantLock;
21
22 /**
23 * A {@link ThreadPoolExecutor} that can additionally schedule
24 * commands to run after a given delay, or to execute periodically.
25 * This class is preferable to {@link java.util.Timer} when multiple
26 * worker threads are needed, or when the additional flexibility or
27 * capabilities of {@link ThreadPoolExecutor} (which this class
28 * extends) are required.
29 *
30 * <p>Delayed tasks execute no sooner than they are enabled, but
31 * without any real-time guarantees about when, after they are
32 * enabled, they will commence. Tasks scheduled for exactly the same
33 * execution time are enabled in first-in-first-out (FIFO) order of
34 * submission.
35 *
36 * <p>When a submitted task is cancelled before it is run, execution
37 * is suppressed. By default, such a cancelled task is not
38 * automatically removed from the work queue until its delay elapses.
39 * While this enables further inspection and monitoring, it may also
40 * cause unbounded retention of cancelled tasks. To avoid this, use
41 * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately
42 * removed from the work queue at time of cancellation.
43 *
44 * <p>Successive executions of a periodic task scheduled via
45 * {@link #scheduleAtFixedRate scheduleAtFixedRate} or
46 * {@link #scheduleWithFixedDelay scheduleWithFixedDelay}
47 * do not overlap. While different executions may be performed by
48 * different threads, the effects of prior executions
49 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
50 * those of subsequent ones.
51 *
52 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
53 * of the inherited tuning methods are not useful for it. In
54 * particular, because it acts as a fixed-sized pool using
55 * {@code corePoolSize} threads and an unbounded queue, adjustments
56 * to {@code maximumPoolSize} have no useful effect. Additionally, it
57 * is almost never a good idea to set {@code corePoolSize} to zero or
58 * use {@code allowCoreThreadTimeOut} because this may leave the pool
59 * without threads to handle tasks once they become eligible to run.
60 *
61 * <p><b>Extension notes:</b> This class overrides the
62 * {@link ThreadPoolExecutor#execute(Runnable) execute} and
63 * {@link AbstractExecutorService#submit(Runnable) submit}
64 * methods to generate internal {@link ScheduledFuture} objects to
65 * control per-task delays and scheduling. To preserve
66 * functionality, any further overrides of these methods in
67 * subclasses must invoke superclass versions, which effectively
68 * disables additional task customization. However, this class
69 * provides alternative protected extension method
70 * {@code decorateTask} (one version each for {@code Runnable} and
71 * {@code Callable}) that can be used to customize the concrete task
72 * types used to execute commands entered via {@code execute},
73 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
74 * and {@code scheduleWithFixedDelay}. By default, a
75 * {@code ScheduledThreadPoolExecutor} uses a task type extending
76 * {@link FutureTask}. However, this may be modified or replaced using
77 * subclasses of the form:
78 *
79 * <pre> {@code
80 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
81 *
82 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
83 *
84 * protected <V> RunnableScheduledFuture<V> decorateTask(
85 * Runnable r, RunnableScheduledFuture<V> task) {
86 * return new CustomTask<V>(r, task);
87 * }
88 *
89 * protected <V> RunnableScheduledFuture<V> decorateTask(
90 * Callable<V> c, RunnableScheduledFuture<V> task) {
91 * return new CustomTask<V>(c, task);
92 * }
93 * // ... add constructors, etc.
94 * }}</pre>
95 *
96 * @since 1.5
97 * @author Doug Lea
98 */
99 public class ScheduledThreadPoolExecutor
100 extends ThreadPoolExecutor
101 implements ScheduledExecutorService {
102
103 /*
104 * This class specializes ThreadPoolExecutor implementation by
105 *
106 * 1. Using a custom task type ScheduledFutureTask, even for tasks
107 * that don't require scheduling because they are submitted
108 * using ExecutorService rather than ScheduledExecutorService
109 * methods, which are treated as tasks with a delay of zero.
110 *
111 * 2. Using a custom queue (DelayedWorkQueue), a variant of
112 * unbounded DelayQueue. The lack of capacity constraint and
113 * the fact that corePoolSize and maximumPoolSize are
114 * effectively identical simplifies some execution mechanics
115 * (see delayedExecute) compared to ThreadPoolExecutor.
116 *
117 * 3. Supporting optional run-after-shutdown parameters, which
118 * leads to overrides of shutdown methods to remove and cancel
119 * tasks that should NOT be run after shutdown, as well as
120 * different recheck logic when task (re)submission overlaps
121 * with a shutdown.
122 *
123 * 4. Task decoration methods to allow interception and
124 * instrumentation, which are needed because subclasses cannot
125 * otherwise override submit methods to get this effect. These
126 * don't have any impact on pool control logic though.
127 */
128
129 /**
130 * False if should cancel/suppress periodic tasks on shutdown.
131 */
132 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
133
134 /**
135 * False if should cancel non-periodic tasks on shutdown.
136 */
137 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
138
139 /**
140 * True if ScheduledFutureTask.cancel should remove from queue.
141 */
142 volatile boolean removeOnCancel;
143
144 /**
145 * Sequence number to break scheduling ties, and in turn to
146 * guarantee FIFO order among tied entries.
147 */
148 private static final AtomicLong sequencer = new AtomicLong();
149
150 private class ScheduledFutureTask<V>
151 extends FutureTask<V> implements RunnableScheduledFuture<V> {
152
153 /** Sequence number to break ties FIFO */
154 private final long sequenceNumber;
155
156 /** The nanoTime-based time when the task is enabled to execute. */
157 private volatile long time;
158
159 /**
160 * Period for repeating tasks, in nanoseconds.
161 * A positive value indicates fixed-rate execution.
162 * A negative value indicates fixed-delay execution.
163 * A value of 0 indicates a non-repeating (one-shot) task.
164 */
165 private final long period;
166
167 /** The actual task to be re-enqueued by reExecutePeriodic */
168 RunnableScheduledFuture<V> outerTask = this;
169
170 /**
171 * Index into delay queue, to support faster cancellation.
172 */
173 int heapIndex;
174
175 /**
176 * Creates a one-shot action with given nanoTime-based trigger time.
177 */
178 ScheduledFutureTask(Runnable r, V result, long triggerTime,
179 long sequenceNumber) {
180 super(r, result);
181 this.time = triggerTime;
182 this.period = 0;
183 this.sequenceNumber = sequenceNumber;
184 }
185
186 /**
187 * Creates a periodic action with given nanoTime-based initial
188 * trigger time and period.
189 */
190 ScheduledFutureTask(Runnable r, V result, long triggerTime,
191 long period, long sequenceNumber) {
192 super(r, result);
193 this.time = triggerTime;
194 this.period = period;
195 this.sequenceNumber = sequenceNumber;
196 }
197
198 /**
199 * Creates a one-shot action with given nanoTime-based trigger time.
200 */
201 ScheduledFutureTask(Callable<V> callable, long triggerTime,
202 long sequenceNumber) {
203 super(callable);
204 this.time = triggerTime;
205 this.period = 0;
206 this.sequenceNumber = sequenceNumber;
207 }
208
209 public long getDelay(TimeUnit unit) {
210 return unit.convert(time - System.nanoTime(), NANOSECONDS);
211 }
212
213 public int compareTo(Delayed other) {
214 if (other == this) // compare zero if same object
215 return 0;
216 if (other instanceof ScheduledFutureTask) {
217 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
218 long diff = time - x.time;
219 if (diff < 0)
220 return -1;
221 else if (diff > 0)
222 return 1;
223 else if (sequenceNumber < x.sequenceNumber)
224 return -1;
225 else
226 return 1;
227 }
228 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
229 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
230 }
231
232 /**
233 * Returns {@code true} if this is a periodic (not a one-shot) action.
234 *
235 * @return {@code true} if periodic
236 */
237 public boolean isPeriodic() {
238 return period != 0;
239 }
240
241 /**
242 * Sets the next time to run for a periodic task.
243 */
244 private void setNextRunTime() {
245 long p = period;
246 if (p > 0)
247 time += p;
248 else
249 time = triggerTime(-p);
250 }
251
252 public boolean cancel(boolean mayInterruptIfRunning) {
253 boolean cancelled = super.cancel(mayInterruptIfRunning);
254 if (cancelled && removeOnCancel && heapIndex >= 0)
255 remove(this);
256 return cancelled;
257 }
258
259 /**
260 * Overrides FutureTask version so as to reset/requeue if periodic.
261 */
262 public void run() {
263 boolean periodic = isPeriodic();
264 if (!canRunInCurrentRunState(periodic))
265 cancel(false);
266 else if (!periodic)
267 super.run();
268 else if (super.runAndReset()) {
269 setNextRunTime();
270 reExecutePeriodic(outerTask);
271 }
272 }
273 }
274
275 /**
276 * Returns true if can run a task given current run state
277 * and run-after-shutdown parameters.
278 *
279 * @param periodic true if this task periodic, false if delayed
280 */
281 boolean canRunInCurrentRunState(boolean periodic) {
282 return isRunningOrShutdown(periodic ?
283 continueExistingPeriodicTasksAfterShutdown :
284 executeExistingDelayedTasksAfterShutdown);
285 }
286
287 /**
288 * Main execution method for delayed or periodic tasks. If pool
289 * is shut down, rejects the task. Otherwise adds task to queue
290 * and starts a thread, if necessary, to run it. (We cannot
291 * prestart the thread to run the task because the task (probably)
292 * shouldn't be run yet.) If the pool is shut down while the task
293 * is being added, cancel and remove it if required by state and
294 * run-after-shutdown parameters.
295 *
296 * @param task the task
297 */
298 private void delayedExecute(RunnableScheduledFuture<?> task) {
299 if (isShutdown())
300 reject(task);
301 else {
302 super.getQueue().add(task);
303 if (isShutdown() &&
304 !canRunInCurrentRunState(task.isPeriodic()) &&
305 remove(task))
306 task.cancel(false);
307 else
308 ensurePrestart();
309 }
310 }
311
312 /**
313 * Requeues a periodic task unless current run state precludes it.
314 * Same idea as delayedExecute except drops task rather than rejecting.
315 *
316 * @param task the task
317 */
318 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
319 if (canRunInCurrentRunState(true)) {
320 super.getQueue().add(task);
321 if (!canRunInCurrentRunState(true) && remove(task))
322 task.cancel(false);
323 else
324 ensurePrestart();
325 }
326 }
327
328 /**
329 * Cancels and clears the queue of all tasks that should not be run
330 * due to shutdown policy. Invoked within super.shutdown.
331 */
332 @Override void onShutdown() {
333 BlockingQueue<Runnable> q = super.getQueue();
334 boolean keepDelayed =
335 getExecuteExistingDelayedTasksAfterShutdownPolicy();
336 boolean keepPeriodic =
337 getContinueExistingPeriodicTasksAfterShutdownPolicy();
338 if (!keepDelayed && !keepPeriodic) {
339 for (Object e : q.toArray())
340 if (e instanceof RunnableScheduledFuture<?>)
341 ((RunnableScheduledFuture<?>) e).cancel(false);
342 q.clear();
343 }
344 else {
345 // Traverse snapshot to avoid iterator exceptions
346 for (Object e : q.toArray()) {
347 if (e instanceof RunnableScheduledFuture) {
348 RunnableScheduledFuture<?> t =
349 (RunnableScheduledFuture<?>)e;
350 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
351 t.isCancelled()) { // also remove if already cancelled
352 if (q.remove(t))
353 t.cancel(false);
354 }
355 }
356 }
357 }
358 tryTerminate();
359 }
360
361 /**
362 * Modifies or replaces the task used to execute a runnable.
363 * This method can be used to override the concrete
364 * class used for managing internal tasks.
365 * The default implementation simply returns the given task.
366 *
367 * @param runnable the submitted Runnable
368 * @param task the task created to execute the runnable
369 * @param <V> the type of the task's result
370 * @return a task that can execute the runnable
371 * @since 1.6
372 */
373 protected <V> RunnableScheduledFuture<V> decorateTask(
374 Runnable runnable, RunnableScheduledFuture<V> task) {
375 return task;
376 }
377
378 /**
379 * Modifies or replaces the task used to execute a callable.
380 * This method can be used to override the concrete
381 * class used for managing internal tasks.
382 * The default implementation simply returns the given task.
383 *
384 * @param callable the submitted Callable
385 * @param task the task created to execute the callable
386 * @param <V> the type of the task's result
387 * @return a task that can execute the callable
388 * @since 1.6
389 */
390 protected <V> RunnableScheduledFuture<V> decorateTask(
391 Callable<V> callable, RunnableScheduledFuture<V> task) {
392 return task;
393 }
394
395 /**
396 * The default keep-alive time for pool threads.
397 *
398 * Normally, this value is unused because all pool threads will be
399 * core threads, but if a user creates a pool with a corePoolSize
400 * of zero (against our advice), we keep a thread alive as long as
401 * there are queued tasks. If the keep alive time is zero (the
402 * historic value), we end up hot-spinning in getTask, wasting a
403 * CPU. But on the other hand, if we set the value too high, and
404 * users create a one-shot pool which they don't cleanly shutdown,
405 * the pool's non-daemon threads will prevent JVM termination. A
406 * small but non-zero value (relative to a JVM's lifetime) seems
407 * best.
408 */
409 private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
410
411 /**
412 * Creates a new {@code ScheduledThreadPoolExecutor} with the
413 * given core pool size.
414 *
415 * @param corePoolSize the number of threads to keep in the pool, even
416 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
417 * @throws IllegalArgumentException if {@code corePoolSize < 0}
418 */
419 public ScheduledThreadPoolExecutor(int corePoolSize) {
420 super(corePoolSize, Integer.MAX_VALUE,
421 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
422 new DelayedWorkQueue());
423 }
424
425 /**
426 * Creates a new {@code ScheduledThreadPoolExecutor} with the
427 * given initial parameters.
428 *
429 * @param corePoolSize the number of threads to keep in the pool, even
430 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
431 * @param threadFactory the factory to use when the executor
432 * creates a new thread
433 * @throws IllegalArgumentException if {@code corePoolSize < 0}
434 * @throws NullPointerException if {@code threadFactory} is null
435 */
436 public ScheduledThreadPoolExecutor(int corePoolSize,
437 ThreadFactory threadFactory) {
438 super(corePoolSize, Integer.MAX_VALUE,
439 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
440 new DelayedWorkQueue(), threadFactory);
441 }
442
443 /**
444 * Creates a new {@code ScheduledThreadPoolExecutor} with the
445 * given initial parameters.
446 *
447 * @param corePoolSize the number of threads to keep in the pool, even
448 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
449 * @param handler the handler to use when execution is blocked
450 * because the thread bounds and queue capacities are reached
451 * @throws IllegalArgumentException if {@code corePoolSize < 0}
452 * @throws NullPointerException if {@code handler} is null
453 */
454 public ScheduledThreadPoolExecutor(int corePoolSize,
455 RejectedExecutionHandler handler) {
456 super(corePoolSize, Integer.MAX_VALUE,
457 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
458 new DelayedWorkQueue(), handler);
459 }
460
461 /**
462 * Creates a new {@code ScheduledThreadPoolExecutor} with the
463 * given initial parameters.
464 *
465 * @param corePoolSize the number of threads to keep in the pool, even
466 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
467 * @param threadFactory the factory to use when the executor
468 * creates a new thread
469 * @param handler the handler to use when execution is blocked
470 * because the thread bounds and queue capacities are reached
471 * @throws IllegalArgumentException if {@code corePoolSize < 0}
472 * @throws NullPointerException if {@code threadFactory} or
473 * {@code handler} is null
474 */
475 public ScheduledThreadPoolExecutor(int corePoolSize,
476 ThreadFactory threadFactory,
477 RejectedExecutionHandler handler) {
478 super(corePoolSize, Integer.MAX_VALUE,
479 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
480 new DelayedWorkQueue(), threadFactory, handler);
481 }
482
483 /**
484 * Returns the nanoTime-based trigger time of a delayed action.
485 */
486 private long triggerTime(long delay, TimeUnit unit) {
487 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
488 }
489
490 /**
491 * Returns the nanoTime-based trigger time of a delayed action.
492 */
493 long triggerTime(long delay) {
494 return System.nanoTime() +
495 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
496 }
497
498 /**
499 * Constrains the values of all delays in the queue to be within
500 * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
501 * This may occur if a task is eligible to be dequeued, but has
502 * not yet been, while some other task is added with a delay of
503 * Long.MAX_VALUE.
504 */
505 private long overflowFree(long delay) {
506 Delayed head = (Delayed) super.getQueue().peek();
507 if (head != null) {
508 long headDelay = head.getDelay(NANOSECONDS);
509 if (headDelay < 0 && (delay - headDelay < 0))
510 delay = Long.MAX_VALUE + headDelay;
511 }
512 return delay;
513 }
514
515 /**
516 * @throws RejectedExecutionException {@inheritDoc}
517 * @throws NullPointerException {@inheritDoc}
518 */
519 public ScheduledFuture<?> schedule(Runnable command,
520 long delay,
521 TimeUnit unit) {
522 if (command == null || unit == null)
523 throw new NullPointerException();
524 RunnableScheduledFuture<Void> t = decorateTask(command,
525 new ScheduledFutureTask<Void>(command, null,
526 triggerTime(delay, unit),
527 sequencer.getAndIncrement()));
528 delayedExecute(t);
529 return t;
530 }
531
532 /**
533 * @throws RejectedExecutionException {@inheritDoc}
534 * @throws NullPointerException {@inheritDoc}
535 */
536 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
537 long delay,
538 TimeUnit unit) {
539 if (callable == null || unit == null)
540 throw new NullPointerException();
541 RunnableScheduledFuture<V> t = decorateTask(callable,
542 new ScheduledFutureTask<V>(callable,
543 triggerTime(delay, unit),
544 sequencer.getAndIncrement()));
545 delayedExecute(t);
546 return t;
547 }
548
549 /**
550 * @throws RejectedExecutionException {@inheritDoc}
551 * @throws NullPointerException {@inheritDoc}
552 * @throws IllegalArgumentException {@inheritDoc}
553 */
554 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
555 long initialDelay,
556 long period,
557 TimeUnit unit) {
558 if (command == null || unit == null)
559 throw new NullPointerException();
560 if (period <= 0L)
561 throw new IllegalArgumentException();
562 ScheduledFutureTask<Void> sft =
563 new ScheduledFutureTask<Void>(command,
564 null,
565 triggerTime(initialDelay, unit),
566 unit.toNanos(period),
567 sequencer.getAndIncrement());
568 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
569 sft.outerTask = t;
570 delayedExecute(t);
571 return t;
572 }
573
574 /**
575 * @throws RejectedExecutionException {@inheritDoc}
576 * @throws NullPointerException {@inheritDoc}
577 * @throws IllegalArgumentException {@inheritDoc}
578 */
579 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
580 long initialDelay,
581 long delay,
582 TimeUnit unit) {
583 if (command == null || unit == null)
584 throw new NullPointerException();
585 if (delay <= 0L)
586 throw new IllegalArgumentException();
587 ScheduledFutureTask<Void> sft =
588 new ScheduledFutureTask<Void>(command,
589 null,
590 triggerTime(initialDelay, unit),
591 -unit.toNanos(delay),
592 sequencer.getAndIncrement());
593 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
594 sft.outerTask = t;
595 delayedExecute(t);
596 return t;
597 }
598
599 /**
600 * Executes {@code command} with zero required delay.
601 * This has effect equivalent to
602 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
603 * Note that inspections of the queue and of the list returned by
604 * {@code shutdownNow} will access the zero-delayed
605 * {@link ScheduledFuture}, not the {@code command} itself.
606 *
607 * <p>A consequence of the use of {@code ScheduledFuture} objects is
608 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
609 * called with a null second {@code Throwable} argument, even if the
610 * {@code command} terminated abruptly. Instead, the {@code Throwable}
611 * thrown by such a task can be obtained via {@link Future#get}.
612 *
613 * @throws RejectedExecutionException at discretion of
614 * {@code RejectedExecutionHandler}, if the task
615 * cannot be accepted for execution because the
616 * executor has been shut down
617 * @throws NullPointerException {@inheritDoc}
618 */
619 public void execute(Runnable command) {
620 schedule(command, 0, NANOSECONDS);
621 }
622
623 // Override AbstractExecutorService methods
624
625 /**
626 * @throws RejectedExecutionException {@inheritDoc}
627 * @throws NullPointerException {@inheritDoc}
628 */
629 public Future<?> submit(Runnable task) {
630 return schedule(task, 0, NANOSECONDS);
631 }
632
633 /**
634 * @throws RejectedExecutionException {@inheritDoc}
635 * @throws NullPointerException {@inheritDoc}
636 */
637 public <T> Future<T> submit(Runnable task, T result) {
638 return schedule(Executors.callable(task, result), 0, NANOSECONDS);
639 }
640
641 /**
642 * @throws RejectedExecutionException {@inheritDoc}
643 * @throws NullPointerException {@inheritDoc}
644 */
645 public <T> Future<T> submit(Callable<T> task) {
646 return schedule(task, 0, NANOSECONDS);
647 }
648
649 /**
650 * Sets the policy on whether to continue executing existing
651 * periodic tasks even when this executor has been {@code shutdown}.
652 * In this case, these tasks will only terminate upon
653 * {@code shutdownNow} or after setting the policy to
654 * {@code false} when already shutdown.
655 * This value is by default {@code false}.
656 *
657 * @param value if {@code true}, continue after shutdown, else don't
658 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
659 */
660 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
661 continueExistingPeriodicTasksAfterShutdown = value;
662 if (!value && isShutdown())
663 onShutdown();
664 }
665
666 /**
667 * Gets the policy on whether to continue executing existing
668 * periodic tasks even when this executor has been {@code shutdown}.
669 * In this case, these tasks will only terminate upon
670 * {@code shutdownNow} or after setting the policy to
671 * {@code false} when already shutdown.
672 * This value is by default {@code false}.
673 *
674 * @return {@code true} if will continue after shutdown
675 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
676 */
677 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
678 return continueExistingPeriodicTasksAfterShutdown;
679 }
680
681 /**
682 * Sets the policy on whether to execute existing delayed
683 * tasks even when this executor has been {@code shutdown}.
684 * In this case, these tasks will only terminate upon
685 * {@code shutdownNow}, or after setting the policy to
686 * {@code false} when already shutdown.
687 * This value is by default {@code true}.
688 *
689 * @param value if {@code true}, execute after shutdown, else don't
690 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
691 */
692 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
693 executeExistingDelayedTasksAfterShutdown = value;
694 if (!value && isShutdown())
695 onShutdown();
696 }
697
698 /**
699 * Gets the policy on whether to execute existing delayed
700 * tasks even when this executor has been {@code shutdown}.
701 * In this case, these tasks will only terminate upon
702 * {@code shutdownNow}, or after setting the policy to
703 * {@code false} when already shutdown.
704 * This value is by default {@code true}.
705 *
706 * @return {@code true} if will execute after shutdown
707 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
708 */
709 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
710 return executeExistingDelayedTasksAfterShutdown;
711 }
712
713 /**
714 * Sets the policy on whether cancelled tasks should be immediately
715 * removed from the work queue at time of cancellation. This value is
716 * by default {@code false}.
717 *
718 * @param value if {@code true}, remove on cancellation, else don't
719 * @see #getRemoveOnCancelPolicy
720 * @since 1.7
721 */
722 public void setRemoveOnCancelPolicy(boolean value) {
723 removeOnCancel = value;
724 }
725
726 /**
727 * Gets the policy on whether cancelled tasks should be immediately
728 * removed from the work queue at time of cancellation. This value is
729 * by default {@code false}.
730 *
731 * @return {@code true} if cancelled tasks are immediately removed
732 * from the queue
733 * @see #setRemoveOnCancelPolicy
734 * @since 1.7
735 */
736 public boolean getRemoveOnCancelPolicy() {
737 return removeOnCancel;
738 }
739
740 /**
741 * Initiates an orderly shutdown in which previously submitted
742 * tasks are executed, but no new tasks will be accepted.
743 * Invocation has no additional effect if already shut down.
744 *
745 * <p>This method does not wait for previously submitted tasks to
746 * complete execution. Use {@link #awaitTermination awaitTermination}
747 * to do that.
748 *
749 * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
750 * has been set {@code false}, existing delayed tasks whose delays
751 * have not yet elapsed are cancelled. And unless the {@code
752 * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
753 * {@code true}, future executions of existing periodic tasks will
754 * be cancelled.
755 *
756 * @throws SecurityException {@inheritDoc}
757 */
758 public void shutdown() {
759 super.shutdown();
760 }
761
762 /**
763 * Attempts to stop all actively executing tasks, halts the
764 * processing of waiting tasks, and returns a list of the tasks
765 * that were awaiting execution. These tasks are drained (removed)
766 * from the task queue upon return from this method.
767 *
768 * <p>This method does not wait for actively executing tasks to
769 * terminate. Use {@link #awaitTermination awaitTermination} to
770 * do that.
771 *
772 * <p>There are no guarantees beyond best-effort attempts to stop
773 * processing actively executing tasks. This implementation
774 * interrupts tasks via {@link Thread#interrupt}; any task that
775 * fails to respond to interrupts may never terminate.
776 *
777 * @return list of tasks that never commenced execution.
778 * Each element of this list is a {@link ScheduledFuture}.
779 * For tasks submitted via one of the {@code schedule}
780 * methods, the element will be identical to the returned
781 * {@code ScheduledFuture}. For tasks submitted using
782 * {@link #execute execute}, the element will be a
783 * zero-delay {@code ScheduledFuture}.
784 * @throws SecurityException {@inheritDoc}
785 */
786 public List<Runnable> shutdownNow() {
787 return super.shutdownNow();
788 }
789
790 /**
791 * Returns the task queue used by this executor. Access to the
792 * task queue is intended primarily for debugging and monitoring.
793 * This queue may be in active use. Retrieving the task queue
794 * does not prevent queued tasks from executing.
795 *
796 * <p>Each element of this queue is a {@link ScheduledFuture}.
797 * For tasks submitted via one of the {@code schedule} methods, the
798 * element will be identical to the returned {@code ScheduledFuture}.
799 * For tasks submitted using {@link #execute execute}, the element
800 * will be a zero-delay {@code ScheduledFuture}.
801 *
802 * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
803 * tasks in the order in which they will execute.
804 *
805 * @return the task queue
806 */
807 public BlockingQueue<Runnable> getQueue() {
808 return super.getQueue();
809 }
810
811 /**
812 * Specialized delay queue. To mesh with TPE declarations, this
813 * class must be declared as a BlockingQueue<Runnable> even though
814 * it can only hold RunnableScheduledFutures.
815 */
816 static class DelayedWorkQueue extends AbstractQueue<Runnable>
817 implements BlockingQueue<Runnable> {
818
819 /*
820 * A DelayedWorkQueue is based on a heap-based data structure
821 * like those in DelayQueue and PriorityQueue, except that
822 * every ScheduledFutureTask also records its index into the
823 * heap array. This eliminates the need to find a task upon
824 * cancellation, greatly speeding up removal (down from O(n)
825 * to O(log n)), and reducing garbage retention that would
826 * otherwise occur by waiting for the element to rise to top
827 * before clearing. But because the queue may also hold
828 * RunnableScheduledFutures that are not ScheduledFutureTasks,
829 * we are not guaranteed to have such indices available, in
830 * which case we fall back to linear search. (We expect that
831 * most tasks will not be decorated, and that the faster cases
832 * will be much more common.)
833 *
834 * All heap operations must record index changes -- mainly
835 * within siftUp and siftDown. Upon removal, a task's
836 * heapIndex is set to -1. Note that ScheduledFutureTasks can
837 * appear at most once in the queue (this need not be true for
838 * other kinds of tasks or work queues), so are uniquely
839 * identified by heapIndex.
840 */
841
842 private static final int INITIAL_CAPACITY = 16;
843 private RunnableScheduledFuture<?>[] queue =
844 new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
845 private final ReentrantLock lock = new ReentrantLock();
846 private int size;
847
848 /**
849 * Thread designated to wait for the task at the head of the
850 * queue. This variant of the Leader-Follower pattern
851 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
852 * minimize unnecessary timed waiting. When a thread becomes
853 * the leader, it waits only for the next delay to elapse, but
854 * other threads await indefinitely. The leader thread must
855 * signal some other thread before returning from take() or
856 * poll(...), unless some other thread becomes leader in the
857 * interim. Whenever the head of the queue is replaced with a
858 * task with an earlier expiration time, the leader field is
859 * invalidated by being reset to null, and some waiting
860 * thread, but not necessarily the current leader, is
861 * signalled. So waiting threads must be prepared to acquire
862 * and lose leadership while waiting.
863 */
864 private Thread leader;
865
866 /**
867 * Condition signalled when a newer task becomes available at the
868 * head of the queue or a new thread may need to become leader.
869 */
870 private final Condition available = lock.newCondition();
871
872 /**
873 * Sets f's heapIndex if it is a ScheduledFutureTask.
874 */
875 private void setIndex(RunnableScheduledFuture<?> f, int idx) {
876 if (f instanceof ScheduledFutureTask)
877 ((ScheduledFutureTask)f).heapIndex = idx;
878 }
879
880 /**
881 * Sifts element added at bottom up to its heap-ordered spot.
882 * Call only when holding lock.
883 */
884 private void siftUp(int k, RunnableScheduledFuture<?> key) {
885 while (k > 0) {
886 int parent = (k - 1) >>> 1;
887 RunnableScheduledFuture<?> e = queue[parent];
888 if (key.compareTo(e) >= 0)
889 break;
890 queue[k] = e;
891 setIndex(e, k);
892 k = parent;
893 }
894 queue[k] = key;
895 setIndex(key, k);
896 }
897
898 /**
899 * Sifts element added at top down to its heap-ordered spot.
900 * Call only when holding lock.
901 */
902 private void siftDown(int k, RunnableScheduledFuture<?> key) {
903 int half = size >>> 1;
904 while (k < half) {
905 int child = (k << 1) + 1;
906 RunnableScheduledFuture<?> c = queue[child];
907 int right = child + 1;
908 if (right < size && c.compareTo(queue[right]) > 0)
909 c = queue[child = right];
910 if (key.compareTo(c) <= 0)
911 break;
912 queue[k] = c;
913 setIndex(c, k);
914 k = child;
915 }
916 queue[k] = key;
917 setIndex(key, k);
918 }
919
920 /**
921 * Resizes the heap array. Call only when holding lock.
922 */
923 private void grow() {
924 int oldCapacity = queue.length;
925 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
926 if (newCapacity < 0) // overflow
927 newCapacity = Integer.MAX_VALUE;
928 queue = Arrays.copyOf(queue, newCapacity);
929 }
930
931 /**
932 * Finds index of given object, or -1 if absent.
933 */
934 private int indexOf(Object x) {
935 if (x != null) {
936 if (x instanceof ScheduledFutureTask) {
937 int i = ((ScheduledFutureTask) x).heapIndex;
938 // Sanity check; x could conceivably be a
939 // ScheduledFutureTask from some other pool.
940 if (i >= 0 && i < size && queue[i] == x)
941 return i;
942 } else {
943 for (int i = 0; i < size; i++)
944 if (x.equals(queue[i]))
945 return i;
946 }
947 }
948 return -1;
949 }
950
951 public boolean contains(Object x) {
952 final ReentrantLock lock = this.lock;
953 lock.lock();
954 try {
955 return indexOf(x) != -1;
956 } finally {
957 lock.unlock();
958 }
959 }
960
961 public boolean remove(Object x) {
962 final ReentrantLock lock = this.lock;
963 lock.lock();
964 try {
965 int i = indexOf(x);
966 if (i < 0)
967 return false;
968
969 setIndex(queue[i], -1);
970 int s = --size;
971 RunnableScheduledFuture<?> replacement = queue[s];
972 queue[s] = null;
973 if (s != i) {
974 siftDown(i, replacement);
975 if (queue[i] == replacement)
976 siftUp(i, replacement);
977 }
978 return true;
979 } finally {
980 lock.unlock();
981 }
982 }
983
984 public int size() {
985 final ReentrantLock lock = this.lock;
986 lock.lock();
987 try {
988 return size;
989 } finally {
990 lock.unlock();
991 }
992 }
993
994 public boolean isEmpty() {
995 return size() == 0;
996 }
997
998 public int remainingCapacity() {
999 return Integer.MAX_VALUE;
1000 }
1001
1002 public RunnableScheduledFuture<?> peek() {
1003 final ReentrantLock lock = this.lock;
1004 lock.lock();
1005 try {
1006 return queue[0];
1007 } finally {
1008 lock.unlock();
1009 }
1010 }
1011
1012 public boolean offer(Runnable x) {
1013 if (x == null)
1014 throw new NullPointerException();
1015 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
1016 final ReentrantLock lock = this.lock;
1017 lock.lock();
1018 try {
1019 int i = size;
1020 if (i >= queue.length)
1021 grow();
1022 size = i + 1;
1023 if (i == 0) {
1024 queue[0] = e;
1025 setIndex(e, 0);
1026 } else {
1027 siftUp(i, e);
1028 }
1029 if (queue[0] == e) {
1030 leader = null;
1031 available.signal();
1032 }
1033 } finally {
1034 lock.unlock();
1035 }
1036 return true;
1037 }
1038
1039 public void put(Runnable e) {
1040 offer(e);
1041 }
1042
1043 public boolean add(Runnable e) {
1044 return offer(e);
1045 }
1046
1047 public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1048 return offer(e);
1049 }
1050
1051 /**
1052 * Performs common bookkeeping for poll and take: Replaces
1053 * first element with last and sifts it down. Call only when
1054 * holding lock.
1055 * @param f the task to remove and return
1056 */
1057 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1058 int s = --size;
1059 RunnableScheduledFuture<?> x = queue[s];
1060 queue[s] = null;
1061 if (s != 0)
1062 siftDown(0, x);
1063 setIndex(f, -1);
1064 return f;
1065 }
1066
1067 public RunnableScheduledFuture<?> poll() {
1068 final ReentrantLock lock = this.lock;
1069 lock.lock();
1070 try {
1071 RunnableScheduledFuture<?> first = queue[0];
1072 return (first == null || first.getDelay(NANOSECONDS) > 0)
1073 ? null
1074 : finishPoll(first);
1075 } finally {
1076 lock.unlock();
1077 }
1078 }
1079
1080 public RunnableScheduledFuture<?> take() throws InterruptedException {
1081 final ReentrantLock lock = this.lock;
1082 lock.lockInterruptibly();
1083 try {
1084 for (;;) {
1085 RunnableScheduledFuture<?> first = queue[0];
1086 if (first == null)
1087 available.await();
1088 else {
1089 long delay = first.getDelay(NANOSECONDS);
1090 if (delay <= 0L)
1091 return finishPoll(first);
1092 first = null; // don't retain ref while waiting
1093 if (leader != null)
1094 available.await();
1095 else {
1096 Thread thisThread = Thread.currentThread();
1097 leader = thisThread;
1098 try {
1099 available.awaitNanos(delay);
1100 } finally {
1101 if (leader == thisThread)
1102 leader = null;
1103 }
1104 }
1105 }
1106 }
1107 } finally {
1108 if (leader == null && queue[0] != null)
1109 available.signal();
1110 lock.unlock();
1111 }
1112 }
1113
1114 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1115 throws InterruptedException {
1116 long nanos = unit.toNanos(timeout);
1117 final ReentrantLock lock = this.lock;
1118 lock.lockInterruptibly();
1119 try {
1120 for (;;) {
1121 RunnableScheduledFuture<?> first = queue[0];
1122 if (first == null) {
1123 if (nanos <= 0L)
1124 return null;
1125 else
1126 nanos = available.awaitNanos(nanos);
1127 } else {
1128 long delay = first.getDelay(NANOSECONDS);
1129 if (delay <= 0L)
1130 return finishPoll(first);
1131 if (nanos <= 0L)
1132 return null;
1133 first = null; // don't retain ref while waiting
1134 if (nanos < delay || leader != null)
1135 nanos = available.awaitNanos(nanos);
1136 else {
1137 Thread thisThread = Thread.currentThread();
1138 leader = thisThread;
1139 try {
1140 long timeLeft = available.awaitNanos(delay);
1141 nanos -= delay - timeLeft;
1142 } finally {
1143 if (leader == thisThread)
1144 leader = null;
1145 }
1146 }
1147 }
1148 }
1149 } finally {
1150 if (leader == null && queue[0] != null)
1151 available.signal();
1152 lock.unlock();
1153 }
1154 }
1155
1156 public void clear() {
1157 final ReentrantLock lock = this.lock;
1158 lock.lock();
1159 try {
1160 for (int i = 0; i < size; i++) {
1161 RunnableScheduledFuture<?> t = queue[i];
1162 if (t != null) {
1163 queue[i] = null;
1164 setIndex(t, -1);
1165 }
1166 }
1167 size = 0;
1168 } finally {
1169 lock.unlock();
1170 }
1171 }
1172
1173 /**
1174 * Returns first element only if it is expired.
1175 * Used only by drainTo. Call only when holding lock.
1176 */
1177 private RunnableScheduledFuture<?> peekExpired() {
1178 // assert lock.isHeldByCurrentThread();
1179 RunnableScheduledFuture<?> first = queue[0];
1180 return (first == null || first.getDelay(NANOSECONDS) > 0) ?
1181 null : first;
1182 }
1183
1184 public int drainTo(Collection<? super Runnable> c) {
1185 if (c == null)
1186 throw new NullPointerException();
1187 if (c == this)
1188 throw new IllegalArgumentException();
1189 final ReentrantLock lock = this.lock;
1190 lock.lock();
1191 try {
1192 RunnableScheduledFuture<?> first;
1193 int n = 0;
1194 while ((first = peekExpired()) != null) {
1195 c.add(first); // In this order, in case add() throws.
1196 finishPoll(first);
1197 ++n;
1198 }
1199 return n;
1200 } finally {
1201 lock.unlock();
1202 }
1203 }
1204
1205 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1206 if (c == null)
1207 throw new NullPointerException();
1208 if (c == this)
1209 throw new IllegalArgumentException();
1210 if (maxElements <= 0)
1211 return 0;
1212 final ReentrantLock lock = this.lock;
1213 lock.lock();
1214 try {
1215 RunnableScheduledFuture<?> first;
1216 int n = 0;
1217 while (n < maxElements && (first = peekExpired()) != null) {
1218 c.add(first); // In this order, in case add() throws.
1219 finishPoll(first);
1220 ++n;
1221 }
1222 return n;
1223 } finally {
1224 lock.unlock();
1225 }
1226 }
1227
1228 public Object[] toArray() {
1229 final ReentrantLock lock = this.lock;
1230 lock.lock();
1231 try {
1232 return Arrays.copyOf(queue, size, Object[].class);
1233 } finally {
1234 lock.unlock();
1235 }
1236 }
1237
1238 @SuppressWarnings("unchecked")
1239 public <T> T[] toArray(T[] a) {
1240 final ReentrantLock lock = this.lock;
1241 lock.lock();
1242 try {
1243 if (a.length < size)
1244 return (T[]) Arrays.copyOf(queue, size, a.getClass());
1245 System.arraycopy(queue, 0, a, 0, size);
1246 if (a.length > size)
1247 a[size] = null;
1248 return a;
1249 } finally {
1250 lock.unlock();
1251 }
1252 }
1253
1254 public Iterator<Runnable> iterator() {
1255 return new Itr(Arrays.copyOf(queue, size));
1256 }
1257
1258 /**
1259 * Snapshot iterator that works off copy of underlying q array.
1260 */
1261 private class Itr implements Iterator<Runnable> {
1262 final RunnableScheduledFuture<?>[] array;
1263 int cursor; // index of next element to return; initially 0
1264 int lastRet = -1; // index of last element returned; -1 if no such
1265
1266 Itr(RunnableScheduledFuture<?>[] array) {
1267 this.array = array;
1268 }
1269
1270 public boolean hasNext() {
1271 return cursor < array.length;
1272 }
1273
1274 public Runnable next() {
1275 if (cursor >= array.length)
1276 throw new NoSuchElementException();
1277 lastRet = cursor;
1278 return array[cursor++];
1279 }
1280
1281 public void remove() {
1282 if (lastRet < 0)
1283 throw new IllegalStateException();
1284 DelayedWorkQueue.this.remove(array[lastRet]);
1285 lastRet = -1;
1286 }
1287 }
1288 }
1289 }