ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.77
Committed: Wed Feb 19 20:33:25 2014 UTC (10 years, 3 months ago) by jsr166
Branch: MAIN
Changes since 1.76: +15 -11 lines
Log Message:
clarify nature of ScheduledFuture elements in queue

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import static java.util.concurrent.TimeUnit.NANOSECONDS;
9 import java.util.concurrent.atomic.AtomicLong;
10 import java.util.concurrent.locks.Condition;
11 import java.util.concurrent.locks.ReentrantLock;
12 import java.util.*;
13
14 /**
15 * A {@link ThreadPoolExecutor} that can additionally schedule
16 * commands to run after a given delay, or to execute periodically.
17 * This class is preferable to {@link java.util.Timer} when multiple
18 * worker threads are needed, or when the additional flexibility or
19 * capabilities of {@link ThreadPoolExecutor} (which this class
20 * extends) are required.
21 *
22 * <p>Delayed tasks execute no sooner than they are enabled, but
23 * without any real-time guarantees about when, after they are
24 * enabled, they will commence. Tasks scheduled for exactly the same
25 * execution time are enabled in first-in-first-out (FIFO) order of
26 * submission.
27 *
28 * <p>When a submitted task is cancelled before it is run, execution
29 * is suppressed. By default, such a cancelled task is not
30 * automatically removed from the work queue until its delay elapses.
31 * While this enables further inspection and monitoring, it may also
32 * cause unbounded retention of cancelled tasks. To avoid this, use
33 * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately
34 * removed from the work queue at time of cancellation.
35 *
36 * <p>Successive executions of a periodic task scheduled via
37 * {@link #scheduleAtFixedRate} or
38 * {@link #scheduleWithFixedDelay} do not overlap. While different
39 * executions may be performed by different threads, the effects of
40 * prior executions <a
41 * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
42 * those of subsequent ones.
43 *
44 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
45 * of the inherited tuning methods are not useful for it. In
46 * particular, because it acts as a fixed-sized pool using
47 * {@code corePoolSize} threads and an unbounded queue, adjustments
48 * to {@code maximumPoolSize} have no useful effect. Additionally, it
49 * is almost never a good idea to set {@code corePoolSize} to zero or
50 * use {@code allowCoreThreadTimeOut} because this may leave the pool
51 * without threads to handle tasks once they become eligible to run.
52 *
53 * <p><b>Extension notes:</b> This class overrides the
54 * {@link ThreadPoolExecutor#execute(Runnable) execute} and
55 * {@link AbstractExecutorService#submit(Runnable) submit}
56 * methods to generate internal {@link ScheduledFuture} objects to
57 * control per-task delays and scheduling. To preserve
58 * functionality, any further overrides of these methods in
59 * subclasses must invoke superclass versions, which effectively
60 * disables additional task customization. However, this class
61 * provides alternative protected extension method
62 * {@code decorateTask} (one version each for {@code Runnable} and
63 * {@code Callable}) that can be used to customize the concrete task
64 * types used to execute commands entered via {@code execute},
65 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
66 * and {@code scheduleWithFixedDelay}. By default, a
67 * {@code ScheduledThreadPoolExecutor} uses a task type extending
68 * {@link FutureTask}. However, this may be modified or replaced using
69 * subclasses of the form:
70 *
71 * <pre> {@code
72 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
73 *
74 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
75 *
76 * protected <V> RunnableScheduledFuture<V> decorateTask(
77 * Runnable r, RunnableScheduledFuture<V> task) {
78 * return new CustomTask<V>(r, task);
79 * }
80 *
81 * protected <V> RunnableScheduledFuture<V> decorateTask(
82 * Callable<V> c, RunnableScheduledFuture<V> task) {
83 * return new CustomTask<V>(c, task);
84 * }
85 * // ... add constructors, etc.
86 * }}</pre>
87 *
88 * @since 1.5
89 * @author Doug Lea
90 */
91 public class ScheduledThreadPoolExecutor
92 extends ThreadPoolExecutor
93 implements ScheduledExecutorService {
94
95 /*
96 * This class specializes ThreadPoolExecutor implementation by
97 *
98 * 1. Using a custom task type, ScheduledFutureTask for
99 * tasks, even those that don't require scheduling (i.e.,
100 * those submitted using ExecutorService execute, not
101 * ScheduledExecutorService methods) which are treated as
102 * delayed tasks with a delay of zero.
103 *
104 * 2. Using a custom queue (DelayedWorkQueue), a variant of
105 * unbounded DelayQueue. The lack of capacity constraint and
106 * the fact that corePoolSize and maximumPoolSize are
107 * effectively identical simplifies some execution mechanics
108 * (see delayedExecute) compared to ThreadPoolExecutor.
109 *
110 * 3. Supporting optional run-after-shutdown parameters, which
111 * leads to overrides of shutdown methods to remove and cancel
112 * tasks that should NOT be run after shutdown, as well as
113 * different recheck logic when task (re)submission overlaps
114 * with a shutdown.
115 *
116 * 4. Task decoration methods to allow interception and
117 * instrumentation, which are needed because subclasses cannot
118 * otherwise override submit methods to get this effect. These
119 * don't have any impact on pool control logic though.
120 */
121
122 /**
123 * False if should cancel/suppress periodic tasks on shutdown.
124 */
125 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
126
127 /**
128 * False if should cancel non-periodic tasks on shutdown.
129 */
130 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
131
132 /**
133 * True if ScheduledFutureTask.cancel should remove from queue.
134 */
135 private volatile boolean removeOnCancel = false;
136
137 /**
138 * Sequence number to break scheduling ties, and in turn to
139 * guarantee FIFO order among tied entries.
140 */
141 private static final AtomicLong sequencer = new AtomicLong();
142
143 /**
144 * Returns current nanosecond time.
145 */
146 final long now() {
147 return System.nanoTime();
148 }
149
150 private class ScheduledFutureTask<V>
151 extends FutureTask<V> implements RunnableScheduledFuture<V> {
152
153 /** Sequence number to break ties FIFO */
154 private final long sequenceNumber;
155
156 /** The time the task is enabled to execute in nanoTime units */
157 private long time;
158
159 /**
160 * Period in nanoseconds for repeating tasks.
161 * A positive value indicates fixed-rate execution.
162 * A negative value indicates fixed-delay execution.
163 * A value of 0 indicates a non-repeating (one-shot) task.
164 */
165 private final long period;
166
167 /** The actual task to be re-enqueued by reExecutePeriodic */
168 RunnableScheduledFuture<V> outerTask = this;
169
170 /**
171 * Index into delay queue, to support faster cancellation.
172 */
173 int heapIndex;
174
175 /**
176 * Creates a one-shot action with given nanoTime-based trigger time.
177 */
178 ScheduledFutureTask(Runnable r, V result, long triggerTime) {
179 super(r, result);
180 this.time = triggerTime;
181 this.period = 0;
182 this.sequenceNumber = sequencer.getAndIncrement();
183 }
184
185 /**
186 * Creates a periodic action with given nanoTime-based initial
187 * trigger time and period.
188 */
189 ScheduledFutureTask(Runnable r, V result, long triggerTime,
190 long period) {
191 super(r, result);
192 this.time = triggerTime;
193 this.period = period;
194 this.sequenceNumber = sequencer.getAndIncrement();
195 }
196
197 /**
198 * Creates a one-shot action with given nanoTime-based trigger time.
199 */
200 ScheduledFutureTask(Callable<V> callable, long triggerTime) {
201 super(callable);
202 this.time = triggerTime;
203 this.period = 0;
204 this.sequenceNumber = sequencer.getAndIncrement();
205 }
206
207 public long getDelay(TimeUnit unit) {
208 return unit.convert(time - now(), NANOSECONDS);
209 }
210
211 public int compareTo(Delayed other) {
212 if (other == this) // compare zero if same object
213 return 0;
214 if (other instanceof ScheduledFutureTask) {
215 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
216 long diff = time - x.time;
217 if (diff < 0)
218 return -1;
219 else if (diff > 0)
220 return 1;
221 else if (sequenceNumber < x.sequenceNumber)
222 return -1;
223 else
224 return 1;
225 }
226 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
227 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
228 }
229
230 /**
231 * Returns {@code true} if this is a periodic (not a one-shot) action.
232 *
233 * @return {@code true} if periodic
234 */
235 public boolean isPeriodic() {
236 return period != 0;
237 }
238
239 /**
240 * Sets the next time to run for a periodic task.
241 */
242 private void setNextRunTime() {
243 long p = period;
244 if (p > 0)
245 time += p;
246 else
247 time = triggerTime(-p);
248 }
249
250 public boolean cancel(boolean mayInterruptIfRunning) {
251 boolean cancelled = super.cancel(mayInterruptIfRunning);
252 if (cancelled && removeOnCancel && heapIndex >= 0)
253 remove(this);
254 return cancelled;
255 }
256
257 /**
258 * Overrides FutureTask version so as to reset/requeue if periodic.
259 */
260 public void run() {
261 boolean periodic = isPeriodic();
262 if (!canRunInCurrentRunState(periodic))
263 cancel(false);
264 else if (!periodic)
265 ScheduledFutureTask.super.run();
266 else if (ScheduledFutureTask.super.runAndReset()) {
267 setNextRunTime();
268 reExecutePeriodic(outerTask);
269 }
270 }
271 }
272
273 /**
274 * Returns true if can run a task given current run state
275 * and run-after-shutdown parameters.
276 *
277 * @param periodic true if this task periodic, false if delayed
278 */
279 boolean canRunInCurrentRunState(boolean periodic) {
280 return isRunningOrShutdown(periodic ?
281 continueExistingPeriodicTasksAfterShutdown :
282 executeExistingDelayedTasksAfterShutdown);
283 }
284
285 /**
286 * Main execution method for delayed or periodic tasks. If pool
287 * is shut down, rejects the task. Otherwise adds task to queue
288 * and starts a thread, if necessary, to run it. (We cannot
289 * prestart the thread to run the task because the task (probably)
290 * shouldn't be run yet.) If the pool is shut down while the task
291 * is being added, cancel and remove it if required by state and
292 * run-after-shutdown parameters.
293 *
294 * @param task the task
295 */
296 private void delayedExecute(RunnableScheduledFuture<?> task) {
297 if (isShutdown())
298 reject(task);
299 else {
300 super.getQueue().add(task);
301 if (isShutdown() &&
302 !canRunInCurrentRunState(task.isPeriodic()) &&
303 remove(task))
304 task.cancel(false);
305 else
306 ensurePrestart();
307 }
308 }
309
310 /**
311 * Requeues a periodic task unless current run state precludes it.
312 * Same idea as delayedExecute except drops task rather than rejecting.
313 *
314 * @param task the task
315 */
316 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
317 if (canRunInCurrentRunState(true)) {
318 super.getQueue().add(task);
319 if (!canRunInCurrentRunState(true) && remove(task))
320 task.cancel(false);
321 else
322 ensurePrestart();
323 }
324 }
325
326 /**
327 * Cancels and clears the queue of all tasks that should not be run
328 * due to shutdown policy. Invoked within super.shutdown.
329 */
330 @Override void onShutdown() {
331 BlockingQueue<Runnable> q = super.getQueue();
332 boolean keepDelayed =
333 getExecuteExistingDelayedTasksAfterShutdownPolicy();
334 boolean keepPeriodic =
335 getContinueExistingPeriodicTasksAfterShutdownPolicy();
336 if (!keepDelayed && !keepPeriodic) {
337 for (Object e : q.toArray())
338 if (e instanceof RunnableScheduledFuture<?>)
339 ((RunnableScheduledFuture<?>) e).cancel(false);
340 q.clear();
341 }
342 else {
343 // Traverse snapshot to avoid iterator exceptions
344 for (Object e : q.toArray()) {
345 if (e instanceof RunnableScheduledFuture) {
346 RunnableScheduledFuture<?> t =
347 (RunnableScheduledFuture<?>)e;
348 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
349 t.isCancelled()) { // also remove if already cancelled
350 if (q.remove(t))
351 t.cancel(false);
352 }
353 }
354 }
355 }
356 tryTerminate();
357 }
358
359 /**
360 * Modifies or replaces the task used to execute a runnable.
361 * This method can be used to override the concrete
362 * class used for managing internal tasks.
363 * The default implementation simply returns the given task.
364 *
365 * @param runnable the submitted Runnable
366 * @param task the task created to execute the runnable
367 * @param <V> the type of the task's result
368 * @return a task that can execute the runnable
369 * @since 1.6
370 */
371 protected <V> RunnableScheduledFuture<V> decorateTask(
372 Runnable runnable, RunnableScheduledFuture<V> task) {
373 return task;
374 }
375
376 /**
377 * Modifies or replaces the task used to execute a callable.
378 * This method can be used to override the concrete
379 * class used for managing internal tasks.
380 * The default implementation simply returns the given task.
381 *
382 * @param callable the submitted Callable
383 * @param task the task created to execute the callable
384 * @param <V> the type of the task's result
385 * @return a task that can execute the callable
386 * @since 1.6
387 */
388 protected <V> RunnableScheduledFuture<V> decorateTask(
389 Callable<V> callable, RunnableScheduledFuture<V> task) {
390 return task;
391 }
392
/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the
 * given core pool size.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    // Maximum pool size Integer.MAX_VALUE, zero keep-alive, and a
    // fresh DelayedWorkQueue as the work queue.
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue());
}
405
/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the
 * given core pool size and thread factory.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code threadFactory} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    // Maximum pool size Integer.MAX_VALUE, zero keep-alive, and a
    // fresh DelayedWorkQueue as the work queue.
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          threadFactory);
}
422
/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the
 * given core pool size and rejection handler.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code handler} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    // Maximum pool size Integer.MAX_VALUE, zero keep-alive, and a
    // fresh DelayedWorkQueue as the work queue.
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          handler);
}
439
/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the
 * given core pool size, thread factory and rejection handler.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code threadFactory} or
 *         {@code handler} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    // Maximum pool size Integer.MAX_VALUE, zero keep-alive, and a
    // fresh DelayedWorkQueue as the work queue.
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          threadFactory,
          handler);
}
460
461 /**
462 * Returns the nanoTime-based trigger time of a delayed action.
463 */
464 private long triggerTime(long delay, TimeUnit unit) {
465 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
466 }
467
468 /**
469 * Returns the nanoTime-based trigger time of a delayed action.
470 */
471 long triggerTime(long delay) {
472 return now() +
473 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
474 }
475
476 /**
477 * Constrains the values of all delays in the queue to be within
478 * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
479 * This may occur if a task is eligible to be dequeued, but has
480 * not yet been, while some other task is added with a delay of
481 * Long.MAX_VALUE.
482 */
483 private long overflowFree(long delay) {
484 Delayed head = (Delayed) super.getQueue().peek();
485 if (head != null) {
486 long headDelay = head.getDelay(NANOSECONDS);
487 if (headDelay < 0 && (delay - headDelay < 0))
488 delay = Long.MAX_VALUE + headDelay;
489 }
490 return delay;
491 }
492
493 /**
494 * @throws RejectedExecutionException {@inheritDoc}
495 * @throws NullPointerException {@inheritDoc}
496 */
497 public ScheduledFuture<?> schedule(Runnable command,
498 long delay,
499 TimeUnit unit) {
500 if (command == null || unit == null)
501 throw new NullPointerException();
502 RunnableScheduledFuture<Void> t = decorateTask(command,
503 new ScheduledFutureTask<Void>(command, null,
504 triggerTime(delay, unit)));
505 delayedExecute(t);
506 return t;
507 }
508
509 /**
510 * @throws RejectedExecutionException {@inheritDoc}
511 * @throws NullPointerException {@inheritDoc}
512 */
513 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
514 long delay,
515 TimeUnit unit) {
516 if (callable == null || unit == null)
517 throw new NullPointerException();
518 RunnableScheduledFuture<V> t = decorateTask(callable,
519 new ScheduledFutureTask<V>(callable,
520 triggerTime(delay, unit)));
521 delayedExecute(t);
522 return t;
523 }
524
525 /**
526 * @throws RejectedExecutionException {@inheritDoc}
527 * @throws NullPointerException {@inheritDoc}
528 * @throws IllegalArgumentException {@inheritDoc}
529 */
530 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
531 long initialDelay,
532 long period,
533 TimeUnit unit) {
534 if (command == null || unit == null)
535 throw new NullPointerException();
536 if (period <= 0)
537 throw new IllegalArgumentException();
538 ScheduledFutureTask<Void> sft =
539 new ScheduledFutureTask<Void>(command,
540 null,
541 triggerTime(initialDelay, unit),
542 unit.toNanos(period));
543 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
544 sft.outerTask = t;
545 delayedExecute(t);
546 return t;
547 }
548
549 /**
550 * @throws RejectedExecutionException {@inheritDoc}
551 * @throws NullPointerException {@inheritDoc}
552 * @throws IllegalArgumentException {@inheritDoc}
553 */
554 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
555 long initialDelay,
556 long delay,
557 TimeUnit unit) {
558 if (command == null || unit == null)
559 throw new NullPointerException();
560 if (delay <= 0)
561 throw new IllegalArgumentException();
562 ScheduledFutureTask<Void> sft =
563 new ScheduledFutureTask<Void>(command,
564 null,
565 triggerTime(initialDelay, unit),
566 unit.toNanos(-delay));
567 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
568 sft.outerTask = t;
569 delayedExecute(t);
570 return t;
571 }
572
573 /**
574 * Executes {@code command} with zero required delay.
575 * This has effect equivalent to
576 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
577 * Note that inspections of the queue and of the list returned by
578 * {@code shutdownNow} will access the zero-delayed
579 * {@link ScheduledFuture}, not the {@code command} itself.
580 *
581 * <p>A consequence of the use of {@code ScheduledFuture} objects is
582 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
583 * called with a null second {@code Throwable} argument, even if the
584 * {@code command} terminated abruptly. Instead, the {@code Throwable}
585 * thrown by such a task can be obtained via {@link Future#get}.
586 *
587 * @throws RejectedExecutionException at discretion of
588 * {@code RejectedExecutionHandler}, if the task
589 * cannot be accepted for execution because the
590 * executor has been shut down
591 * @throws NullPointerException {@inheritDoc}
592 */
593 public void execute(Runnable command) {
594 schedule(command, 0, NANOSECONDS);
595 }
596
597 // Override AbstractExecutorService methods
598
599 /**
600 * @throws RejectedExecutionException {@inheritDoc}
601 * @throws NullPointerException {@inheritDoc}
602 */
603 public Future<?> submit(Runnable task) {
604 return schedule(task, 0, NANOSECONDS);
605 }
606
607 /**
608 * @throws RejectedExecutionException {@inheritDoc}
609 * @throws NullPointerException {@inheritDoc}
610 */
611 public <T> Future<T> submit(Runnable task, T result) {
612 return schedule(Executors.callable(task, result), 0, NANOSECONDS);
613 }
614
615 /**
616 * @throws RejectedExecutionException {@inheritDoc}
617 * @throws NullPointerException {@inheritDoc}
618 */
619 public <T> Future<T> submit(Callable<T> task) {
620 return schedule(task, 0, NANOSECONDS);
621 }
622
623 /**
624 * Sets the policy on whether to continue executing existing
625 * periodic tasks even when this executor has been {@code shutdown}.
626 * In this case, these tasks will only terminate upon
627 * {@code shutdownNow} or after setting the policy to
628 * {@code false} when already shutdown.
629 * This value is by default {@code false}.
630 *
631 * @param value if {@code true}, continue after shutdown, else don't
632 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
633 */
634 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
635 continueExistingPeriodicTasksAfterShutdown = value;
636 if (!value && isShutdown())
637 onShutdown();
638 }
639
640 /**
641 * Gets the policy on whether to continue executing existing
642 * periodic tasks even when this executor has been {@code shutdown}.
643 * In this case, these tasks will only terminate upon
644 * {@code shutdownNow} or after setting the policy to
645 * {@code false} when already shutdown.
646 * This value is by default {@code false}.
647 *
648 * @return {@code true} if will continue after shutdown
649 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
650 */
651 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
652 return continueExistingPeriodicTasksAfterShutdown;
653 }
654
655 /**
656 * Sets the policy on whether to execute existing delayed
657 * tasks even when this executor has been {@code shutdown}.
658 * In this case, these tasks will only terminate upon
659 * {@code shutdownNow}, or after setting the policy to
660 * {@code false} when already shutdown.
661 * This value is by default {@code true}.
662 *
663 * @param value if {@code true}, execute after shutdown, else don't
664 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
665 */
666 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
667 executeExistingDelayedTasksAfterShutdown = value;
668 if (!value && isShutdown())
669 onShutdown();
670 }
671
672 /**
673 * Gets the policy on whether to execute existing delayed
674 * tasks even when this executor has been {@code shutdown}.
675 * In this case, these tasks will only terminate upon
676 * {@code shutdownNow}, or after setting the policy to
677 * {@code false} when already shutdown.
678 * This value is by default {@code true}.
679 *
680 * @return {@code true} if will execute after shutdown
681 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
682 */
683 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
684 return executeExistingDelayedTasksAfterShutdown;
685 }
686
687 /**
688 * Sets the policy on whether cancelled tasks should be immediately
689 * removed from the work queue at time of cancellation. This value is
690 * by default {@code false}.
691 *
692 * @param value if {@code true}, remove on cancellation, else don't
693 * @see #getRemoveOnCancelPolicy
694 * @since 1.7
695 */
696 public void setRemoveOnCancelPolicy(boolean value) {
697 removeOnCancel = value;
698 }
699
700 /**
701 * Gets the policy on whether cancelled tasks should be immediately
702 * removed from the work queue at time of cancellation. This value is
703 * by default {@code false}.
704 *
705 * @return {@code true} if cancelled tasks are immediately removed
706 * from the queue
707 * @see #setRemoveOnCancelPolicy
708 * @since 1.7
709 */
710 public boolean getRemoveOnCancelPolicy() {
711 return removeOnCancel;
712 }
713
714 /**
715 * Initiates an orderly shutdown in which previously submitted
716 * tasks are executed, but no new tasks will be accepted.
717 * Invocation has no additional effect if already shut down.
718 *
719 * <p>This method does not wait for previously submitted tasks to
720 * complete execution. Use {@link #awaitTermination awaitTermination}
721 * to do that.
722 *
723 * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
724 * has been set {@code false}, existing delayed tasks whose delays
725 * have not yet elapsed are cancelled. And unless the {@code
726 * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
727 * {@code true}, future executions of existing periodic tasks will
728 * be cancelled.
729 *
730 * @throws SecurityException {@inheritDoc}
731 */
732 public void shutdown() {
733 super.shutdown();
734 }
735
736 /**
737 * Attempts to stop all actively executing tasks, halts the
738 * processing of waiting tasks, and returns a list of the tasks
739 * that were awaiting execution.
740 *
741 * <p>This method does not wait for actively executing tasks to
742 * terminate. Use {@link #awaitTermination awaitTermination} to
743 * do that.
744 *
745 * <p>There are no guarantees beyond best-effort attempts to stop
746 * processing actively executing tasks. This implementation
747 * cancels tasks via {@link Thread#interrupt}, so any task that
748 * fails to respond to interrupts may never terminate.
749 *
750 * @return list of tasks that never commenced execution.
751 * Each element of this list is a {@link ScheduledFuture}.
752 * For tasks submitted via one of the {@code schedule}
753 * methods, the element will be identical to the returned
754 * {@code ScheduledFuture}. For tasks submitted using
755 * {@link #execute}, the element will be a zero-delay {@code
756 * ScheduledFuture}.
757 * @throws SecurityException {@inheritDoc}
758 */
759 public List<Runnable> shutdownNow() {
760 return super.shutdownNow();
761 }
762
763 /**
764 * Returns the task queue used by this executor.
765 * Each element of this list is a {@link ScheduledFuture}.
766 * For tasks submitted via one of the {@code schedule} methods, the
767 * element will be identical to the returned {@code ScheduledFuture}.
768 * For tasks submitted using {@link #execute}, the element will be a
769 * zero-delay {@code ScheduledFuture}.
770 *
771 * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
772 * tasks in the order in which they will execute.
773 *
774 * @return the task queue
775 */
776 public BlockingQueue<Runnable> getQueue() {
777 return super.getQueue();
778 }
779
780 /**
781 * Specialized delay queue. To mesh with TPE declarations, this
782 * class must be declared as a BlockingQueue<Runnable> even though
783 * it can only hold RunnableScheduledFutures.
784 */
785 static class DelayedWorkQueue extends AbstractQueue<Runnable>
786 implements BlockingQueue<Runnable> {
787
788 /*
789 * A DelayedWorkQueue is based on a heap-based data structure
790 * like those in DelayQueue and PriorityQueue, except that
791 * every ScheduledFutureTask also records its index into the
792 * heap array. This eliminates the need to find a task upon
793 * cancellation, greatly speeding up removal (down from O(n)
794 * to O(log n)), and reducing garbage retention that would
795 * otherwise occur by waiting for the element to rise to top
796 * before clearing. But because the queue may also hold
797 * RunnableScheduledFutures that are not ScheduledFutureTasks,
798 * we are not guaranteed to have such indices available, in
799 * which case we fall back to linear search. (We expect that
800 * most tasks will not be decorated, and that the faster cases
801 * will be much more common.)
802 *
803 * All heap operations must record index changes -- mainly
804 * within siftUp and siftDown. Upon removal, a task's
805 * heapIndex is set to -1. Note that ScheduledFutureTasks can
806 * appear at most once in the queue (this need not be true for
807 * other kinds of tasks or work queues), so are uniquely
808 * identified by heapIndex.
809 */
810
811 private static final int INITIAL_CAPACITY = 16;
812 private RunnableScheduledFuture<?>[] queue =
813 new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
814 private final ReentrantLock lock = new ReentrantLock();
815 private int size = 0;
816
817 /**
818 * Thread designated to wait for the task at the head of the
819 * queue. This variant of the Leader-Follower pattern
820 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
821 * minimize unnecessary timed waiting. When a thread becomes
822 * the leader, it waits only for the next delay to elapse, but
823 * other threads await indefinitely. The leader thread must
824 * signal some other thread before returning from take() or
825 * poll(...), unless some other thread becomes leader in the
826 * interim. Whenever the head of the queue is replaced with a
827 * task with an earlier expiration time, the leader field is
828 * invalidated by being reset to null, and some waiting
829 * thread, but not necessarily the current leader, is
830 * signalled. So waiting threads must be prepared to acquire
831 * and lose leadership while waiting.
832 */
833 private Thread leader = null;
834
835 /**
836 * Condition signalled when a newer task becomes available at the
837 * head of the queue or a new thread may need to become leader.
838 */
839 private final Condition available = lock.newCondition();
840
841 /**
842 * Sets f's heapIndex if it is a ScheduledFutureTask.
843 */
844 private void setIndex(RunnableScheduledFuture<?> f, int idx) {
845 if (f instanceof ScheduledFutureTask)
846 ((ScheduledFutureTask)f).heapIndex = idx;
847 }
848
849 /**
850 * Sifts element added at bottom up to its heap-ordered spot.
851 * Call only when holding lock.
852 */
853 private void siftUp(int k, RunnableScheduledFuture<?> key) {
854 while (k > 0) {
855 int parent = (k - 1) >>> 1;
856 RunnableScheduledFuture<?> e = queue[parent];
857 if (key.compareTo(e) >= 0)
858 break;
859 queue[k] = e;
860 setIndex(e, k);
861 k = parent;
862 }
863 queue[k] = key;
864 setIndex(key, k);
865 }
866
867 /**
868 * Sifts element added at top down to its heap-ordered spot.
869 * Call only when holding lock.
870 */
871 private void siftDown(int k, RunnableScheduledFuture<?> key) {
872 int half = size >>> 1;
873 while (k < half) {
874 int child = (k << 1) + 1;
875 RunnableScheduledFuture<?> c = queue[child];
876 int right = child + 1;
877 if (right < size && c.compareTo(queue[right]) > 0)
878 c = queue[child = right];
879 if (key.compareTo(c) <= 0)
880 break;
881 queue[k] = c;
882 setIndex(c, k);
883 k = child;
884 }
885 queue[k] = key;
886 setIndex(key, k);
887 }
888
889 /**
890 * Resizes the heap array. Call only when holding lock.
891 */
892 private void grow() {
893 int oldCapacity = queue.length;
894 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
895 if (newCapacity < 0) // overflow
896 newCapacity = Integer.MAX_VALUE;
897 queue = Arrays.copyOf(queue, newCapacity);
898 }
899
900 /**
901 * Finds index of given object, or -1 if absent.
902 */
903 private int indexOf(Object x) {
904 if (x != null) {
905 if (x instanceof ScheduledFutureTask) {
906 int i = ((ScheduledFutureTask) x).heapIndex;
907 // Sanity check; x could conceivably be a
908 // ScheduledFutureTask from some other pool.
909 if (i >= 0 && i < size && queue[i] == x)
910 return i;
911 } else {
912 for (int i = 0; i < size; i++)
913 if (x.equals(queue[i]))
914 return i;
915 }
916 }
917 return -1;
918 }
919
920 public boolean contains(Object x) {
921 final ReentrantLock lock = this.lock;
922 lock.lock();
923 try {
924 return indexOf(x) != -1;
925 } finally {
926 lock.unlock();
927 }
928 }
929
930 public boolean remove(Object x) {
931 final ReentrantLock lock = this.lock;
932 lock.lock();
933 try {
934 int i = indexOf(x);
935 if (i < 0)
936 return false;
937
938 setIndex(queue[i], -1);
939 int s = --size;
940 RunnableScheduledFuture<?> replacement = queue[s];
941 queue[s] = null;
942 if (s != i) {
943 siftDown(i, replacement);
944 if (queue[i] == replacement)
945 siftUp(i, replacement);
946 }
947 return true;
948 } finally {
949 lock.unlock();
950 }
951 }
952
953 public int size() {
954 final ReentrantLock lock = this.lock;
955 lock.lock();
956 try {
957 return size;
958 } finally {
959 lock.unlock();
960 }
961 }
962
963 public boolean isEmpty() {
964 return size() == 0;
965 }
966
967 public int remainingCapacity() {
968 return Integer.MAX_VALUE;
969 }
970
971 public RunnableScheduledFuture<?> peek() {
972 final ReentrantLock lock = this.lock;
973 lock.lock();
974 try {
975 return queue[0];
976 } finally {
977 lock.unlock();
978 }
979 }
980
981 public boolean offer(Runnable x) {
982 if (x == null)
983 throw new NullPointerException();
984 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
985 final ReentrantLock lock = this.lock;
986 lock.lock();
987 try {
988 int i = size;
989 if (i >= queue.length)
990 grow();
991 size = i + 1;
992 if (i == 0) {
993 queue[0] = e;
994 setIndex(e, 0);
995 } else {
996 siftUp(i, e);
997 }
998 if (queue[0] == e) {
999 leader = null;
1000 available.signal();
1001 }
1002 } finally {
1003 lock.unlock();
1004 }
1005 return true;
1006 }
1007
1008 public void put(Runnable e) {
1009 offer(e);
1010 }
1011
1012 public boolean add(Runnable e) {
1013 return offer(e);
1014 }
1015
1016 public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1017 return offer(e);
1018 }
1019
1020 /**
1021 * Performs common bookkeeping for poll and take: Replaces
1022 * first element with last and sifts it down. Call only when
1023 * holding lock.
1024 * @param f the task to remove and return
1025 */
1026 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1027 int s = --size;
1028 RunnableScheduledFuture<?> x = queue[s];
1029 queue[s] = null;
1030 if (s != 0)
1031 siftDown(0, x);
1032 setIndex(f, -1);
1033 return f;
1034 }
1035
1036 public RunnableScheduledFuture<?> poll() {
1037 final ReentrantLock lock = this.lock;
1038 lock.lock();
1039 try {
1040 RunnableScheduledFuture<?> first = queue[0];
1041 if (first == null || first.getDelay(NANOSECONDS) > 0)
1042 return null;
1043 else
1044 return finishPoll(first);
1045 } finally {
1046 lock.unlock();
1047 }
1048 }
1049
1050 public RunnableScheduledFuture<?> take() throws InterruptedException {
1051 final ReentrantLock lock = this.lock;
1052 lock.lockInterruptibly();
1053 try {
1054 for (;;) {
1055 RunnableScheduledFuture<?> first = queue[0];
1056 if (first == null)
1057 available.await();
1058 else {
1059 long delay = first.getDelay(NANOSECONDS);
1060 if (delay <= 0)
1061 return finishPoll(first);
1062 first = null; // don't retain ref while waiting
1063 if (leader != null)
1064 available.await();
1065 else {
1066 Thread thisThread = Thread.currentThread();
1067 leader = thisThread;
1068 try {
1069 available.awaitNanos(delay);
1070 } finally {
1071 if (leader == thisThread)
1072 leader = null;
1073 }
1074 }
1075 }
1076 }
1077 } finally {
1078 if (leader == null && queue[0] != null)
1079 available.signal();
1080 lock.unlock();
1081 }
1082 }
1083
1084 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1085 throws InterruptedException {
1086 long nanos = unit.toNanos(timeout);
1087 final ReentrantLock lock = this.lock;
1088 lock.lockInterruptibly();
1089 try {
1090 for (;;) {
1091 RunnableScheduledFuture<?> first = queue[0];
1092 if (first == null) {
1093 if (nanos <= 0)
1094 return null;
1095 else
1096 nanos = available.awaitNanos(nanos);
1097 } else {
1098 long delay = first.getDelay(NANOSECONDS);
1099 if (delay <= 0)
1100 return finishPoll(first);
1101 if (nanos <= 0)
1102 return null;
1103 first = null; // don't retain ref while waiting
1104 if (nanos < delay || leader != null)
1105 nanos = available.awaitNanos(nanos);
1106 else {
1107 Thread thisThread = Thread.currentThread();
1108 leader = thisThread;
1109 try {
1110 long timeLeft = available.awaitNanos(delay);
1111 nanos -= delay - timeLeft;
1112 } finally {
1113 if (leader == thisThread)
1114 leader = null;
1115 }
1116 }
1117 }
1118 }
1119 } finally {
1120 if (leader == null && queue[0] != null)
1121 available.signal();
1122 lock.unlock();
1123 }
1124 }
1125
1126 public void clear() {
1127 final ReentrantLock lock = this.lock;
1128 lock.lock();
1129 try {
1130 for (int i = 0; i < size; i++) {
1131 RunnableScheduledFuture<?> t = queue[i];
1132 if (t != null) {
1133 queue[i] = null;
1134 setIndex(t, -1);
1135 }
1136 }
1137 size = 0;
1138 } finally {
1139 lock.unlock();
1140 }
1141 }
1142
1143 /**
1144 * Returns first element only if it is expired.
1145 * Used only by drainTo. Call only when holding lock.
1146 */
1147 private RunnableScheduledFuture<?> peekExpired() {
1148 // assert lock.isHeldByCurrentThread();
1149 RunnableScheduledFuture<?> first = queue[0];
1150 return (first == null || first.getDelay(NANOSECONDS) > 0) ?
1151 null : first;
1152 }
1153
1154 public int drainTo(Collection<? super Runnable> c) {
1155 if (c == null)
1156 throw new NullPointerException();
1157 if (c == this)
1158 throw new IllegalArgumentException();
1159 final ReentrantLock lock = this.lock;
1160 lock.lock();
1161 try {
1162 RunnableScheduledFuture<?> first;
1163 int n = 0;
1164 while ((first = peekExpired()) != null) {
1165 c.add(first); // In this order, in case add() throws.
1166 finishPoll(first);
1167 ++n;
1168 }
1169 return n;
1170 } finally {
1171 lock.unlock();
1172 }
1173 }
1174
1175 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1176 if (c == null)
1177 throw new NullPointerException();
1178 if (c == this)
1179 throw new IllegalArgumentException();
1180 if (maxElements <= 0)
1181 return 0;
1182 final ReentrantLock lock = this.lock;
1183 lock.lock();
1184 try {
1185 RunnableScheduledFuture<?> first;
1186 int n = 0;
1187 while (n < maxElements && (first = peekExpired()) != null) {
1188 c.add(first); // In this order, in case add() throws.
1189 finishPoll(first);
1190 ++n;
1191 }
1192 return n;
1193 } finally {
1194 lock.unlock();
1195 }
1196 }
1197
1198 public Object[] toArray() {
1199 final ReentrantLock lock = this.lock;
1200 lock.lock();
1201 try {
1202 return Arrays.copyOf(queue, size, Object[].class);
1203 } finally {
1204 lock.unlock();
1205 }
1206 }
1207
1208 @SuppressWarnings("unchecked")
1209 public <T> T[] toArray(T[] a) {
1210 final ReentrantLock lock = this.lock;
1211 lock.lock();
1212 try {
1213 if (a.length < size)
1214 return (T[]) Arrays.copyOf(queue, size, a.getClass());
1215 System.arraycopy(queue, 0, a, 0, size);
1216 if (a.length > size)
1217 a[size] = null;
1218 return a;
1219 } finally {
1220 lock.unlock();
1221 }
1222 }
1223
1224 public Iterator<Runnable> iterator() {
1225 return new Itr(Arrays.copyOf(queue, size));
1226 }
1227
1228 /**
1229 * Snapshot iterator that works off copy of underlying q array.
1230 */
1231 private class Itr implements Iterator<Runnable> {
1232 final RunnableScheduledFuture<?>[] array;
1233 int cursor = 0; // index of next element to return
1234 int lastRet = -1; // index of last element, or -1 if no such
1235
1236 Itr(RunnableScheduledFuture<?>[] array) {
1237 this.array = array;
1238 }
1239
1240 public boolean hasNext() {
1241 return cursor < array.length;
1242 }
1243
1244 public Runnable next() {
1245 if (cursor >= array.length)
1246 throw new NoSuchElementException();
1247 lastRet = cursor;
1248 return array[cursor++];
1249 }
1250
1251 public void remove() {
1252 if (lastRet < 0)
1253 throw new IllegalStateException();
1254 DelayedWorkQueue.this.remove(array[lastRet]);
1255 lastRet = -1;
1256 }
1257 }
1258 }
1259 }