ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.88
Committed: Fri Apr 24 15:05:36 2015 UTC (9 years, 1 month ago) by dl
Branch: MAIN
Changes since 1.87: +1 -1 lines
Log Message:
Avoid possibility of stale getDelay

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 import static java.util.concurrent.TimeUnit.NANOSECONDS;
11
12 import java.util.AbstractQueue;
13 import java.util.Arrays;
14 import java.util.Collection;
15 import java.util.Iterator;
16 import java.util.List;
17 import java.util.NoSuchElementException;
18 import java.util.concurrent.atomic.AtomicLong;
19 import java.util.concurrent.locks.Condition;
20 import java.util.concurrent.locks.ReentrantLock;
21
22 /**
23 * A {@link ThreadPoolExecutor} that can additionally schedule
24 * commands to run after a given delay, or to execute periodically.
25 * This class is preferable to {@link java.util.Timer} when multiple
26 * worker threads are needed, or when the additional flexibility or
27 * capabilities of {@link ThreadPoolExecutor} (which this class
28 * extends) are required.
29 *
30 * <p>Delayed tasks execute no sooner than they are enabled, but
31 * without any real-time guarantees about when, after they are
32 * enabled, they will commence. Tasks scheduled for exactly the same
33 * execution time are enabled in first-in-first-out (FIFO) order of
34 * submission.
35 *
36 * <p>When a submitted task is cancelled before it is run, execution
37 * is suppressed. By default, such a cancelled task is not
38 * automatically removed from the work queue until its delay elapses.
39 * While this enables further inspection and monitoring, it may also
40 * cause unbounded retention of cancelled tasks. To avoid this, use
41 * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately
42 * removed from the work queue at time of cancellation.
43 *
44 * <p>Successive executions of a periodic task scheduled via
45 * {@link #scheduleAtFixedRate scheduleAtFixedRate} or
46 * {@link #scheduleWithFixedDelay scheduleWithFixedDelay}
47 * do not overlap. While different executions may be performed by
48 * different threads, the effects of prior executions
49 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
50 * those of subsequent ones.
51 *
52 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
53 * of the inherited tuning methods are not useful for it. In
54 * particular, because it acts as a fixed-sized pool using
55 * {@code corePoolSize} threads and an unbounded queue, adjustments
56 * to {@code maximumPoolSize} have no useful effect. Additionally, it
57 * is almost never a good idea to set {@code corePoolSize} to zero or
58 * use {@code allowCoreThreadTimeOut} because this may leave the pool
59 * without threads to handle tasks once they become eligible to run.
60 *
61 * <p><b>Extension notes:</b> This class overrides the
62 * {@link ThreadPoolExecutor#execute(Runnable) execute} and
63 * {@link AbstractExecutorService#submit(Runnable) submit}
64 * methods to generate internal {@link ScheduledFuture} objects to
65 * control per-task delays and scheduling. To preserve
66 * functionality, any further overrides of these methods in
67 * subclasses must invoke superclass versions, which effectively
68 * disables additional task customization. However, this class
69 * provides alternative protected extension method
70 * {@code decorateTask} (one version each for {@code Runnable} and
71 * {@code Callable}) that can be used to customize the concrete task
72 * types used to execute commands entered via {@code execute},
73 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
74 * and {@code scheduleWithFixedDelay}. By default, a
75 * {@code ScheduledThreadPoolExecutor} uses a task type extending
76 * {@link FutureTask}. However, this may be modified or replaced using
77 * subclasses of the form:
78 *
79 * <pre> {@code
80 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
81 *
82 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
83 *
84 * protected <V> RunnableScheduledFuture<V> decorateTask(
85 * Runnable r, RunnableScheduledFuture<V> task) {
86 * return new CustomTask<V>(r, task);
87 * }
88 *
89 * protected <V> RunnableScheduledFuture<V> decorateTask(
90 * Callable<V> c, RunnableScheduledFuture<V> task) {
91 * return new CustomTask<V>(c, task);
92 * }
93 * // ... add constructors, etc.
94 * }}</pre>
95 *
96 * @since 1.5
97 * @author Doug Lea
98 */
99 public class ScheduledThreadPoolExecutor
100 extends ThreadPoolExecutor
101 implements ScheduledExecutorService {
102
103 /*
104 * This class specializes ThreadPoolExecutor implementation by
105 *
106 * 1. Using a custom task type, ScheduledFutureTask for
107 * tasks, even those that don't require scheduling (i.e.,
108 * those submitted using ExecutorService execute, not
109 * ScheduledExecutorService methods) which are treated as
110 * delayed tasks with a delay of zero.
111 *
112 * 2. Using a custom queue (DelayedWorkQueue), a variant of
113 * unbounded DelayQueue. The lack of capacity constraint and
114 * the fact that corePoolSize and maximumPoolSize are
115 * effectively identical simplifies some execution mechanics
116 * (see delayedExecute) compared to ThreadPoolExecutor.
117 *
118 * 3. Supporting optional run-after-shutdown parameters, which
119 * leads to overrides of shutdown methods to remove and cancel
120 * tasks that should NOT be run after shutdown, as well as
121 * different recheck logic when task (re)submission overlaps
122 * with a shutdown.
123 *
124 * 4. Task decoration methods to allow interception and
125 * instrumentation, which are needed because subclasses cannot
126 * otherwise override submit methods to get this effect. These
127 * don't have any impact on pool control logic though.
128 */
129
130 /**
131 * False if should cancel/suppress periodic tasks on shutdown.
132 */
133 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
134
135 /**
136 * False if should cancel non-periodic tasks on shutdown.
137 */
138 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
139
140 /**
141 * True if ScheduledFutureTask.cancel should remove from queue.
142 */
143 private volatile boolean removeOnCancel;
144
145 /**
146 * Sequence number to break scheduling ties, and in turn to
147 * guarantee FIFO order among tied entries.
148 */
149 private static final AtomicLong sequencer = new AtomicLong();
150
151 /**
152 * Returns current nanosecond time.
153 */
154 static final long now() {
155 return System.nanoTime();
156 }
157
158 private class ScheduledFutureTask<V>
159 extends FutureTask<V> implements RunnableScheduledFuture<V> {
160
161 /** Sequence number to break ties FIFO */
162 private final long sequenceNumber;
163
164 /** The time the task is enabled to execute in nanoTime units */
165 private volatile long time;
166
167 /**
168 * Period in nanoseconds for repeating tasks.
169 * A positive value indicates fixed-rate execution.
170 * A negative value indicates fixed-delay execution.
171 * A value of 0 indicates a non-repeating (one-shot) task.
172 */
173 private final long period;
174
175 /** The actual task to be re-enqueued by reExecutePeriodic */
176 RunnableScheduledFuture<V> outerTask = this;
177
178 /**
179 * Index into delay queue, to support faster cancellation.
180 */
181 int heapIndex;
182
183 /**
184 * Creates a one-shot action with given nanoTime-based trigger time.
185 */
186 ScheduledFutureTask(Runnable r, V result, long triggerTime) {
187 super(r, result);
188 this.time = triggerTime;
189 this.period = 0;
190 this.sequenceNumber = sequencer.getAndIncrement();
191 }
192
193 /**
194 * Creates a periodic action with given nanoTime-based initial
195 * trigger time and period.
196 */
197 ScheduledFutureTask(Runnable r, V result, long triggerTime,
198 long period) {
199 super(r, result);
200 this.time = triggerTime;
201 this.period = period;
202 this.sequenceNumber = sequencer.getAndIncrement();
203 }
204
205 /**
206 * Creates a one-shot action with given nanoTime-based trigger time.
207 */
208 ScheduledFutureTask(Callable<V> callable, long triggerTime) {
209 super(callable);
210 this.time = triggerTime;
211 this.period = 0;
212 this.sequenceNumber = sequencer.getAndIncrement();
213 }
214
215 public long getDelay(TimeUnit unit) {
216 return unit.convert(time - now(), NANOSECONDS);
217 }
218
219 public int compareTo(Delayed other) {
220 if (other == this) // compare zero if same object
221 return 0;
222 if (other instanceof ScheduledFutureTask) {
223 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
224 long diff = time - x.time;
225 if (diff < 0)
226 return -1;
227 else if (diff > 0)
228 return 1;
229 else if (sequenceNumber < x.sequenceNumber)
230 return -1;
231 else
232 return 1;
233 }
234 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
235 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
236 }
237
238 /**
239 * Returns {@code true} if this is a periodic (not a one-shot) action.
240 *
241 * @return {@code true} if periodic
242 */
243 public boolean isPeriodic() {
244 return period != 0;
245 }
246
247 /**
248 * Sets the next time to run for a periodic task.
249 */
250 private void setNextRunTime() {
251 long p = period;
252 if (p > 0)
253 time += p;
254 else
255 time = triggerTime(-p);
256 }
257
258 public boolean cancel(boolean mayInterruptIfRunning) {
259 boolean cancelled = super.cancel(mayInterruptIfRunning);
260 if (cancelled && removeOnCancel && heapIndex >= 0)
261 remove(this);
262 return cancelled;
263 }
264
265 /**
266 * Overrides FutureTask version so as to reset/requeue if periodic.
267 */
268 public void run() {
269 boolean periodic = isPeriodic();
270 if (!canRunInCurrentRunState(periodic))
271 cancel(false);
272 else if (!periodic)
273 ScheduledFutureTask.super.run();
274 else if (ScheduledFutureTask.super.runAndReset()) {
275 setNextRunTime();
276 reExecutePeriodic(outerTask);
277 }
278 }
279 }
280
281 /**
282 * Returns true if can run a task given current run state
283 * and run-after-shutdown parameters.
284 *
285 * @param periodic true if this task periodic, false if delayed
286 */
287 boolean canRunInCurrentRunState(boolean periodic) {
288 return isRunningOrShutdown(periodic ?
289 continueExistingPeriodicTasksAfterShutdown :
290 executeExistingDelayedTasksAfterShutdown);
291 }
292
293 /**
294 * Main execution method for delayed or periodic tasks. If pool
295 * is shut down, rejects the task. Otherwise adds task to queue
296 * and starts a thread, if necessary, to run it. (We cannot
297 * prestart the thread to run the task because the task (probably)
298 * shouldn't be run yet.) If the pool is shut down while the task
299 * is being added, cancel and remove it if required by state and
300 * run-after-shutdown parameters.
301 *
302 * @param task the task
303 */
304 private void delayedExecute(RunnableScheduledFuture<?> task) {
305 if (isShutdown())
306 reject(task);
307 else {
308 super.getQueue().add(task);
309 if (isShutdown() &&
310 !canRunInCurrentRunState(task.isPeriodic()) &&
311 remove(task))
312 task.cancel(false);
313 else
314 ensurePrestart();
315 }
316 }
317
318 /**
319 * Requeues a periodic task unless current run state precludes it.
320 * Same idea as delayedExecute except drops task rather than rejecting.
321 *
322 * @param task the task
323 */
324 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
325 if (canRunInCurrentRunState(true)) {
326 super.getQueue().add(task);
327 if (!canRunInCurrentRunState(true) && remove(task))
328 task.cancel(false);
329 else
330 ensurePrestart();
331 }
332 }
333
334 /**
335 * Cancels and clears the queue of all tasks that should not be run
336 * due to shutdown policy. Invoked within super.shutdown.
337 */
338 @Override void onShutdown() {
339 BlockingQueue<Runnable> q = super.getQueue();
340 boolean keepDelayed =
341 getExecuteExistingDelayedTasksAfterShutdownPolicy();
342 boolean keepPeriodic =
343 getContinueExistingPeriodicTasksAfterShutdownPolicy();
344 if (!keepDelayed && !keepPeriodic) {
345 for (Object e : q.toArray())
346 if (e instanceof RunnableScheduledFuture<?>)
347 ((RunnableScheduledFuture<?>) e).cancel(false);
348 q.clear();
349 }
350 else {
351 // Traverse snapshot to avoid iterator exceptions
352 for (Object e : q.toArray()) {
353 if (e instanceof RunnableScheduledFuture) {
354 RunnableScheduledFuture<?> t =
355 (RunnableScheduledFuture<?>)e;
356 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
357 t.isCancelled()) { // also remove if already cancelled
358 if (q.remove(t))
359 t.cancel(false);
360 }
361 }
362 }
363 }
364 tryTerminate();
365 }
366
367 /**
368 * Modifies or replaces the task used to execute a runnable.
369 * This method can be used to override the concrete
370 * class used for managing internal tasks.
371 * The default implementation simply returns the given task.
372 *
373 * @param runnable the submitted Runnable
374 * @param task the task created to execute the runnable
375 * @param <V> the type of the task's result
376 * @return a task that can execute the runnable
377 * @since 1.6
378 */
379 protected <V> RunnableScheduledFuture<V> decorateTask(
380 Runnable runnable, RunnableScheduledFuture<V> task) {
381 return task;
382 }
383
384 /**
385 * Modifies or replaces the task used to execute a callable.
386 * This method can be used to override the concrete
387 * class used for managing internal tasks.
388 * The default implementation simply returns the given task.
389 *
390 * @param callable the submitted Callable
391 * @param task the task created to execute the callable
392 * @param <V> the type of the task's result
393 * @return a task that can execute the callable
394 * @since 1.6
395 */
396 protected <V> RunnableScheduledFuture<V> decorateTask(
397 Callable<V> callable, RunnableScheduledFuture<V> task) {
398 return task;
399 }
400
401 /**
402 * The default keep-alive time for pool threads.
403 *
404 * Normally, this value is unused because all pool threads will be
405 * core threads, but if a user creates a pool with a corePoolSize
406 * of zero (against our advice), we keep a thread alive as long as
407 * there are queued tasks. If the keep alive time is zero (the
408 * historic value), we end up hot-spinning in getTask, wasting a
409 * CPU. But on the other hand, if we set the value too high, and
410 * users create a one-shot pool which they don't cleanly shutdown,
411 * the pool's non-daemon threads will prevent JVM termination. A
412 * small but non-zero value (relative to a JVM's lifetime) seems
413 * best.
414 */
415 private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
416
417 /**
418 * Creates a new {@code ScheduledThreadPoolExecutor} with the
419 * given core pool size.
420 *
421 * @param corePoolSize the number of threads to keep in the pool, even
422 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
423 * @throws IllegalArgumentException if {@code corePoolSize < 0}
424 */
425 public ScheduledThreadPoolExecutor(int corePoolSize) {
426 super(corePoolSize, Integer.MAX_VALUE,
427 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
428 new DelayedWorkQueue());
429 }
430
431 /**
432 * Creates a new {@code ScheduledThreadPoolExecutor} with the
433 * given initial parameters.
434 *
435 * @param corePoolSize the number of threads to keep in the pool, even
436 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
437 * @param threadFactory the factory to use when the executor
438 * creates a new thread
439 * @throws IllegalArgumentException if {@code corePoolSize < 0}
440 * @throws NullPointerException if {@code threadFactory} is null
441 */
442 public ScheduledThreadPoolExecutor(int corePoolSize,
443 ThreadFactory threadFactory) {
444 super(corePoolSize, Integer.MAX_VALUE,
445 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
446 new DelayedWorkQueue(), threadFactory);
447 }
448
449 /**
450 * Creates a new {@code ScheduledThreadPoolExecutor} with the
451 * given initial parameters.
452 *
453 * @param corePoolSize the number of threads to keep in the pool, even
454 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
455 * @param handler the handler to use when execution is blocked
456 * because the thread bounds and queue capacities are reached
457 * @throws IllegalArgumentException if {@code corePoolSize < 0}
458 * @throws NullPointerException if {@code handler} is null
459 */
460 public ScheduledThreadPoolExecutor(int corePoolSize,
461 RejectedExecutionHandler handler) {
462 super(corePoolSize, Integer.MAX_VALUE,
463 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
464 new DelayedWorkQueue(), handler);
465 }
466
467 /**
468 * Creates a new {@code ScheduledThreadPoolExecutor} with the
469 * given initial parameters.
470 *
471 * @param corePoolSize the number of threads to keep in the pool, even
472 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
473 * @param threadFactory the factory to use when the executor
474 * creates a new thread
475 * @param handler the handler to use when execution is blocked
476 * because the thread bounds and queue capacities are reached
477 * @throws IllegalArgumentException if {@code corePoolSize < 0}
478 * @throws NullPointerException if {@code threadFactory} or
479 * {@code handler} is null
480 */
481 public ScheduledThreadPoolExecutor(int corePoolSize,
482 ThreadFactory threadFactory,
483 RejectedExecutionHandler handler) {
484 super(corePoolSize, Integer.MAX_VALUE,
485 DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
486 new DelayedWorkQueue(), threadFactory, handler);
487 }
488
489 /**
490 * Returns the nanoTime-based trigger time of a delayed action.
491 */
492 private long triggerTime(long delay, TimeUnit unit) {
493 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
494 }
495
496 /**
497 * Returns the nanoTime-based trigger time of a delayed action.
498 */
499 long triggerTime(long delay) {
500 return now() +
501 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
502 }
503
504 /**
505 * Constrains the values of all delays in the queue to be within
506 * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
507 * This may occur if a task is eligible to be dequeued, but has
508 * not yet been, while some other task is added with a delay of
509 * Long.MAX_VALUE.
510 */
511 private long overflowFree(long delay) {
512 Delayed head = (Delayed) super.getQueue().peek();
513 if (head != null) {
514 long headDelay = head.getDelay(NANOSECONDS);
515 if (headDelay < 0 && (delay - headDelay < 0))
516 delay = Long.MAX_VALUE + headDelay;
517 }
518 return delay;
519 }
520
521 /**
522 * @throws RejectedExecutionException {@inheritDoc}
523 * @throws NullPointerException {@inheritDoc}
524 */
525 public ScheduledFuture<?> schedule(Runnable command,
526 long delay,
527 TimeUnit unit) {
528 if (command == null || unit == null)
529 throw new NullPointerException();
530 RunnableScheduledFuture<Void> t = decorateTask(command,
531 new ScheduledFutureTask<Void>(command, null,
532 triggerTime(delay, unit)));
533 delayedExecute(t);
534 return t;
535 }
536
537 /**
538 * @throws RejectedExecutionException {@inheritDoc}
539 * @throws NullPointerException {@inheritDoc}
540 */
541 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
542 long delay,
543 TimeUnit unit) {
544 if (callable == null || unit == null)
545 throw new NullPointerException();
546 RunnableScheduledFuture<V> t = decorateTask(callable,
547 new ScheduledFutureTask<V>(callable,
548 triggerTime(delay, unit)));
549 delayedExecute(t);
550 return t;
551 }
552
553 /**
554 * @throws RejectedExecutionException {@inheritDoc}
555 * @throws NullPointerException {@inheritDoc}
556 * @throws IllegalArgumentException {@inheritDoc}
557 */
558 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
559 long initialDelay,
560 long period,
561 TimeUnit unit) {
562 if (command == null || unit == null)
563 throw new NullPointerException();
564 if (period <= 0)
565 throw new IllegalArgumentException();
566 ScheduledFutureTask<Void> sft =
567 new ScheduledFutureTask<Void>(command,
568 null,
569 triggerTime(initialDelay, unit),
570 unit.toNanos(period));
571 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
572 sft.outerTask = t;
573 delayedExecute(t);
574 return t;
575 }
576
577 /**
578 * @throws RejectedExecutionException {@inheritDoc}
579 * @throws NullPointerException {@inheritDoc}
580 * @throws IllegalArgumentException {@inheritDoc}
581 */
582 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
583 long initialDelay,
584 long delay,
585 TimeUnit unit) {
586 if (command == null || unit == null)
587 throw new NullPointerException();
588 if (delay <= 0)
589 throw new IllegalArgumentException();
590 ScheduledFutureTask<Void> sft =
591 new ScheduledFutureTask<Void>(command,
592 null,
593 triggerTime(initialDelay, unit),
594 unit.toNanos(-delay));
595 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
596 sft.outerTask = t;
597 delayedExecute(t);
598 return t;
599 }
600
601 /**
602 * Executes {@code command} with zero required delay.
603 * This has effect equivalent to
604 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
605 * Note that inspections of the queue and of the list returned by
606 * {@code shutdownNow} will access the zero-delayed
607 * {@link ScheduledFuture}, not the {@code command} itself.
608 *
609 * <p>A consequence of the use of {@code ScheduledFuture} objects is
610 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
611 * called with a null second {@code Throwable} argument, even if the
612 * {@code command} terminated abruptly. Instead, the {@code Throwable}
613 * thrown by such a task can be obtained via {@link Future#get}.
614 *
615 * @throws RejectedExecutionException at discretion of
616 * {@code RejectedExecutionHandler}, if the task
617 * cannot be accepted for execution because the
618 * executor has been shut down
619 * @throws NullPointerException {@inheritDoc}
620 */
621 public void execute(Runnable command) {
622 schedule(command, 0, NANOSECONDS);
623 }
624
625 // Override AbstractExecutorService methods
626
627 /**
628 * @throws RejectedExecutionException {@inheritDoc}
629 * @throws NullPointerException {@inheritDoc}
630 */
631 public Future<?> submit(Runnable task) {
632 return schedule(task, 0, NANOSECONDS);
633 }
634
635 /**
636 * @throws RejectedExecutionException {@inheritDoc}
637 * @throws NullPointerException {@inheritDoc}
638 */
639 public <T> Future<T> submit(Runnable task, T result) {
640 return schedule(Executors.callable(task, result), 0, NANOSECONDS);
641 }
642
643 /**
644 * @throws RejectedExecutionException {@inheritDoc}
645 * @throws NullPointerException {@inheritDoc}
646 */
647 public <T> Future<T> submit(Callable<T> task) {
648 return schedule(task, 0, NANOSECONDS);
649 }
650
651 /**
652 * Sets the policy on whether to continue executing existing
653 * periodic tasks even when this executor has been {@code shutdown}.
654 * In this case, these tasks will only terminate upon
655 * {@code shutdownNow} or after setting the policy to
656 * {@code false} when already shutdown.
657 * This value is by default {@code false}.
658 *
659 * @param value if {@code true}, continue after shutdown, else don't
660 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
661 */
662 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
663 continueExistingPeriodicTasksAfterShutdown = value;
664 if (!value && isShutdown())
665 onShutdown();
666 }
667
668 /**
669 * Gets the policy on whether to continue executing existing
670 * periodic tasks even when this executor has been {@code shutdown}.
671 * In this case, these tasks will only terminate upon
672 * {@code shutdownNow} or after setting the policy to
673 * {@code false} when already shutdown.
674 * This value is by default {@code false}.
675 *
676 * @return {@code true} if will continue after shutdown
677 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
678 */
679 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
680 return continueExistingPeriodicTasksAfterShutdown;
681 }
682
683 /**
684 * Sets the policy on whether to execute existing delayed
685 * tasks even when this executor has been {@code shutdown}.
686 * In this case, these tasks will only terminate upon
687 * {@code shutdownNow}, or after setting the policy to
688 * {@code false} when already shutdown.
689 * This value is by default {@code true}.
690 *
691 * @param value if {@code true}, execute after shutdown, else don't
692 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
693 */
694 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
695 executeExistingDelayedTasksAfterShutdown = value;
696 if (!value && isShutdown())
697 onShutdown();
698 }
699
700 /**
701 * Gets the policy on whether to execute existing delayed
702 * tasks even when this executor has been {@code shutdown}.
703 * In this case, these tasks will only terminate upon
704 * {@code shutdownNow}, or after setting the policy to
705 * {@code false} when already shutdown.
706 * This value is by default {@code true}.
707 *
708 * @return {@code true} if will execute after shutdown
709 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
710 */
711 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
712 return executeExistingDelayedTasksAfterShutdown;
713 }
714
715 /**
716 * Sets the policy on whether cancelled tasks should be immediately
717 * removed from the work queue at time of cancellation. This value is
718 * by default {@code false}.
719 *
720 * @param value if {@code true}, remove on cancellation, else don't
721 * @see #getRemoveOnCancelPolicy
722 * @since 1.7
723 */
724 public void setRemoveOnCancelPolicy(boolean value) {
725 removeOnCancel = value;
726 }
727
728 /**
729 * Gets the policy on whether cancelled tasks should be immediately
730 * removed from the work queue at time of cancellation. This value is
731 * by default {@code false}.
732 *
733 * @return {@code true} if cancelled tasks are immediately removed
734 * from the queue
735 * @see #setRemoveOnCancelPolicy
736 * @since 1.7
737 */
738 public boolean getRemoveOnCancelPolicy() {
739 return removeOnCancel;
740 }
741
742 /**
743 * Initiates an orderly shutdown in which previously submitted
744 * tasks are executed, but no new tasks will be accepted.
745 * Invocation has no additional effect if already shut down.
746 *
747 * <p>This method does not wait for previously submitted tasks to
748 * complete execution. Use {@link #awaitTermination awaitTermination}
749 * to do that.
750 *
751 * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
752 * has been set {@code false}, existing delayed tasks whose delays
753 * have not yet elapsed are cancelled. And unless the {@code
754 * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
755 * {@code true}, future executions of existing periodic tasks will
756 * be cancelled.
757 *
758 * @throws SecurityException {@inheritDoc}
759 */
760 public void shutdown() {
761 super.shutdown();
762 }
763
764 /**
765 * Attempts to stop all actively executing tasks, halts the
766 * processing of waiting tasks, and returns a list of the tasks
767 * that were awaiting execution.
768 *
769 * <p>This method does not wait for actively executing tasks to
770 * terminate. Use {@link #awaitTermination awaitTermination} to
771 * do that.
772 *
773 * <p>There are no guarantees beyond best-effort attempts to stop
774 * processing actively executing tasks. This implementation
775 * cancels tasks via {@link Thread#interrupt}, so any task that
776 * fails to respond to interrupts may never terminate.
777 *
778 * @return list of tasks that never commenced execution.
779 * Each element of this list is a {@link ScheduledFuture}.
780 * For tasks submitted via one of the {@code schedule}
781 * methods, the element will be identical to the returned
782 * {@code ScheduledFuture}. For tasks submitted using
783 * {@link #execute execute}, the element will be a
784 * zero-delay {@code ScheduledFuture}.
785 * @throws SecurityException {@inheritDoc}
786 */
787 public List<Runnable> shutdownNow() {
788 return super.shutdownNow();
789 }
790
791 /**
792 * Returns the task queue used by this executor.
793 * Each element of this list is a {@link ScheduledFuture}.
794 * For tasks submitted via one of the {@code schedule} methods, the
795 * element will be identical to the returned {@code ScheduledFuture}.
796 * For tasks submitted using {@link #execute execute}, the element
797 * will be a zero-delay {@code ScheduledFuture}.
798 *
799 * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
800 * tasks in the order in which they will execute.
801 *
802 * @return the task queue
803 */
804 public BlockingQueue<Runnable> getQueue() {
805 return super.getQueue();
806 }
807
808 /**
809 * Specialized delay queue. To mesh with TPE declarations, this
810 * class must be declared as a BlockingQueue<Runnable> even though
811 * it can only hold RunnableScheduledFutures.
812 */
813 static class DelayedWorkQueue extends AbstractQueue<Runnable>
814 implements BlockingQueue<Runnable> {
815
816 /*
817 * A DelayedWorkQueue is based on a heap-based data structure
818 * like those in DelayQueue and PriorityQueue, except that
819 * every ScheduledFutureTask also records its index into the
820 * heap array. This eliminates the need to find a task upon
821 * cancellation, greatly speeding up removal (down from O(n)
822 * to O(log n)), and reducing garbage retention that would
823 * otherwise occur by waiting for the element to rise to top
824 * before clearing. But because the queue may also hold
825 * RunnableScheduledFutures that are not ScheduledFutureTasks,
826 * we are not guaranteed to have such indices available, in
827 * which case we fall back to linear search. (We expect that
828 * most tasks will not be decorated, and that the faster cases
829 * will be much more common.)
830 *
831 * All heap operations must record index changes -- mainly
832 * within siftUp and siftDown. Upon removal, a task's
833 * heapIndex is set to -1. Note that ScheduledFutureTasks can
834 * appear at most once in the queue (this need not be true for
835 * other kinds of tasks or work queues), so are uniquely
836 * identified by heapIndex.
837 */
838
839 private static final int INITIAL_CAPACITY = 16;
840 private RunnableScheduledFuture<?>[] queue =
841 new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
842 private final ReentrantLock lock = new ReentrantLock();
843 private int size;
844
845 /**
846 * Thread designated to wait for the task at the head of the
847 * queue. This variant of the Leader-Follower pattern
848 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
849 * minimize unnecessary timed waiting. When a thread becomes
850 * the leader, it waits only for the next delay to elapse, but
851 * other threads await indefinitely. The leader thread must
852 * signal some other thread before returning from take() or
853 * poll(...), unless some other thread becomes leader in the
854 * interim. Whenever the head of the queue is replaced with a
855 * task with an earlier expiration time, the leader field is
856 * invalidated by being reset to null, and some waiting
857 * thread, but not necessarily the current leader, is
858 * signalled. So waiting threads must be prepared to acquire
859 * and lose leadership while waiting.
860 */
861 private Thread leader;
862
863 /**
864 * Condition signalled when a newer task becomes available at the
865 * head of the queue or a new thread may need to become leader.
866 */
867 private final Condition available = lock.newCondition();
868
869 /**
870 * Sets f's heapIndex if it is a ScheduledFutureTask.
871 */
872 private void setIndex(RunnableScheduledFuture<?> f, int idx) {
873 if (f instanceof ScheduledFutureTask)
874 ((ScheduledFutureTask)f).heapIndex = idx;
875 }
876
877 /**
878 * Sifts element added at bottom up to its heap-ordered spot.
879 * Call only when holding lock.
880 */
881 private void siftUp(int k, RunnableScheduledFuture<?> key) {
882 while (k > 0) {
883 int parent = (k - 1) >>> 1;
884 RunnableScheduledFuture<?> e = queue[parent];
885 if (key.compareTo(e) >= 0)
886 break;
887 queue[k] = e;
888 setIndex(e, k);
889 k = parent;
890 }
891 queue[k] = key;
892 setIndex(key, k);
893 }
894
895 /**
896 * Sifts element added at top down to its heap-ordered spot.
897 * Call only when holding lock.
898 */
899 private void siftDown(int k, RunnableScheduledFuture<?> key) {
900 int half = size >>> 1;
901 while (k < half) {
902 int child = (k << 1) + 1;
903 RunnableScheduledFuture<?> c = queue[child];
904 int right = child + 1;
905 if (right < size && c.compareTo(queue[right]) > 0)
906 c = queue[child = right];
907 if (key.compareTo(c) <= 0)
908 break;
909 queue[k] = c;
910 setIndex(c, k);
911 k = child;
912 }
913 queue[k] = key;
914 setIndex(key, k);
915 }
916
917 /**
918 * Resizes the heap array. Call only when holding lock.
919 */
920 private void grow() {
921 int oldCapacity = queue.length;
922 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
923 if (newCapacity < 0) // overflow
924 newCapacity = Integer.MAX_VALUE;
925 queue = Arrays.copyOf(queue, newCapacity);
926 }
927
928 /**
929 * Finds index of given object, or -1 if absent.
930 */
931 private int indexOf(Object x) {
932 if (x != null) {
933 if (x instanceof ScheduledFutureTask) {
934 int i = ((ScheduledFutureTask) x).heapIndex;
935 // Sanity check; x could conceivably be a
936 // ScheduledFutureTask from some other pool.
937 if (i >= 0 && i < size && queue[i] == x)
938 return i;
939 } else {
940 for (int i = 0; i < size; i++)
941 if (x.equals(queue[i]))
942 return i;
943 }
944 }
945 return -1;
946 }
947
948 public boolean contains(Object x) {
949 final ReentrantLock lock = this.lock;
950 lock.lock();
951 try {
952 return indexOf(x) != -1;
953 } finally {
954 lock.unlock();
955 }
956 }
957
958 public boolean remove(Object x) {
959 final ReentrantLock lock = this.lock;
960 lock.lock();
961 try {
962 int i = indexOf(x);
963 if (i < 0)
964 return false;
965
966 setIndex(queue[i], -1);
967 int s = --size;
968 RunnableScheduledFuture<?> replacement = queue[s];
969 queue[s] = null;
970 if (s != i) {
971 siftDown(i, replacement);
972 if (queue[i] == replacement)
973 siftUp(i, replacement);
974 }
975 return true;
976 } finally {
977 lock.unlock();
978 }
979 }
980
981 public int size() {
982 final ReentrantLock lock = this.lock;
983 lock.lock();
984 try {
985 return size;
986 } finally {
987 lock.unlock();
988 }
989 }
990
991 public boolean isEmpty() {
992 return size() == 0;
993 }
994
995 public int remainingCapacity() {
996 return Integer.MAX_VALUE;
997 }
998
999 public RunnableScheduledFuture<?> peek() {
1000 final ReentrantLock lock = this.lock;
1001 lock.lock();
1002 try {
1003 return queue[0];
1004 } finally {
1005 lock.unlock();
1006 }
1007 }
1008
1009 public boolean offer(Runnable x) {
1010 if (x == null)
1011 throw new NullPointerException();
1012 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
1013 final ReentrantLock lock = this.lock;
1014 lock.lock();
1015 try {
1016 int i = size;
1017 if (i >= queue.length)
1018 grow();
1019 size = i + 1;
1020 if (i == 0) {
1021 queue[0] = e;
1022 setIndex(e, 0);
1023 } else {
1024 siftUp(i, e);
1025 }
1026 if (queue[0] == e) {
1027 leader = null;
1028 available.signal();
1029 }
1030 } finally {
1031 lock.unlock();
1032 }
1033 return true;
1034 }
1035
1036 public void put(Runnable e) {
1037 offer(e);
1038 }
1039
1040 public boolean add(Runnable e) {
1041 return offer(e);
1042 }
1043
1044 public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1045 return offer(e);
1046 }
1047
1048 /**
1049 * Performs common bookkeeping for poll and take: Replaces
1050 * first element with last and sifts it down. Call only when
1051 * holding lock.
1052 * @param f the task to remove and return
1053 */
1054 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1055 int s = --size;
1056 RunnableScheduledFuture<?> x = queue[s];
1057 queue[s] = null;
1058 if (s != 0)
1059 siftDown(0, x);
1060 setIndex(f, -1);
1061 return f;
1062 }
1063
1064 public RunnableScheduledFuture<?> poll() {
1065 final ReentrantLock lock = this.lock;
1066 lock.lock();
1067 try {
1068 RunnableScheduledFuture<?> first = queue[0];
1069 return (first == null || first.getDelay(NANOSECONDS) > 0)
1070 ? null
1071 : finishPoll(first);
1072 } finally {
1073 lock.unlock();
1074 }
1075 }
1076
1077 public RunnableScheduledFuture<?> take() throws InterruptedException {
1078 final ReentrantLock lock = this.lock;
1079 lock.lockInterruptibly();
1080 try {
1081 for (;;) {
1082 RunnableScheduledFuture<?> first = queue[0];
1083 if (first == null)
1084 available.await();
1085 else {
1086 long delay = first.getDelay(NANOSECONDS);
1087 if (delay <= 0)
1088 return finishPoll(first);
1089 first = null; // don't retain ref while waiting
1090 if (leader != null)
1091 available.await();
1092 else {
1093 Thread thisThread = Thread.currentThread();
1094 leader = thisThread;
1095 try {
1096 available.awaitNanos(delay);
1097 } finally {
1098 if (leader == thisThread)
1099 leader = null;
1100 }
1101 }
1102 }
1103 }
1104 } finally {
1105 if (leader == null && queue[0] != null)
1106 available.signal();
1107 lock.unlock();
1108 }
1109 }
1110
1111 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1112 throws InterruptedException {
1113 long nanos = unit.toNanos(timeout);
1114 final ReentrantLock lock = this.lock;
1115 lock.lockInterruptibly();
1116 try {
1117 for (;;) {
1118 RunnableScheduledFuture<?> first = queue[0];
1119 if (first == null) {
1120 if (nanos <= 0)
1121 return null;
1122 else
1123 nanos = available.awaitNanos(nanos);
1124 } else {
1125 long delay = first.getDelay(NANOSECONDS);
1126 if (delay <= 0)
1127 return finishPoll(first);
1128 if (nanos <= 0)
1129 return null;
1130 first = null; // don't retain ref while waiting
1131 if (nanos < delay || leader != null)
1132 nanos = available.awaitNanos(nanos);
1133 else {
1134 Thread thisThread = Thread.currentThread();
1135 leader = thisThread;
1136 try {
1137 long timeLeft = available.awaitNanos(delay);
1138 nanos -= delay - timeLeft;
1139 } finally {
1140 if (leader == thisThread)
1141 leader = null;
1142 }
1143 }
1144 }
1145 }
1146 } finally {
1147 if (leader == null && queue[0] != null)
1148 available.signal();
1149 lock.unlock();
1150 }
1151 }
1152
1153 public void clear() {
1154 final ReentrantLock lock = this.lock;
1155 lock.lock();
1156 try {
1157 for (int i = 0; i < size; i++) {
1158 RunnableScheduledFuture<?> t = queue[i];
1159 if (t != null) {
1160 queue[i] = null;
1161 setIndex(t, -1);
1162 }
1163 }
1164 size = 0;
1165 } finally {
1166 lock.unlock();
1167 }
1168 }
1169
1170 /**
1171 * Returns first element only if it is expired.
1172 * Used only by drainTo. Call only when holding lock.
1173 */
1174 private RunnableScheduledFuture<?> peekExpired() {
1175 // assert lock.isHeldByCurrentThread();
1176 RunnableScheduledFuture<?> first = queue[0];
1177 return (first == null || first.getDelay(NANOSECONDS) > 0) ?
1178 null : first;
1179 }
1180
1181 public int drainTo(Collection<? super Runnable> c) {
1182 if (c == null)
1183 throw new NullPointerException();
1184 if (c == this)
1185 throw new IllegalArgumentException();
1186 final ReentrantLock lock = this.lock;
1187 lock.lock();
1188 try {
1189 RunnableScheduledFuture<?> first;
1190 int n = 0;
1191 while ((first = peekExpired()) != null) {
1192 c.add(first); // In this order, in case add() throws.
1193 finishPoll(first);
1194 ++n;
1195 }
1196 return n;
1197 } finally {
1198 lock.unlock();
1199 }
1200 }
1201
1202 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1203 if (c == null)
1204 throw new NullPointerException();
1205 if (c == this)
1206 throw new IllegalArgumentException();
1207 if (maxElements <= 0)
1208 return 0;
1209 final ReentrantLock lock = this.lock;
1210 lock.lock();
1211 try {
1212 RunnableScheduledFuture<?> first;
1213 int n = 0;
1214 while (n < maxElements && (first = peekExpired()) != null) {
1215 c.add(first); // In this order, in case add() throws.
1216 finishPoll(first);
1217 ++n;
1218 }
1219 return n;
1220 } finally {
1221 lock.unlock();
1222 }
1223 }
1224
1225 public Object[] toArray() {
1226 final ReentrantLock lock = this.lock;
1227 lock.lock();
1228 try {
1229 return Arrays.copyOf(queue, size, Object[].class);
1230 } finally {
1231 lock.unlock();
1232 }
1233 }
1234
1235 @SuppressWarnings("unchecked")
1236 public <T> T[] toArray(T[] a) {
1237 final ReentrantLock lock = this.lock;
1238 lock.lock();
1239 try {
1240 if (a.length < size)
1241 return (T[]) Arrays.copyOf(queue, size, a.getClass());
1242 System.arraycopy(queue, 0, a, 0, size);
1243 if (a.length > size)
1244 a[size] = null;
1245 return a;
1246 } finally {
1247 lock.unlock();
1248 }
1249 }
1250
1251 public Iterator<Runnable> iterator() {
1252 return new Itr(Arrays.copyOf(queue, size));
1253 }
1254
1255 /**
1256 * Snapshot iterator that works off copy of underlying q array.
1257 */
1258 private class Itr implements Iterator<Runnable> {
1259 final RunnableScheduledFuture<?>[] array;
1260 int cursor; // index of next element to return; initially 0
1261 int lastRet = -1; // index of last element returned; -1 if no such
1262
1263 Itr(RunnableScheduledFuture<?>[] array) {
1264 this.array = array;
1265 }
1266
1267 public boolean hasNext() {
1268 return cursor < array.length;
1269 }
1270
1271 public Runnable next() {
1272 if (cursor >= array.length)
1273 throw new NoSuchElementException();
1274 lastRet = cursor;
1275 return array[cursor++];
1276 }
1277
1278 public void remove() {
1279 if (lastRet < 0)
1280 throw new IllegalStateException();
1281 DelayedWorkQueue.this.remove(array[lastRet]);
1282 lastRet = -1;
1283 }
1284 }
1285 }
1286 }