ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jdk7/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.8
Committed: Sun Jan 18 20:17:32 2015 UTC (9 years, 4 months ago) by jsr166
Branch: MAIN
Changes since 1.7: +1 -0 lines
Log Message:
exactly one blank line before and after package statements

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import static java.util.concurrent.TimeUnit.NANOSECONDS;
10 import static java.util.concurrent.TimeUnit.MILLISECONDS;
11 import java.util.concurrent.atomic.AtomicLong;
12 import java.util.concurrent.locks.Condition;
13 import java.util.concurrent.locks.ReentrantLock;
14 import java.util.*;
15
16 /**
17 * A {@link ThreadPoolExecutor} that can additionally schedule
18 * commands to run after a given delay, or to execute periodically.
19 * This class is preferable to {@link java.util.Timer} when multiple
20 * worker threads are needed, or when the additional flexibility or
21 * capabilities of {@link ThreadPoolExecutor} (which this class
22 * extends) are required.
23 *
24 * <p>Delayed tasks execute no sooner than they are enabled, but
25 * without any real-time guarantees about when, after they are
26 * enabled, they will commence. Tasks scheduled for exactly the same
27 * execution time are enabled in first-in-first-out (FIFO) order of
28 * submission.
29 *
30 * <p>When a submitted task is cancelled before it is run, execution
31 * is suppressed. By default, such a cancelled task is not
32 * automatically removed from the work queue until its delay elapses.
33 * While this enables further inspection and monitoring, it may also
34 * cause unbounded retention of cancelled tasks. To avoid this, use
35 * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately
36 * removed from the work queue at time of cancellation.
37 *
38 * <p>Successive executions of a periodic task scheduled via
39 * {@link #scheduleAtFixedRate} or
40 * {@link #scheduleWithFixedDelay} do not overlap. While different
41 * executions may be performed by different threads, the effects of
42 * prior executions <a
43 * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
44 * those of subsequent ones.
45 *
46 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
47 * of the inherited tuning methods are not useful for it. In
48 * particular, because it acts as a fixed-sized pool using
49 * {@code corePoolSize} threads and an unbounded queue, adjustments
50 * to {@code maximumPoolSize} have no useful effect. Additionally, it
51 * is almost never a good idea to set {@code corePoolSize} to zero or
52 * use {@code allowCoreThreadTimeOut} because this may leave the pool
53 * without threads to handle tasks once they become eligible to run.
54 *
55 * <p><b>Extension notes:</b> This class overrides the
56 * {@link ThreadPoolExecutor#execute(Runnable) execute} and
57 * {@link AbstractExecutorService#submit(Runnable) submit}
58 * methods to generate internal {@link ScheduledFuture} objects to
59 * control per-task delays and scheduling. To preserve
60 * functionality, any further overrides of these methods in
61 * subclasses must invoke superclass versions, which effectively
62 * disables additional task customization. However, this class
63 * provides an alternative protected extension method
64 * {@code decorateTask} (one version each for {@code Runnable} and
65 * {@code Callable}) that can be used to customize the concrete task
66 * types used to execute commands entered via {@code execute},
67 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
68 * and {@code scheduleWithFixedDelay}. By default, a
69 * {@code ScheduledThreadPoolExecutor} uses a task type extending
70 * {@link FutureTask}. However, this may be modified or replaced using
71 * subclasses of the form:
72 *
73 * <pre> {@code
74 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
75 *
76 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
77 *
78 * protected <V> RunnableScheduledFuture<V> decorateTask(
79 * Runnable r, RunnableScheduledFuture<V> task) {
80 * return new CustomTask<V>(r, task);
81 * }
82 *
83 * protected <V> RunnableScheduledFuture<V> decorateTask(
84 * Callable<V> c, RunnableScheduledFuture<V> task) {
85 * return new CustomTask<V>(c, task);
86 * }
87 * // ... add constructors, etc.
88 * }}</pre>
89 *
90 * @since 1.5
91 * @author Doug Lea
92 */
93 public class ScheduledThreadPoolExecutor
94 extends ThreadPoolExecutor
95 implements ScheduledExecutorService {
96
97 /*
98 * This class specializes ThreadPoolExecutor implementation by
99 *
100 * 1. Using a custom task type, ScheduledFutureTask for
101 * tasks, even those that don't require scheduling (i.e.,
102 * those submitted using ExecutorService execute, not
103 * ScheduledExecutorService methods) which are treated as
104 * delayed tasks with a delay of zero.
105 *
106 * 2. Using a custom queue (DelayedWorkQueue), a variant of
107 * unbounded DelayQueue. The lack of capacity constraint and
108 * the fact that corePoolSize and maximumPoolSize are
109 * effectively identical simplifies some execution mechanics
110 * (see delayedExecute) compared to ThreadPoolExecutor.
111 *
112 * 3. Supporting optional run-after-shutdown parameters, which
113 * leads to overrides of shutdown methods to remove and cancel
114 * tasks that should NOT be run after shutdown, as well as
115 * different recheck logic when task (re)submission overlaps
116 * with a shutdown.
117 *
118 * 4. Task decoration methods to allow interception and
119 * instrumentation, which are needed because subclasses cannot
120 * otherwise override submit methods to get this effect. These
121 * don't have any impact on pool control logic though.
122 */
123
124 /**
125 * False if should cancel/suppress periodic tasks on shutdown.
126 */
127 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
128
129 /**
130 * False if should cancel non-periodic tasks on shutdown.
131 */
132 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
133
134 /**
135 * True if ScheduledFutureTask.cancel should remove from queue.
136 */
137 private volatile boolean removeOnCancel = false;
138
139 /**
140 * Sequence number to break scheduling ties, and in turn to
141 * guarantee FIFO order among tied entries.
142 */
143 private static final AtomicLong sequencer = new AtomicLong();
144
145 /**
146 * Returns current nanosecond time.
147 */
148 final long now() {
149 return System.nanoTime();
150 }
151
152 private class ScheduledFutureTask<V>
153 extends FutureTask<V> implements RunnableScheduledFuture<V> {
154
155 /** Sequence number to break ties FIFO */
156 private final long sequenceNumber;
157
158 /** The time the task is enabled to execute in nanoTime units */
159 private long time;
160
161 /**
162 * Period in nanoseconds for repeating tasks.
163 * A positive value indicates fixed-rate execution.
164 * A negative value indicates fixed-delay execution.
165 * A value of 0 indicates a non-repeating (one-shot) task.
166 */
167 private final long period;
168
169 /** The actual task to be re-enqueued by reExecutePeriodic */
170 RunnableScheduledFuture<V> outerTask = this;
171
172 /**
173 * Index into delay queue, to support faster cancellation.
174 */
175 int heapIndex;
176
177 /**
178 * Creates a one-shot action with given nanoTime-based trigger time.
179 */
180 ScheduledFutureTask(Runnable r, V result, long triggerTime) {
181 super(r, result);
182 this.time = triggerTime;
183 this.period = 0;
184 this.sequenceNumber = sequencer.getAndIncrement();
185 }
186
187 /**
188 * Creates a periodic action with given nanoTime-based initial
189 * trigger time and period.
190 */
191 ScheduledFutureTask(Runnable r, V result, long triggerTime,
192 long period) {
193 super(r, result);
194 this.time = triggerTime;
195 this.period = period;
196 this.sequenceNumber = sequencer.getAndIncrement();
197 }
198
199 /**
200 * Creates a one-shot action with given nanoTime-based trigger time.
201 */
202 ScheduledFutureTask(Callable<V> callable, long triggerTime) {
203 super(callable);
204 this.time = triggerTime;
205 this.period = 0;
206 this.sequenceNumber = sequencer.getAndIncrement();
207 }
208
209 public long getDelay(TimeUnit unit) {
210 return unit.convert(time - now(), NANOSECONDS);
211 }
212
213 public int compareTo(Delayed other) {
214 if (other == this) // compare zero if same object
215 return 0;
216 if (other instanceof ScheduledFutureTask) {
217 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
218 long diff = time - x.time;
219 if (diff < 0)
220 return -1;
221 else if (diff > 0)
222 return 1;
223 else if (sequenceNumber < x.sequenceNumber)
224 return -1;
225 else
226 return 1;
227 }
228 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
229 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
230 }
231
232 /**
233 * Returns {@code true} if this is a periodic (not a one-shot) action.
234 *
235 * @return {@code true} if periodic
236 */
237 public boolean isPeriodic() {
238 return period != 0;
239 }
240
241 /**
242 * Sets the next time to run for a periodic task.
243 */
244 private void setNextRunTime() {
245 long p = period;
246 if (p > 0)
247 time += p;
248 else
249 time = triggerTime(-p);
250 }
251
252 public boolean cancel(boolean mayInterruptIfRunning) {
253 boolean cancelled = super.cancel(mayInterruptIfRunning);
254 if (cancelled && removeOnCancel && heapIndex >= 0)
255 remove(this);
256 return cancelled;
257 }
258
259 /**
260 * Overrides FutureTask version so as to reset/requeue if periodic.
261 */
262 public void run() {
263 boolean periodic = isPeriodic();
264 if (!canRunInCurrentRunState(periodic))
265 cancel(false);
266 else if (!periodic)
267 ScheduledFutureTask.super.run();
268 else if (ScheduledFutureTask.super.runAndReset()) {
269 setNextRunTime();
270 reExecutePeriodic(outerTask);
271 }
272 }
273 }
274
275 /**
276 * Returns true if can run a task given current run state
277 * and run-after-shutdown parameters.
278 *
279 * @param periodic true if this task periodic, false if delayed
280 */
281 boolean canRunInCurrentRunState(boolean periodic) {
282 return isRunningOrShutdown(periodic ?
283 continueExistingPeriodicTasksAfterShutdown :
284 executeExistingDelayedTasksAfterShutdown);
285 }
286
287 /**
288 * Main execution method for delayed or periodic tasks. If pool
289 * is shut down, rejects the task. Otherwise adds task to queue
290 * and starts a thread, if necessary, to run it. (We cannot
291 * prestart the thread to run the task because the task (probably)
292 * shouldn't be run yet.) If the pool is shut down while the task
293 * is being added, cancel and remove it if required by state and
294 * run-after-shutdown parameters.
295 *
296 * @param task the task
297 */
298 private void delayedExecute(RunnableScheduledFuture<?> task) {
299 if (isShutdown())
300 reject(task);
301 else {
302 super.getQueue().add(task);
303 if (isShutdown() &&
304 !canRunInCurrentRunState(task.isPeriodic()) &&
305 remove(task))
306 task.cancel(false);
307 else
308 ensurePrestart();
309 }
310 }
311
312 /**
313 * Requeues a periodic task unless current run state precludes it.
314 * Same idea as delayedExecute except drops task rather than rejecting.
315 *
316 * @param task the task
317 */
318 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
319 if (canRunInCurrentRunState(true)) {
320 super.getQueue().add(task);
321 if (!canRunInCurrentRunState(true) && remove(task))
322 task.cancel(false);
323 else
324 ensurePrestart();
325 }
326 }
327
328 /**
329 * Cancels and clears the queue of all tasks that should not be run
330 * due to shutdown policy. Invoked within super.shutdown.
331 */
332 @Override void onShutdown() {
333 BlockingQueue<Runnable> q = super.getQueue();
334 boolean keepDelayed =
335 getExecuteExistingDelayedTasksAfterShutdownPolicy();
336 boolean keepPeriodic =
337 getContinueExistingPeriodicTasksAfterShutdownPolicy();
338 if (!keepDelayed && !keepPeriodic) {
339 for (Object e : q.toArray())
340 if (e instanceof RunnableScheduledFuture<?>)
341 ((RunnableScheduledFuture<?>) e).cancel(false);
342 q.clear();
343 }
344 else {
345 // Traverse snapshot to avoid iterator exceptions
346 for (Object e : q.toArray()) {
347 if (e instanceof RunnableScheduledFuture) {
348 RunnableScheduledFuture<?> t =
349 (RunnableScheduledFuture<?>)e;
350 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
351 t.isCancelled()) { // also remove if already cancelled
352 if (q.remove(t))
353 t.cancel(false);
354 }
355 }
356 }
357 }
358 tryTerminate();
359 }
360
361 /**
362 * Modifies or replaces the task used to execute a runnable.
363 * This method can be used to override the concrete
364 * class used for managing internal tasks.
365 * The default implementation simply returns the given task.
366 *
367 * @param runnable the submitted Runnable
368 * @param task the task created to execute the runnable
369 * @param <V> the type of the task's result
370 * @return a task that can execute the runnable
371 * @since 1.6
372 */
373 protected <V> RunnableScheduledFuture<V> decorateTask(
374 Runnable runnable, RunnableScheduledFuture<V> task) {
375 return task;
376 }
377
378 /**
379 * Modifies or replaces the task used to execute a callable.
380 * This method can be used to override the concrete
381 * class used for managing internal tasks.
382 * The default implementation simply returns the given task.
383 *
384 * @param callable the submitted Callable
385 * @param task the task created to execute the callable
386 * @param <V> the type of the task's result
387 * @return a task that can execute the callable
388 * @since 1.6
389 */
390 protected <V> RunnableScheduledFuture<V> decorateTask(
391 Callable<V> callable, RunnableScheduledFuture<V> task) {
392 return task;
393 }
394
395 /**
396 * The default keep-alive time for pool threads.
397 *
398 * Normally, this value is unused because all pool threads will be
399 * core threads, but if a user creates a pool with a corePoolSize
400 * of zero (against our advice), we keep a thread alive as long as
401 * there are queued tasks. If the keep alive time is zero (the
402 * historic value), we end up hot-spinning in getTask, wasting a
403 * CPU. But on the other hand, if we set the value too high, and
404 * users create a one-shot pool which they don't cleanly shutdown,
405 * the pool's non-daemon threads will prevent JVM termination. A
406 * small but non-zero value (relative to a JVM's lifetime) seems
407 * best.
408 */
409 private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
410
/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the
 * given core pool size, backed by a new {@code DelayedWorkQueue}.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue());
}
424
/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the
 * given core pool size and thread factory, backed by a new
 * {@code DelayedWorkQueue}.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code threadFactory} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue(),
          threadFactory);
}
442
/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the
 * given core pool size and rejection handler, backed by a new
 * {@code DelayedWorkQueue}.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code handler} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue(),
          handler);
}
460
/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the
 * given core pool size, thread factory and rejection handler,
 * backed by a new {@code DelayedWorkQueue}.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code threadFactory} or
 *         {@code handler} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue(),
          threadFactory,
          handler);
}
482
483 /**
484 * Returns the nanoTime-based trigger time of a delayed action.
485 */
486 private long triggerTime(long delay, TimeUnit unit) {
487 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
488 }
489
490 /**
491 * Returns the nanoTime-based trigger time of a delayed action.
492 */
493 long triggerTime(long delay) {
494 return now() +
495 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
496 }
497
498 /**
499 * Constrains the values of all delays in the queue to be within
500 * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
501 * This may occur if a task is eligible to be dequeued, but has
502 * not yet been, while some other task is added with a delay of
503 * Long.MAX_VALUE.
504 */
505 private long overflowFree(long delay) {
506 Delayed head = (Delayed) super.getQueue().peek();
507 if (head != null) {
508 long headDelay = head.getDelay(NANOSECONDS);
509 if (headDelay < 0 && (delay - headDelay < 0))
510 delay = Long.MAX_VALUE + headDelay;
511 }
512 return delay;
513 }
514
515 /**
516 * @throws RejectedExecutionException {@inheritDoc}
517 * @throws NullPointerException {@inheritDoc}
518 */
519 public ScheduledFuture<?> schedule(Runnable command,
520 long delay,
521 TimeUnit unit) {
522 if (command == null || unit == null)
523 throw new NullPointerException();
524 RunnableScheduledFuture<Void> t = decorateTask(command,
525 new ScheduledFutureTask<Void>(command, null,
526 triggerTime(delay, unit)));
527 delayedExecute(t);
528 return t;
529 }
530
531 /**
532 * @throws RejectedExecutionException {@inheritDoc}
533 * @throws NullPointerException {@inheritDoc}
534 */
535 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
536 long delay,
537 TimeUnit unit) {
538 if (callable == null || unit == null)
539 throw new NullPointerException();
540 RunnableScheduledFuture<V> t = decorateTask(callable,
541 new ScheduledFutureTask<V>(callable,
542 triggerTime(delay, unit)));
543 delayedExecute(t);
544 return t;
545 }
546
547 /**
548 * @throws RejectedExecutionException {@inheritDoc}
549 * @throws NullPointerException {@inheritDoc}
550 * @throws IllegalArgumentException {@inheritDoc}
551 */
552 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
553 long initialDelay,
554 long period,
555 TimeUnit unit) {
556 if (command == null || unit == null)
557 throw new NullPointerException();
558 if (period <= 0)
559 throw new IllegalArgumentException();
560 ScheduledFutureTask<Void> sft =
561 new ScheduledFutureTask<Void>(command,
562 null,
563 triggerTime(initialDelay, unit),
564 unit.toNanos(period));
565 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
566 sft.outerTask = t;
567 delayedExecute(t);
568 return t;
569 }
570
571 /**
572 * @throws RejectedExecutionException {@inheritDoc}
573 * @throws NullPointerException {@inheritDoc}
574 * @throws IllegalArgumentException {@inheritDoc}
575 */
576 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
577 long initialDelay,
578 long delay,
579 TimeUnit unit) {
580 if (command == null || unit == null)
581 throw new NullPointerException();
582 if (delay <= 0)
583 throw new IllegalArgumentException();
584 ScheduledFutureTask<Void> sft =
585 new ScheduledFutureTask<Void>(command,
586 null,
587 triggerTime(initialDelay, unit),
588 unit.toNanos(-delay));
589 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
590 sft.outerTask = t;
591 delayedExecute(t);
592 return t;
593 }
594
595 /**
596 * Executes {@code command} with zero required delay.
597 * This has effect equivalent to
598 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
599 * Note that inspections of the queue and of the list returned by
600 * {@code shutdownNow} will access the zero-delayed
601 * {@link ScheduledFuture}, not the {@code command} itself.
602 *
603 * <p>A consequence of the use of {@code ScheduledFuture} objects is
604 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
605 * called with a null second {@code Throwable} argument, even if the
606 * {@code command} terminated abruptly. Instead, the {@code Throwable}
607 * thrown by such a task can be obtained via {@link Future#get}.
608 *
609 * @throws RejectedExecutionException at discretion of
610 * {@code RejectedExecutionHandler}, if the task
611 * cannot be accepted for execution because the
612 * executor has been shut down
613 * @throws NullPointerException {@inheritDoc}
614 */
615 public void execute(Runnable command) {
616 schedule(command, 0, NANOSECONDS);
617 }
618
619 // Override AbstractExecutorService methods
620
621 /**
622 * @throws RejectedExecutionException {@inheritDoc}
623 * @throws NullPointerException {@inheritDoc}
624 */
625 public Future<?> submit(Runnable task) {
626 return schedule(task, 0, NANOSECONDS);
627 }
628
629 /**
630 * @throws RejectedExecutionException {@inheritDoc}
631 * @throws NullPointerException {@inheritDoc}
632 */
633 public <T> Future<T> submit(Runnable task, T result) {
634 return schedule(Executors.callable(task, result), 0, NANOSECONDS);
635 }
636
637 /**
638 * @throws RejectedExecutionException {@inheritDoc}
639 * @throws NullPointerException {@inheritDoc}
640 */
641 public <T> Future<T> submit(Callable<T> task) {
642 return schedule(task, 0, NANOSECONDS);
643 }
644
645 /**
646 * Sets the policy on whether to continue executing existing
647 * periodic tasks even when this executor has been {@code shutdown}.
648 * In this case, these tasks will only terminate upon
649 * {@code shutdownNow} or after setting the policy to
650 * {@code false} when already shutdown.
651 * This value is by default {@code false}.
652 *
653 * @param value if {@code true}, continue after shutdown, else don't
654 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
655 */
656 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
657 continueExistingPeriodicTasksAfterShutdown = value;
658 if (!value && isShutdown())
659 onShutdown();
660 }
661
662 /**
663 * Gets the policy on whether to continue executing existing
664 * periodic tasks even when this executor has been {@code shutdown}.
665 * In this case, these tasks will only terminate upon
666 * {@code shutdownNow} or after setting the policy to
667 * {@code false} when already shutdown.
668 * This value is by default {@code false}.
669 *
670 * @return {@code true} if will continue after shutdown
671 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
672 */
673 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
674 return continueExistingPeriodicTasksAfterShutdown;
675 }
676
677 /**
678 * Sets the policy on whether to execute existing delayed
679 * tasks even when this executor has been {@code shutdown}.
680 * In this case, these tasks will only terminate upon
681 * {@code shutdownNow}, or after setting the policy to
682 * {@code false} when already shutdown.
683 * This value is by default {@code true}.
684 *
685 * @param value if {@code true}, execute after shutdown, else don't
686 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
687 */
688 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
689 executeExistingDelayedTasksAfterShutdown = value;
690 if (!value && isShutdown())
691 onShutdown();
692 }
693
694 /**
695 * Gets the policy on whether to execute existing delayed
696 * tasks even when this executor has been {@code shutdown}.
697 * In this case, these tasks will only terminate upon
698 * {@code shutdownNow}, or after setting the policy to
699 * {@code false} when already shutdown.
700 * This value is by default {@code true}.
701 *
702 * @return {@code true} if will execute after shutdown
703 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
704 */
705 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
706 return executeExistingDelayedTasksAfterShutdown;
707 }
708
709 /**
710 * Sets the policy on whether cancelled tasks should be immediately
711 * removed from the work queue at time of cancellation. This value is
712 * by default {@code false}.
713 *
714 * @param value if {@code true}, remove on cancellation, else don't
715 * @see #getRemoveOnCancelPolicy
716 * @since 1.7
717 */
718 public void setRemoveOnCancelPolicy(boolean value) {
719 removeOnCancel = value;
720 }
721
722 /**
723 * Gets the policy on whether cancelled tasks should be immediately
724 * removed from the work queue at time of cancellation. This value is
725 * by default {@code false}.
726 *
727 * @return {@code true} if cancelled tasks are immediately removed
728 * from the queue
729 * @see #setRemoveOnCancelPolicy
730 * @since 1.7
731 */
732 public boolean getRemoveOnCancelPolicy() {
733 return removeOnCancel;
734 }
735
736 /**
737 * Initiates an orderly shutdown in which previously submitted
738 * tasks are executed, but no new tasks will be accepted.
739 * Invocation has no additional effect if already shut down.
740 *
741 * <p>This method does not wait for previously submitted tasks to
742 * complete execution. Use {@link #awaitTermination awaitTermination}
743 * to do that.
744 *
745 * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
746 * has been set {@code false}, existing delayed tasks whose delays
747 * have not yet elapsed are cancelled. And unless the {@code
748 * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
749 * {@code true}, future executions of existing periodic tasks will
750 * be cancelled.
751 *
752 * @throws SecurityException {@inheritDoc}
753 */
754 public void shutdown() {
755 super.shutdown();
756 }
757
758 /**
759 * Attempts to stop all actively executing tasks, halts the
760 * processing of waiting tasks, and returns a list of the tasks
761 * that were awaiting execution.
762 *
763 * <p>This method does not wait for actively executing tasks to
764 * terminate. Use {@link #awaitTermination awaitTermination} to
765 * do that.
766 *
767 * <p>There are no guarantees beyond best-effort attempts to stop
768 * processing actively executing tasks. This implementation
769 * cancels tasks via {@link Thread#interrupt}, so any task that
770 * fails to respond to interrupts may never terminate.
771 *
772 * @return list of tasks that never commenced execution.
773 * Each element of this list is a {@link ScheduledFuture}.
774 * For tasks submitted via one of the {@code schedule}
775 * methods, the element will be identical to the returned
776 * {@code ScheduledFuture}. For tasks submitted using
777 * {@link #execute}, the element will be a zero-delay {@code
778 * ScheduledFuture}.
779 * @throws SecurityException {@inheritDoc}
780 */
781 public List<Runnable> shutdownNow() {
782 return super.shutdownNow();
783 }
784
785 /**
786 * Returns the task queue used by this executor.
787 * Each element of this list is a {@link ScheduledFuture}.
788 * For tasks submitted via one of the {@code schedule} methods, the
789 * element will be identical to the returned {@code ScheduledFuture}.
790 * For tasks submitted using {@link #execute}, the element will be a
791 * zero-delay {@code ScheduledFuture}.
792 *
793 * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
794 * tasks in the order in which they will execute.
795 *
796 * @return the task queue
797 */
798 public BlockingQueue<Runnable> getQueue() {
799 return super.getQueue();
800 }
801
802 /**
803 * Specialized delay queue. To mesh with TPE declarations, this
804 * class must be declared as a BlockingQueue<Runnable> even though
805 * it can only hold RunnableScheduledFutures.
806 */
807 static class DelayedWorkQueue extends AbstractQueue<Runnable>
808 implements BlockingQueue<Runnable> {
809
810 /*
811 * A DelayedWorkQueue is based on a heap-based data structure
812 * like those in DelayQueue and PriorityQueue, except that
813 * every ScheduledFutureTask also records its index into the
814 * heap array. This eliminates the need to find a task upon
815 * cancellation, greatly speeding up removal (down from O(n)
816 * to O(log n)), and reducing garbage retention that would
817 * otherwise occur by waiting for the element to rise to top
818 * before clearing. But because the queue may also hold
819 * RunnableScheduledFutures that are not ScheduledFutureTasks,
820 * we are not guaranteed to have such indices available, in
821 * which case we fall back to linear search. (We expect that
822 * most tasks will not be decorated, and that the faster cases
823 * will be much more common.)
824 *
825 * All heap operations must record index changes -- mainly
826 * within siftUp and siftDown. Upon removal, a task's
827 * heapIndex is set to -1. Note that ScheduledFutureTasks can
828 * appear at most once in the queue (this need not be true for
829 * other kinds of tasks or work queues), so are uniquely
830 * identified by heapIndex.
831 */
832
833 private static final int INITIAL_CAPACITY = 16;
834 private RunnableScheduledFuture<?>[] queue =
835 new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
836 private final ReentrantLock lock = new ReentrantLock();
837 private int size;
838
839 /**
840 * Thread designated to wait for the task at the head of the
841 * queue. This variant of the Leader-Follower pattern
842 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
843 * minimize unnecessary timed waiting. When a thread becomes
844 * the leader, it waits only for the next delay to elapse, but
845 * other threads await indefinitely. The leader thread must
846 * signal some other thread before returning from take() or
847 * poll(...), unless some other thread becomes leader in the
848 * interim. Whenever the head of the queue is replaced with a
849 * task with an earlier expiration time, the leader field is
850 * invalidated by being reset to null, and some waiting
851 * thread, but not necessarily the current leader, is
852 * signalled. So waiting threads must be prepared to acquire
853 * and lose leadership while waiting.
854 */
855 private Thread leader;
856
857 /**
858 * Condition signalled when a newer task becomes available at the
859 * head of the queue or a new thread may need to become leader.
860 */
861 private final Condition available = lock.newCondition();
862
863 /**
864 * Sets f's heapIndex if it is a ScheduledFutureTask.
865 */
866 private void setIndex(RunnableScheduledFuture<?> f, int idx) {
867 if (f instanceof ScheduledFutureTask)
868 ((ScheduledFutureTask)f).heapIndex = idx;
869 }
870
871 /**
872 * Sifts element added at bottom up to its heap-ordered spot.
873 * Call only when holding lock.
874 */
875 private void siftUp(int k, RunnableScheduledFuture<?> key) {
876 while (k > 0) {
877 int parent = (k - 1) >>> 1;
878 RunnableScheduledFuture<?> e = queue[parent];
879 if (key.compareTo(e) >= 0)
880 break;
881 queue[k] = e;
882 setIndex(e, k);
883 k = parent;
884 }
885 queue[k] = key;
886 setIndex(key, k);
887 }
888
889 /**
890 * Sifts element added at top down to its heap-ordered spot.
891 * Call only when holding lock.
892 */
893 private void siftDown(int k, RunnableScheduledFuture<?> key) {
894 int half = size >>> 1;
895 while (k < half) {
896 int child = (k << 1) + 1;
897 RunnableScheduledFuture<?> c = queue[child];
898 int right = child + 1;
899 if (right < size && c.compareTo(queue[right]) > 0)
900 c = queue[child = right];
901 if (key.compareTo(c) <= 0)
902 break;
903 queue[k] = c;
904 setIndex(c, k);
905 k = child;
906 }
907 queue[k] = key;
908 setIndex(key, k);
909 }
910
911 /**
912 * Resizes the heap array. Call only when holding lock.
913 */
914 private void grow() {
915 int oldCapacity = queue.length;
916 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
917 if (newCapacity < 0) // overflow
918 newCapacity = Integer.MAX_VALUE;
919 queue = Arrays.copyOf(queue, newCapacity);
920 }
921
922 /**
923 * Finds index of given object, or -1 if absent.
924 */
925 private int indexOf(Object x) {
926 if (x != null) {
927 if (x instanceof ScheduledFutureTask) {
928 int i = ((ScheduledFutureTask) x).heapIndex;
929 // Sanity check; x could conceivably be a
930 // ScheduledFutureTask from some other pool.
931 if (i >= 0 && i < size && queue[i] == x)
932 return i;
933 } else {
934 for (int i = 0; i < size; i++)
935 if (x.equals(queue[i]))
936 return i;
937 }
938 }
939 return -1;
940 }
941
942 public boolean contains(Object x) {
943 final ReentrantLock lock = this.lock;
944 lock.lock();
945 try {
946 return indexOf(x) != -1;
947 } finally {
948 lock.unlock();
949 }
950 }
951
952 public boolean remove(Object x) {
953 final ReentrantLock lock = this.lock;
954 lock.lock();
955 try {
956 int i = indexOf(x);
957 if (i < 0)
958 return false;
959
960 setIndex(queue[i], -1);
961 int s = --size;
962 RunnableScheduledFuture<?> replacement = queue[s];
963 queue[s] = null;
964 if (s != i) {
965 siftDown(i, replacement);
966 if (queue[i] == replacement)
967 siftUp(i, replacement);
968 }
969 return true;
970 } finally {
971 lock.unlock();
972 }
973 }
974
975 public int size() {
976 final ReentrantLock lock = this.lock;
977 lock.lock();
978 try {
979 return size;
980 } finally {
981 lock.unlock();
982 }
983 }
984
985 public boolean isEmpty() {
986 return size() == 0;
987 }
988
989 public int remainingCapacity() {
990 return Integer.MAX_VALUE;
991 }
992
993 public RunnableScheduledFuture<?> peek() {
994 final ReentrantLock lock = this.lock;
995 lock.lock();
996 try {
997 return queue[0];
998 } finally {
999 lock.unlock();
1000 }
1001 }
1002
1003 public boolean offer(Runnable x) {
1004 if (x == null)
1005 throw new NullPointerException();
1006 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
1007 final ReentrantLock lock = this.lock;
1008 lock.lock();
1009 try {
1010 int i = size;
1011 if (i >= queue.length)
1012 grow();
1013 size = i + 1;
1014 if (i == 0) {
1015 queue[0] = e;
1016 setIndex(e, 0);
1017 } else {
1018 siftUp(i, e);
1019 }
1020 if (queue[0] == e) {
1021 leader = null;
1022 available.signal();
1023 }
1024 } finally {
1025 lock.unlock();
1026 }
1027 return true;
1028 }
1029
1030 public void put(Runnable e) {
1031 offer(e);
1032 }
1033
1034 public boolean add(Runnable e) {
1035 return offer(e);
1036 }
1037
1038 public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1039 return offer(e);
1040 }
1041
1042 /**
1043 * Performs common bookkeeping for poll and take: Replaces
1044 * first element with last and sifts it down. Call only when
1045 * holding lock.
1046 * @param f the task to remove and return
1047 */
1048 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1049 int s = --size;
1050 RunnableScheduledFuture<?> x = queue[s];
1051 queue[s] = null;
1052 if (s != 0)
1053 siftDown(0, x);
1054 setIndex(f, -1);
1055 return f;
1056 }
1057
1058 public RunnableScheduledFuture<?> poll() {
1059 final ReentrantLock lock = this.lock;
1060 lock.lock();
1061 try {
1062 RunnableScheduledFuture<?> first = queue[0];
1063 if (first == null || first.getDelay(NANOSECONDS) > 0)
1064 return null;
1065 else
1066 return finishPoll(first);
1067 } finally {
1068 lock.unlock();
1069 }
1070 }
1071
1072 public RunnableScheduledFuture<?> take() throws InterruptedException {
1073 final ReentrantLock lock = this.lock;
1074 lock.lockInterruptibly();
1075 try {
1076 for (;;) {
1077 RunnableScheduledFuture<?> first = queue[0];
1078 if (first == null)
1079 available.await();
1080 else {
1081 long delay = first.getDelay(NANOSECONDS);
1082 if (delay <= 0)
1083 return finishPoll(first);
1084 first = null; // don't retain ref while waiting
1085 if (leader != null)
1086 available.await();
1087 else {
1088 Thread thisThread = Thread.currentThread();
1089 leader = thisThread;
1090 try {
1091 available.awaitNanos(delay);
1092 } finally {
1093 if (leader == thisThread)
1094 leader = null;
1095 }
1096 }
1097 }
1098 }
1099 } finally {
1100 if (leader == null && queue[0] != null)
1101 available.signal();
1102 lock.unlock();
1103 }
1104 }
1105
1106 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1107 throws InterruptedException {
1108 long nanos = unit.toNanos(timeout);
1109 final ReentrantLock lock = this.lock;
1110 lock.lockInterruptibly();
1111 try {
1112 for (;;) {
1113 RunnableScheduledFuture<?> first = queue[0];
1114 if (first == null) {
1115 if (nanos <= 0)
1116 return null;
1117 else
1118 nanos = available.awaitNanos(nanos);
1119 } else {
1120 long delay = first.getDelay(NANOSECONDS);
1121 if (delay <= 0)
1122 return finishPoll(first);
1123 if (nanos <= 0)
1124 return null;
1125 first = null; // don't retain ref while waiting
1126 if (nanos < delay || leader != null)
1127 nanos = available.awaitNanos(nanos);
1128 else {
1129 Thread thisThread = Thread.currentThread();
1130 leader = thisThread;
1131 try {
1132 long timeLeft = available.awaitNanos(delay);
1133 nanos -= delay - timeLeft;
1134 } finally {
1135 if (leader == thisThread)
1136 leader = null;
1137 }
1138 }
1139 }
1140 }
1141 } finally {
1142 if (leader == null && queue[0] != null)
1143 available.signal();
1144 lock.unlock();
1145 }
1146 }
1147
1148 public void clear() {
1149 final ReentrantLock lock = this.lock;
1150 lock.lock();
1151 try {
1152 for (int i = 0; i < size; i++) {
1153 RunnableScheduledFuture<?> t = queue[i];
1154 if (t != null) {
1155 queue[i] = null;
1156 setIndex(t, -1);
1157 }
1158 }
1159 size = 0;
1160 } finally {
1161 lock.unlock();
1162 }
1163 }
1164
1165 /**
1166 * Returns first element only if it is expired.
1167 * Used only by drainTo. Call only when holding lock.
1168 */
1169 private RunnableScheduledFuture<?> peekExpired() {
1170 // assert lock.isHeldByCurrentThread();
1171 RunnableScheduledFuture<?> first = queue[0];
1172 return (first == null || first.getDelay(NANOSECONDS) > 0) ?
1173 null : first;
1174 }
1175
1176 public int drainTo(Collection<? super Runnable> c) {
1177 if (c == null)
1178 throw new NullPointerException();
1179 if (c == this)
1180 throw new IllegalArgumentException();
1181 final ReentrantLock lock = this.lock;
1182 lock.lock();
1183 try {
1184 RunnableScheduledFuture<?> first;
1185 int n = 0;
1186 while ((first = peekExpired()) != null) {
1187 c.add(first); // In this order, in case add() throws.
1188 finishPoll(first);
1189 ++n;
1190 }
1191 return n;
1192 } finally {
1193 lock.unlock();
1194 }
1195 }
1196
1197 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1198 if (c == null)
1199 throw new NullPointerException();
1200 if (c == this)
1201 throw new IllegalArgumentException();
1202 if (maxElements <= 0)
1203 return 0;
1204 final ReentrantLock lock = this.lock;
1205 lock.lock();
1206 try {
1207 RunnableScheduledFuture<?> first;
1208 int n = 0;
1209 while (n < maxElements && (first = peekExpired()) != null) {
1210 c.add(first); // In this order, in case add() throws.
1211 finishPoll(first);
1212 ++n;
1213 }
1214 return n;
1215 } finally {
1216 lock.unlock();
1217 }
1218 }
1219
1220 public Object[] toArray() {
1221 final ReentrantLock lock = this.lock;
1222 lock.lock();
1223 try {
1224 return Arrays.copyOf(queue, size, Object[].class);
1225 } finally {
1226 lock.unlock();
1227 }
1228 }
1229
1230 @SuppressWarnings("unchecked")
1231 public <T> T[] toArray(T[] a) {
1232 final ReentrantLock lock = this.lock;
1233 lock.lock();
1234 try {
1235 if (a.length < size)
1236 return (T[]) Arrays.copyOf(queue, size, a.getClass());
1237 System.arraycopy(queue, 0, a, 0, size);
1238 if (a.length > size)
1239 a[size] = null;
1240 return a;
1241 } finally {
1242 lock.unlock();
1243 }
1244 }
1245
1246 public Iterator<Runnable> iterator() {
1247 return new Itr(Arrays.copyOf(queue, size));
1248 }
1249
1250 /**
1251 * Snapshot iterator that works off copy of underlying q array.
1252 */
1253 private class Itr implements Iterator<Runnable> {
1254 final RunnableScheduledFuture<?>[] array;
1255 int cursor = 0; // index of next element to return
1256 int lastRet = -1; // index of last element, or -1 if no such
1257
1258 Itr(RunnableScheduledFuture<?>[] array) {
1259 this.array = array;
1260 }
1261
1262 public boolean hasNext() {
1263 return cursor < array.length;
1264 }
1265
1266 public Runnable next() {
1267 if (cursor >= array.length)
1268 throw new NoSuchElementException();
1269 lastRet = cursor;
1270 return array[cursor++];
1271 }
1272
1273 public void remove() {
1274 if (lastRet < 0)
1275 throw new IllegalStateException();
1276 DelayedWorkQueue.this.remove(array[lastRet]);
1277 lastRet = -1;
1278 }
1279 }
1280 }
1281 }