ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.75
Committed: Sun Feb 16 14:03:53 2014 UTC (10 years, 3 months ago) by jsr166
Branch: MAIN
Changes since 1.74: +28 -27 lines
Log Message:
tidy internal documentation

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import static java.util.concurrent.TimeUnit.NANOSECONDS;
9 import java.util.concurrent.atomic.AtomicLong;
10 import java.util.concurrent.locks.Condition;
11 import java.util.concurrent.locks.ReentrantLock;
12 import java.util.*;
13
14 /**
15 * A {@link ThreadPoolExecutor} that can additionally schedule
16 * commands to run after a given delay, or to execute periodically.
17 * This class is preferable to {@link java.util.Timer} when multiple
18 * worker threads are needed, or when the additional flexibility or
19 * capabilities of {@link ThreadPoolExecutor} (which this class
20 * extends) are required.
21 *
22 * <p>Delayed tasks execute no sooner than they are enabled, but
23 * without any real-time guarantees about when, after they are
24 * enabled, they will commence. Tasks scheduled for exactly the same
25 * execution time are enabled in first-in-first-out (FIFO) order of
26 * submission.
27 *
28 * <p>When a submitted task is cancelled before it is run, execution
29 * is suppressed. By default, such a cancelled task is not
30 * automatically removed from the work queue until its delay elapses.
31 * While this enables further inspection and monitoring, it may also
32 * cause unbounded retention of cancelled tasks. To avoid this, use
33 * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately
34 * removed from the work queue at time of cancellation.
35 *
36 * <p>Successive executions of a periodic task scheduled via
37 * {@link #scheduleAtFixedRate} or
38 * {@link #scheduleWithFixedDelay} do not overlap. While different
39 * executions may be performed by different threads, the effects of
40 * prior executions <a
41 * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
42 * those of subsequent ones.
43 *
44 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
45 * of the inherited tuning methods are not useful for it. In
46 * particular, because it acts as a fixed-sized pool using
47 * {@code corePoolSize} threads and an unbounded queue, adjustments
48 * to {@code maximumPoolSize} have no useful effect. Additionally, it
49 * is almost never a good idea to set {@code corePoolSize} to zero or
50 * use {@code allowCoreThreadTimeOut} because this may leave the pool
51 * without threads to handle tasks once they become eligible to run.
52 *
53 * <p><b>Extension notes:</b> This class overrides the
54 * {@link ThreadPoolExecutor#execute(Runnable) execute} and
55 * {@link AbstractExecutorService#submit(Runnable) submit}
56 * methods to generate internal {@link ScheduledFuture} objects to
57 * control per-task delays and scheduling. To preserve
58 * functionality, any further overrides of these methods in
59 * subclasses must invoke superclass versions, which effectively
60 * disables additional task customization. However, this class
61 * provides alternative protected extension method
62 * {@code decorateTask} (one version each for {@code Runnable} and
63 * {@code Callable}) that can be used to customize the concrete task
64 * types used to execute commands entered via {@code execute},
65 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
66 * and {@code scheduleWithFixedDelay}. By default, a
67 * {@code ScheduledThreadPoolExecutor} uses a task type extending
68 * {@link FutureTask}. However, this may be modified or replaced using
69 * subclasses of the form:
70 *
71 * <pre> {@code
72 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
73 *
74 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
75 *
76 * protected <V> RunnableScheduledFuture<V> decorateTask(
77 * Runnable r, RunnableScheduledFuture<V> task) {
78 * return new CustomTask<V>(r, task);
79 * }
80 *
81 * protected <V> RunnableScheduledFuture<V> decorateTask(
82 * Callable<V> c, RunnableScheduledFuture<V> task) {
83 * return new CustomTask<V>(c, task);
84 * }
85 * // ... add constructors, etc.
86 * }}</pre>
87 *
88 * @since 1.5
89 * @author Doug Lea
90 */
91 public class ScheduledThreadPoolExecutor
92 extends ThreadPoolExecutor
93 implements ScheduledExecutorService {
94
95 /*
96 * This class specializes ThreadPoolExecutor implementation by
97 *
98 * 1. Using a custom task type, ScheduledFutureTask for
99 * tasks, even those that don't require scheduling (i.e.,
100 * those submitted using ExecutorService execute, not
101 * ScheduledExecutorService methods) which are treated as
102 * delayed tasks with a delay of zero.
103 *
104 * 2. Using a custom queue (DelayedWorkQueue), a variant of
105 * unbounded DelayQueue. The lack of capacity constraint and
106 * the fact that corePoolSize and maximumPoolSize are
107 * effectively identical simplifies some execution mechanics
108 * (see delayedExecute) compared to ThreadPoolExecutor.
109 *
110 * 3. Supporting optional run-after-shutdown parameters, which
111 * leads to overrides of shutdown methods to remove and cancel
112 * tasks that should NOT be run after shutdown, as well as
113 * different recheck logic when task (re)submission overlaps
114 * with a shutdown.
115 *
116 * 4. Task decoration methods to allow interception and
117 * instrumentation, which are needed because subclasses cannot
118 * otherwise override submit methods to get this effect. These
119 * don't have any impact on pool control logic though.
120 */
121
122 /**
123 * False if should cancel/suppress periodic tasks on shutdown.
124 */
125 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
126
127 /**
128 * False if should cancel non-periodic tasks on shutdown.
129 */
130 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
131
132 /**
133 * True if ScheduledFutureTask.cancel should remove from queue.
134 */
135 private volatile boolean removeOnCancel = false;
136
137 /**
138 * Sequence number to break scheduling ties, and in turn to
139 * guarantee FIFO order among tied entries.
140 */
141 private static final AtomicLong sequencer = new AtomicLong();
142
143 /**
144 * Returns current nanosecond time.
145 */
146 final long now() {
147 return System.nanoTime();
148 }
149
150 private class ScheduledFutureTask<V>
151 extends FutureTask<V> implements RunnableScheduledFuture<V> {
152
153 /** Sequence number to break ties FIFO */
154 private final long sequenceNumber;
155
156 /** The time the task is enabled to execute in nanoTime units */
157 private long time;
158
159 /**
160 * Period in nanoseconds for repeating tasks.
161 * A positive value indicates fixed-rate execution.
162 * A negative value indicates fixed-delay execution.
163 * A value of 0 indicates a non-repeating (one-shot) task.
164 */
165 private final long period;
166
167 /** The actual task to be re-enqueued by reExecutePeriodic */
168 RunnableScheduledFuture<V> outerTask = this;
169
170 /**
171 * Index into delay queue, to support faster cancellation.
172 */
173 int heapIndex;
174
175 /**
176 * Creates a one-shot action with given nanoTime-based trigger time.
177 */
178 ScheduledFutureTask(Runnable r, V result, long triggerTime) {
179 super(r, result);
180 this.time = triggerTime;
181 this.period = 0;
182 this.sequenceNumber = sequencer.getAndIncrement();
183 }
184
185 /**
186 * Creates a periodic action with given nanoTime-based initial
187 * trigger time and period.
188 */
189 ScheduledFutureTask(Runnable r, V result, long triggerTime,
190 long period) {
191 super(r, result);
192 this.time = triggerTime;
193 this.period = period;
194 this.sequenceNumber = sequencer.getAndIncrement();
195 }
196
197 /**
198 * Creates a one-shot action with given nanoTime-based trigger time.
199 */
200 ScheduledFutureTask(Callable<V> callable, long triggerTime) {
201 super(callable);
202 this.time = triggerTime;
203 this.period = 0;
204 this.sequenceNumber = sequencer.getAndIncrement();
205 }
206
207 public long getDelay(TimeUnit unit) {
208 return unit.convert(time - now(), NANOSECONDS);
209 }
210
211 public int compareTo(Delayed other) {
212 if (other == this) // compare zero if same object
213 return 0;
214 if (other instanceof ScheduledFutureTask) {
215 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
216 long diff = time - x.time;
217 if (diff < 0)
218 return -1;
219 else if (diff > 0)
220 return 1;
221 else if (sequenceNumber < x.sequenceNumber)
222 return -1;
223 else
224 return 1;
225 }
226 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
227 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
228 }
229
230 /**
231 * Returns {@code true} if this is a periodic (not a one-shot) action.
232 *
233 * @return {@code true} if periodic
234 */
235 public boolean isPeriodic() {
236 return period != 0;
237 }
238
239 /**
240 * Sets the next time to run for a periodic task.
241 */
242 private void setNextRunTime() {
243 long p = period;
244 if (p > 0)
245 time += p;
246 else
247 time = triggerTime(-p);
248 }
249
250 public boolean cancel(boolean mayInterruptIfRunning) {
251 boolean cancelled = super.cancel(mayInterruptIfRunning);
252 if (cancelled && removeOnCancel && heapIndex >= 0)
253 remove(this);
254 return cancelled;
255 }
256
257 /**
258 * Overrides FutureTask version so as to reset/requeue if periodic.
259 */
260 public void run() {
261 boolean periodic = isPeriodic();
262 if (!canRunInCurrentRunState(periodic))
263 cancel(false);
264 else if (!periodic)
265 ScheduledFutureTask.super.run();
266 else if (ScheduledFutureTask.super.runAndReset()) {
267 setNextRunTime();
268 reExecutePeriodic(outerTask);
269 }
270 }
271 }
272
273 /**
274 * Returns true if can run a task given current run state
275 * and run-after-shutdown parameters.
276 *
277 * @param periodic true if this task periodic, false if delayed
278 */
279 boolean canRunInCurrentRunState(boolean periodic) {
280 return isRunningOrShutdown(periodic ?
281 continueExistingPeriodicTasksAfterShutdown :
282 executeExistingDelayedTasksAfterShutdown);
283 }
284
285 /**
286 * Main execution method for delayed or periodic tasks. If pool
287 * is shut down, rejects the task. Otherwise adds task to queue
288 * and starts a thread, if necessary, to run it. (We cannot
289 * prestart the thread to run the task because the task (probably)
290 * shouldn't be run yet.) If the pool is shut down while the task
291 * is being added, cancel and remove it if required by state and
292 * run-after-shutdown parameters.
293 *
294 * @param task the task
295 */
296 private void delayedExecute(RunnableScheduledFuture<?> task) {
297 if (isShutdown())
298 reject(task);
299 else {
300 super.getQueue().add(task);
301 if (isShutdown() &&
302 !canRunInCurrentRunState(task.isPeriodic()) &&
303 remove(task))
304 task.cancel(false);
305 else
306 ensurePrestart();
307 }
308 }
309
310 /**
311 * Requeues a periodic task unless current run state precludes it.
312 * Same idea as delayedExecute except drops task rather than rejecting.
313 *
314 * @param task the task
315 */
316 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
317 if (canRunInCurrentRunState(true)) {
318 super.getQueue().add(task);
319 if (!canRunInCurrentRunState(true) && remove(task))
320 task.cancel(false);
321 else
322 ensurePrestart();
323 }
324 }
325
326 /**
327 * Cancels and clears the queue of all tasks that should not be run
328 * due to shutdown policy. Invoked within super.shutdown.
329 */
330 @Override void onShutdown() {
331 BlockingQueue<Runnable> q = super.getQueue();
332 boolean keepDelayed =
333 getExecuteExistingDelayedTasksAfterShutdownPolicy();
334 boolean keepPeriodic =
335 getContinueExistingPeriodicTasksAfterShutdownPolicy();
336 if (!keepDelayed && !keepPeriodic) {
337 for (Object e : q.toArray())
338 if (e instanceof RunnableScheduledFuture<?>)
339 ((RunnableScheduledFuture<?>) e).cancel(false);
340 q.clear();
341 }
342 else {
343 // Traverse snapshot to avoid iterator exceptions
344 for (Object e : q.toArray()) {
345 if (e instanceof RunnableScheduledFuture) {
346 RunnableScheduledFuture<?> t =
347 (RunnableScheduledFuture<?>)e;
348 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
349 t.isCancelled()) { // also remove if already cancelled
350 if (q.remove(t))
351 t.cancel(false);
352 }
353 }
354 }
355 }
356 tryTerminate();
357 }
358
359 /**
360 * Modifies or replaces the task used to execute a runnable.
361 * This method can be used to override the concrete
362 * class used for managing internal tasks.
363 * The default implementation simply returns the given task.
364 *
365 * @param runnable the submitted Runnable
366 * @param task the task created to execute the runnable
367 * @param <V> the type of the task's result
368 * @return a task that can execute the runnable
369 * @since 1.6
370 */
371 protected <V> RunnableScheduledFuture<V> decorateTask(
372 Runnable runnable, RunnableScheduledFuture<V> task) {
373 return task;
374 }
375
376 /**
377 * Modifies or replaces the task used to execute a callable.
378 * This method can be used to override the concrete
379 * class used for managing internal tasks.
380 * The default implementation simply returns the given task.
381 *
382 * @param callable the submitted Callable
383 * @param task the task created to execute the callable
384 * @param <V> the type of the task's result
385 * @return a task that can execute the callable
386 * @since 1.6
387 */
388 protected <V> RunnableScheduledFuture<V> decorateTask(
389 Callable<V> callable, RunnableScheduledFuture<V> task) {
390 return task;
391 }
392
/**
 * Constructs a {@code ScheduledThreadPoolExecutor} having the
 * given core pool size.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue());
}
405
/**
 * Constructs a {@code ScheduledThreadPoolExecutor} having the
 * given core pool size and thread factory.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code threadFactory} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          threadFactory);
}
422
/**
 * Constructs a {@code ScheduledThreadPoolExecutor} having the
 * given core pool size and rejection handler.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code handler} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          handler);
}
439
/**
 * Constructs a {@code ScheduledThreadPoolExecutor} having the
 * given core pool size, thread factory and rejection handler.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code threadFactory} or
 *         {@code handler} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          threadFactory,
          handler);
}
460
461 /**
462 * Returns the nanoTime-based trigger time of a delayed action.
463 */
464 private long triggerTime(long delay, TimeUnit unit) {
465 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
466 }
467
468 /**
469 * Returns the nanoTime-based trigger time of a delayed action.
470 */
471 long triggerTime(long delay) {
472 return now() +
473 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
474 }
475
476 /**
477 * Constrains the values of all delays in the queue to be within
478 * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
479 * This may occur if a task is eligible to be dequeued, but has
480 * not yet been, while some other task is added with a delay of
481 * Long.MAX_VALUE.
482 */
483 private long overflowFree(long delay) {
484 Delayed head = (Delayed) super.getQueue().peek();
485 if (head != null) {
486 long headDelay = head.getDelay(NANOSECONDS);
487 if (headDelay < 0 && (delay - headDelay < 0))
488 delay = Long.MAX_VALUE + headDelay;
489 }
490 return delay;
491 }
492
493 /**
494 * @throws RejectedExecutionException {@inheritDoc}
495 * @throws NullPointerException {@inheritDoc}
496 */
497 public ScheduledFuture<?> schedule(Runnable command,
498 long delay,
499 TimeUnit unit) {
500 if (command == null || unit == null)
501 throw new NullPointerException();
502 RunnableScheduledFuture<?> t = decorateTask(command,
503 new ScheduledFutureTask<Void>(command, null,
504 triggerTime(delay, unit)));
505 delayedExecute(t);
506 return t;
507 }
508
509 /**
510 * @throws RejectedExecutionException {@inheritDoc}
511 * @throws NullPointerException {@inheritDoc}
512 */
513 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
514 long delay,
515 TimeUnit unit) {
516 if (callable == null || unit == null)
517 throw new NullPointerException();
518 RunnableScheduledFuture<V> t = decorateTask(callable,
519 new ScheduledFutureTask<V>(callable,
520 triggerTime(delay, unit)));
521 delayedExecute(t);
522 return t;
523 }
524
525 /**
526 * @throws RejectedExecutionException {@inheritDoc}
527 * @throws NullPointerException {@inheritDoc}
528 * @throws IllegalArgumentException {@inheritDoc}
529 */
530 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
531 long initialDelay,
532 long period,
533 TimeUnit unit) {
534 if (command == null || unit == null)
535 throw new NullPointerException();
536 if (period <= 0)
537 throw new IllegalArgumentException();
538 ScheduledFutureTask<Void> sft =
539 new ScheduledFutureTask<Void>(command,
540 null,
541 triggerTime(initialDelay, unit),
542 unit.toNanos(period));
543 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
544 sft.outerTask = t;
545 delayedExecute(t);
546 return t;
547 }
548
549 /**
550 * @throws RejectedExecutionException {@inheritDoc}
551 * @throws NullPointerException {@inheritDoc}
552 * @throws IllegalArgumentException {@inheritDoc}
553 */
554 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
555 long initialDelay,
556 long delay,
557 TimeUnit unit) {
558 if (command == null || unit == null)
559 throw new NullPointerException();
560 if (delay <= 0)
561 throw new IllegalArgumentException();
562 ScheduledFutureTask<Void> sft =
563 new ScheduledFutureTask<Void>(command,
564 null,
565 triggerTime(initialDelay, unit),
566 unit.toNanos(-delay));
567 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
568 sft.outerTask = t;
569 delayedExecute(t);
570 return t;
571 }
572
573 /**
574 * Executes {@code command} with zero required delay.
575 * This has effect equivalent to
576 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
577 * Note that inspections of the queue and of the list returned by
578 * {@code shutdownNow} will access the zero-delayed
579 * {@link ScheduledFuture}, not the {@code command} itself.
580 *
581 * <p>A consequence of the use of {@code ScheduledFuture} objects is
582 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
583 * called with a null second {@code Throwable} argument, even if the
584 * {@code command} terminated abruptly. Instead, the {@code Throwable}
585 * thrown by such a task can be obtained via {@link Future#get}.
586 *
587 * @throws RejectedExecutionException at discretion of
588 * {@code RejectedExecutionHandler}, if the task
589 * cannot be accepted for execution because the
590 * executor has been shut down
591 * @throws NullPointerException {@inheritDoc}
592 */
593 public void execute(Runnable command) {
594 schedule(command, 0, NANOSECONDS);
595 }
596
597 // Override AbstractExecutorService methods
598
599 /**
600 * @throws RejectedExecutionException {@inheritDoc}
601 * @throws NullPointerException {@inheritDoc}
602 */
603 public Future<?> submit(Runnable task) {
604 return schedule(task, 0, NANOSECONDS);
605 }
606
607 /**
608 * @throws RejectedExecutionException {@inheritDoc}
609 * @throws NullPointerException {@inheritDoc}
610 */
611 public <T> Future<T> submit(Runnable task, T result) {
612 return schedule(Executors.callable(task, result), 0, NANOSECONDS);
613 }
614
615 /**
616 * @throws RejectedExecutionException {@inheritDoc}
617 * @throws NullPointerException {@inheritDoc}
618 */
619 public <T> Future<T> submit(Callable<T> task) {
620 return schedule(task, 0, NANOSECONDS);
621 }
622
623 /**
624 * Sets the policy on whether to continue executing existing
625 * periodic tasks even when this executor has been {@code shutdown}.
626 * In this case, these tasks will only terminate upon
627 * {@code shutdownNow} or after setting the policy to
628 * {@code false} when already shutdown.
629 * This value is by default {@code false}.
630 *
631 * @param value if {@code true}, continue after shutdown, else don't
632 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
633 */
634 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
635 continueExistingPeriodicTasksAfterShutdown = value;
636 if (!value && isShutdown())
637 onShutdown();
638 }
639
640 /**
641 * Gets the policy on whether to continue executing existing
642 * periodic tasks even when this executor has been {@code shutdown}.
643 * In this case, these tasks will only terminate upon
644 * {@code shutdownNow} or after setting the policy to
645 * {@code false} when already shutdown.
646 * This value is by default {@code false}.
647 *
648 * @return {@code true} if will continue after shutdown
649 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
650 */
651 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
652 return continueExistingPeriodicTasksAfterShutdown;
653 }
654
655 /**
656 * Sets the policy on whether to execute existing delayed
657 * tasks even when this executor has been {@code shutdown}.
658 * In this case, these tasks will only terminate upon
659 * {@code shutdownNow}, or after setting the policy to
660 * {@code false} when already shutdown.
661 * This value is by default {@code true}.
662 *
663 * @param value if {@code true}, execute after shutdown, else don't
664 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
665 */
666 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
667 executeExistingDelayedTasksAfterShutdown = value;
668 if (!value && isShutdown())
669 onShutdown();
670 }
671
672 /**
673 * Gets the policy on whether to execute existing delayed
674 * tasks even when this executor has been {@code shutdown}.
675 * In this case, these tasks will only terminate upon
676 * {@code shutdownNow}, or after setting the policy to
677 * {@code false} when already shutdown.
678 * This value is by default {@code true}.
679 *
680 * @return {@code true} if will execute after shutdown
681 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
682 */
683 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
684 return executeExistingDelayedTasksAfterShutdown;
685 }
686
687 /**
688 * Sets the policy on whether cancelled tasks should be immediately
689 * removed from the work queue at time of cancellation. This value is
690 * by default {@code false}.
691 *
692 * @param value if {@code true}, remove on cancellation, else don't
693 * @see #getRemoveOnCancelPolicy
694 * @since 1.7
695 */
696 public void setRemoveOnCancelPolicy(boolean value) {
697 removeOnCancel = value;
698 }
699
700 /**
701 * Gets the policy on whether cancelled tasks should be immediately
702 * removed from the work queue at time of cancellation. This value is
703 * by default {@code false}.
704 *
705 * @return {@code true} if cancelled tasks are immediately removed
706 * from the queue
707 * @see #setRemoveOnCancelPolicy
708 * @since 1.7
709 */
710 public boolean getRemoveOnCancelPolicy() {
711 return removeOnCancel;
712 }
713
714 /**
715 * Initiates an orderly shutdown in which previously submitted
716 * tasks are executed, but no new tasks will be accepted.
717 * Invocation has no additional effect if already shut down.
718 *
719 * <p>This method does not wait for previously submitted tasks to
720 * complete execution. Use {@link #awaitTermination awaitTermination}
721 * to do that.
722 *
723 * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
724 * has been set {@code false}, existing delayed tasks whose delays
725 * have not yet elapsed are cancelled. And unless the {@code
726 * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
727 * {@code true}, future executions of existing periodic tasks will
728 * be cancelled.
729 *
730 * @throws SecurityException {@inheritDoc}
731 */
732 public void shutdown() {
733 super.shutdown();
734 }
735
736 /**
737 * Attempts to stop all actively executing tasks, halts the
738 * processing of waiting tasks, and returns a list of the tasks
739 * that were awaiting execution.
740 *
741 * <p>This method does not wait for actively executing tasks to
742 * terminate. Use {@link #awaitTermination awaitTermination} to
743 * do that.
744 *
745 * <p>There are no guarantees beyond best-effort attempts to stop
746 * processing actively executing tasks. This implementation
747 * cancels tasks via {@link Thread#interrupt}, so any task that
748 * fails to respond to interrupts may never terminate.
749 *
750 * @return list of tasks that never commenced execution.
751 * Each element of this list is a {@link ScheduledFuture},
752 * including those tasks submitted using {@code execute},
753 * which are for scheduling purposes used as the basis of a
754 * zero-delay {@code ScheduledFuture}.
755 * @throws SecurityException {@inheritDoc}
756 */
757 public List<Runnable> shutdownNow() {
758 return super.shutdownNow();
759 }
760
761 /**
762 * Returns the task queue used by this executor. Each element of
763 * this queue is a {@link ScheduledFuture}, including those
764 * tasks submitted using {@code execute} which are for scheduling
765 * purposes used as the basis of a zero-delay
766 * {@code ScheduledFuture}. Iteration over this queue is
767 * <em>not</em> guaranteed to traverse tasks in the order in
768 * which they will execute.
769 *
770 * @return the task queue
771 */
772 public BlockingQueue<Runnable> getQueue() {
773 return super.getQueue();
774 }
775
776 /**
777 * Specialized delay queue. To mesh with TPE declarations, this
778 * class must be declared as a BlockingQueue<Runnable> even though
779 * it can only hold RunnableScheduledFutures.
780 */
781 static class DelayedWorkQueue extends AbstractQueue<Runnable>
782 implements BlockingQueue<Runnable> {
783
784 /*
785 * A DelayedWorkQueue is based on a heap-based data structure
786 * like those in DelayQueue and PriorityQueue, except that
787 * every ScheduledFutureTask also records its index into the
788 * heap array. This eliminates the need to find a task upon
789 * cancellation, greatly speeding up removal (down from O(n)
790 * to O(log n)), and reducing garbage retention that would
791 * otherwise occur by waiting for the element to rise to top
792 * before clearing. But because the queue may also hold
793 * RunnableScheduledFutures that are not ScheduledFutureTasks,
794 * we are not guaranteed to have such indices available, in
795 * which case we fall back to linear search. (We expect that
796 * most tasks will not be decorated, and that the faster cases
797 * will be much more common.)
798 *
799 * All heap operations must record index changes -- mainly
800 * within siftUp and siftDown. Upon removal, a task's
801 * heapIndex is set to -1. Note that ScheduledFutureTasks can
802 * appear at most once in the queue (this need not be true for
803 * other kinds of tasks or work queues), so are uniquely
804 * identified by heapIndex.
805 */
806
807 private static final int INITIAL_CAPACITY = 16;
808 private RunnableScheduledFuture<?>[] queue =
809 new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
810 private final ReentrantLock lock = new ReentrantLock();
811 private int size = 0;
812
813 /**
814 * Thread designated to wait for the task at the head of the
815 * queue. This variant of the Leader-Follower pattern
816 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
817 * minimize unnecessary timed waiting. When a thread becomes
818 * the leader, it waits only for the next delay to elapse, but
819 * other threads await indefinitely. The leader thread must
820 * signal some other thread before returning from take() or
821 * poll(...), unless some other thread becomes leader in the
822 * interim. Whenever the head of the queue is replaced with a
823 * task with an earlier expiration time, the leader field is
824 * invalidated by being reset to null, and some waiting
825 * thread, but not necessarily the current leader, is
826 * signalled. So waiting threads must be prepared to acquire
827 * and lose leadership while waiting.
828 */
829 private Thread leader = null;
830
831 /**
832 * Condition signalled when a newer task becomes available at the
833 * head of the queue or a new thread may need to become leader.
834 */
835 private final Condition available = lock.newCondition();
836
837 /**
838 * Sets f's heapIndex if it is a ScheduledFutureTask.
839 */
840 private void setIndex(RunnableScheduledFuture<?> f, int idx) {
841 if (f instanceof ScheduledFutureTask)
842 ((ScheduledFutureTask)f).heapIndex = idx;
843 }
844
845 /**
846 * Sifts element added at bottom up to its heap-ordered spot.
847 * Call only when holding lock.
848 */
849 private void siftUp(int k, RunnableScheduledFuture<?> key) {
850 while (k > 0) {
851 int parent = (k - 1) >>> 1;
852 RunnableScheduledFuture<?> e = queue[parent];
853 if (key.compareTo(e) >= 0)
854 break;
855 queue[k] = e;
856 setIndex(e, k);
857 k = parent;
858 }
859 queue[k] = key;
860 setIndex(key, k);
861 }
862
863 /**
864 * Sifts element added at top down to its heap-ordered spot.
865 * Call only when holding lock.
866 */
867 private void siftDown(int k, RunnableScheduledFuture<?> key) {
868 int half = size >>> 1;
869 while (k < half) {
870 int child = (k << 1) + 1;
871 RunnableScheduledFuture<?> c = queue[child];
872 int right = child + 1;
873 if (right < size && c.compareTo(queue[right]) > 0)
874 c = queue[child = right];
875 if (key.compareTo(c) <= 0)
876 break;
877 queue[k] = c;
878 setIndex(c, k);
879 k = child;
880 }
881 queue[k] = key;
882 setIndex(key, k);
883 }
884
885 /**
886 * Resizes the heap array. Call only when holding lock.
887 */
888 private void grow() {
889 int oldCapacity = queue.length;
890 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
891 if (newCapacity < 0) // overflow
892 newCapacity = Integer.MAX_VALUE;
893 queue = Arrays.copyOf(queue, newCapacity);
894 }
895
896 /**
897 * Finds index of given object, or -1 if absent.
898 */
899 private int indexOf(Object x) {
900 if (x != null) {
901 if (x instanceof ScheduledFutureTask) {
902 int i = ((ScheduledFutureTask) x).heapIndex;
903 // Sanity check; x could conceivably be a
904 // ScheduledFutureTask from some other pool.
905 if (i >= 0 && i < size && queue[i] == x)
906 return i;
907 } else {
908 for (int i = 0; i < size; i++)
909 if (x.equals(queue[i]))
910 return i;
911 }
912 }
913 return -1;
914 }
915
916 public boolean contains(Object x) {
917 final ReentrantLock lock = this.lock;
918 lock.lock();
919 try {
920 return indexOf(x) != -1;
921 } finally {
922 lock.unlock();
923 }
924 }
925
926 public boolean remove(Object x) {
927 final ReentrantLock lock = this.lock;
928 lock.lock();
929 try {
930 int i = indexOf(x);
931 if (i < 0)
932 return false;
933
934 setIndex(queue[i], -1);
935 int s = --size;
936 RunnableScheduledFuture<?> replacement = queue[s];
937 queue[s] = null;
938 if (s != i) {
939 siftDown(i, replacement);
940 if (queue[i] == replacement)
941 siftUp(i, replacement);
942 }
943 return true;
944 } finally {
945 lock.unlock();
946 }
947 }
948
949 public int size() {
950 final ReentrantLock lock = this.lock;
951 lock.lock();
952 try {
953 return size;
954 } finally {
955 lock.unlock();
956 }
957 }
958
959 public boolean isEmpty() {
960 return size() == 0;
961 }
962
963 public int remainingCapacity() {
964 return Integer.MAX_VALUE;
965 }
966
967 public RunnableScheduledFuture<?> peek() {
968 final ReentrantLock lock = this.lock;
969 lock.lock();
970 try {
971 return queue[0];
972 } finally {
973 lock.unlock();
974 }
975 }
976
977 public boolean offer(Runnable x) {
978 if (x == null)
979 throw new NullPointerException();
980 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
981 final ReentrantLock lock = this.lock;
982 lock.lock();
983 try {
984 int i = size;
985 if (i >= queue.length)
986 grow();
987 size = i + 1;
988 if (i == 0) {
989 queue[0] = e;
990 setIndex(e, 0);
991 } else {
992 siftUp(i, e);
993 }
994 if (queue[0] == e) {
995 leader = null;
996 available.signal();
997 }
998 } finally {
999 lock.unlock();
1000 }
1001 return true;
1002 }
1003
1004 public void put(Runnable e) {
1005 offer(e);
1006 }
1007
1008 public boolean add(Runnable e) {
1009 return offer(e);
1010 }
1011
1012 public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1013 return offer(e);
1014 }
1015
1016 /**
1017 * Performs common bookkeeping for poll and take: Replaces
1018 * first element with last and sifts it down. Call only when
1019 * holding lock.
1020 * @param f the task to remove and return
1021 */
1022 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1023 int s = --size;
1024 RunnableScheduledFuture<?> x = queue[s];
1025 queue[s] = null;
1026 if (s != 0)
1027 siftDown(0, x);
1028 setIndex(f, -1);
1029 return f;
1030 }
1031
1032 public RunnableScheduledFuture<?> poll() {
1033 final ReentrantLock lock = this.lock;
1034 lock.lock();
1035 try {
1036 RunnableScheduledFuture<?> first = queue[0];
1037 if (first == null || first.getDelay(NANOSECONDS) > 0)
1038 return null;
1039 else
1040 return finishPoll(first);
1041 } finally {
1042 lock.unlock();
1043 }
1044 }
1045
1046 public RunnableScheduledFuture<?> take() throws InterruptedException {
1047 final ReentrantLock lock = this.lock;
1048 lock.lockInterruptibly();
1049 try {
1050 for (;;) {
1051 RunnableScheduledFuture<?> first = queue[0];
1052 if (first == null)
1053 available.await();
1054 else {
1055 long delay = first.getDelay(NANOSECONDS);
1056 if (delay <= 0)
1057 return finishPoll(first);
1058 first = null; // don't retain ref while waiting
1059 if (leader != null)
1060 available.await();
1061 else {
1062 Thread thisThread = Thread.currentThread();
1063 leader = thisThread;
1064 try {
1065 available.awaitNanos(delay);
1066 } finally {
1067 if (leader == thisThread)
1068 leader = null;
1069 }
1070 }
1071 }
1072 }
1073 } finally {
1074 if (leader == null && queue[0] != null)
1075 available.signal();
1076 lock.unlock();
1077 }
1078 }
1079
1080 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1081 throws InterruptedException {
1082 long nanos = unit.toNanos(timeout);
1083 final ReentrantLock lock = this.lock;
1084 lock.lockInterruptibly();
1085 try {
1086 for (;;) {
1087 RunnableScheduledFuture<?> first = queue[0];
1088 if (first == null) {
1089 if (nanos <= 0)
1090 return null;
1091 else
1092 nanos = available.awaitNanos(nanos);
1093 } else {
1094 long delay = first.getDelay(NANOSECONDS);
1095 if (delay <= 0)
1096 return finishPoll(first);
1097 if (nanos <= 0)
1098 return null;
1099 first = null; // don't retain ref while waiting
1100 if (nanos < delay || leader != null)
1101 nanos = available.awaitNanos(nanos);
1102 else {
1103 Thread thisThread = Thread.currentThread();
1104 leader = thisThread;
1105 try {
1106 long timeLeft = available.awaitNanos(delay);
1107 nanos -= delay - timeLeft;
1108 } finally {
1109 if (leader == thisThread)
1110 leader = null;
1111 }
1112 }
1113 }
1114 }
1115 } finally {
1116 if (leader == null && queue[0] != null)
1117 available.signal();
1118 lock.unlock();
1119 }
1120 }
1121
1122 public void clear() {
1123 final ReentrantLock lock = this.lock;
1124 lock.lock();
1125 try {
1126 for (int i = 0; i < size; i++) {
1127 RunnableScheduledFuture<?> t = queue[i];
1128 if (t != null) {
1129 queue[i] = null;
1130 setIndex(t, -1);
1131 }
1132 }
1133 size = 0;
1134 } finally {
1135 lock.unlock();
1136 }
1137 }
1138
1139 /**
1140 * Returns first element only if it is expired.
1141 * Used only by drainTo. Call only when holding lock.
1142 */
1143 private RunnableScheduledFuture<?> peekExpired() {
1144 // assert lock.isHeldByCurrentThread();
1145 RunnableScheduledFuture<?> first = queue[0];
1146 return (first == null || first.getDelay(NANOSECONDS) > 0) ?
1147 null : first;
1148 }
1149
1150 public int drainTo(Collection<? super Runnable> c) {
1151 if (c == null)
1152 throw new NullPointerException();
1153 if (c == this)
1154 throw new IllegalArgumentException();
1155 final ReentrantLock lock = this.lock;
1156 lock.lock();
1157 try {
1158 RunnableScheduledFuture<?> first;
1159 int n = 0;
1160 while ((first = peekExpired()) != null) {
1161 c.add(first); // In this order, in case add() throws.
1162 finishPoll(first);
1163 ++n;
1164 }
1165 return n;
1166 } finally {
1167 lock.unlock();
1168 }
1169 }
1170
1171 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1172 if (c == null)
1173 throw new NullPointerException();
1174 if (c == this)
1175 throw new IllegalArgumentException();
1176 if (maxElements <= 0)
1177 return 0;
1178 final ReentrantLock lock = this.lock;
1179 lock.lock();
1180 try {
1181 RunnableScheduledFuture<?> first;
1182 int n = 0;
1183 while (n < maxElements && (first = peekExpired()) != null) {
1184 c.add(first); // In this order, in case add() throws.
1185 finishPoll(first);
1186 ++n;
1187 }
1188 return n;
1189 } finally {
1190 lock.unlock();
1191 }
1192 }
1193
1194 public Object[] toArray() {
1195 final ReentrantLock lock = this.lock;
1196 lock.lock();
1197 try {
1198 return Arrays.copyOf(queue, size, Object[].class);
1199 } finally {
1200 lock.unlock();
1201 }
1202 }
1203
1204 @SuppressWarnings("unchecked")
1205 public <T> T[] toArray(T[] a) {
1206 final ReentrantLock lock = this.lock;
1207 lock.lock();
1208 try {
1209 if (a.length < size)
1210 return (T[]) Arrays.copyOf(queue, size, a.getClass());
1211 System.arraycopy(queue, 0, a, 0, size);
1212 if (a.length > size)
1213 a[size] = null;
1214 return a;
1215 } finally {
1216 lock.unlock();
1217 }
1218 }
1219
1220 public Iterator<Runnable> iterator() {
1221 return new Itr(Arrays.copyOf(queue, size));
1222 }
1223
1224 /**
1225 * Snapshot iterator that works off copy of underlying q array.
1226 */
1227 private class Itr implements Iterator<Runnable> {
1228 final RunnableScheduledFuture<?>[] array;
1229 int cursor = 0; // index of next element to return
1230 int lastRet = -1; // index of last element, or -1 if no such
1231
1232 Itr(RunnableScheduledFuture<?>[] array) {
1233 this.array = array;
1234 }
1235
1236 public boolean hasNext() {
1237 return cursor < array.length;
1238 }
1239
1240 public Runnable next() {
1241 if (cursor >= array.length)
1242 throw new NoSuchElementException();
1243 lastRet = cursor;
1244 return array[cursor++];
1245 }
1246
1247 public void remove() {
1248 if (lastRet < 0)
1249 throw new IllegalStateException();
1250 DelayedWorkQueue.this.remove(array[lastRet]);
1251 lastRet = -1;
1252 }
1253 }
1254 }
1255 }