ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.79
Committed: Wed Mar 26 22:39:02 2014 UTC (10 years, 2 months ago) by jsr166
Branch: MAIN
Changes since 1.78: +4 -4 lines
Log Message:
more consistent constructor javadoc

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import static java.util.concurrent.TimeUnit.NANOSECONDS;
9 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 import java.util.concurrent.atomic.AtomicLong;
11 import java.util.concurrent.locks.Condition;
12 import java.util.concurrent.locks.ReentrantLock;
13 import java.util.*;
14
15 /**
16 * A {@link ThreadPoolExecutor} that can additionally schedule
17 * commands to run after a given delay, or to execute periodically.
18 * This class is preferable to {@link java.util.Timer} when multiple
19 * worker threads are needed, or when the additional flexibility or
20 * capabilities of {@link ThreadPoolExecutor} (which this class
21 * extends) are required.
22 *
23 * <p>Delayed tasks execute no sooner than they are enabled, but
24 * without any real-time guarantees about when, after they are
25 * enabled, they will commence. Tasks scheduled for exactly the same
26 * execution time are enabled in first-in-first-out (FIFO) order of
27 * submission.
28 *
29 * <p>When a submitted task is cancelled before it is run, execution
30 * is suppressed. By default, such a cancelled task is not
31 * automatically removed from the work queue until its delay elapses.
32 * While this enables further inspection and monitoring, it may also
33 * cause unbounded retention of cancelled tasks. To avoid this, use
34 * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately
35 * removed from the work queue at time of cancellation.
36 *
37 * <p>Successive executions of a periodic task scheduled via
38 * {@link #scheduleAtFixedRate} or
39 * {@link #scheduleWithFixedDelay} do not overlap. While different
40 * executions may be performed by different threads, the effects of
41 * prior executions <a
42 * href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
43 * those of subsequent ones.
44 *
45 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
46 * of the inherited tuning methods are not useful for it. In
47 * particular, because it acts as a fixed-sized pool using
48 * {@code corePoolSize} threads and an unbounded queue, adjustments
49 * to {@code maximumPoolSize} have no useful effect. Additionally, it
50 * is almost never a good idea to set {@code corePoolSize} to zero or
51 * use {@code allowCoreThreadTimeOut} because this may leave the pool
52 * without threads to handle tasks once they become eligible to run.
53 *
54 * <p><b>Extension notes:</b> This class overrides the
55 * {@link ThreadPoolExecutor#execute(Runnable) execute} and
56 * {@link AbstractExecutorService#submit(Runnable) submit}
57 * methods to generate internal {@link ScheduledFuture} objects to
58 * control per-task delays and scheduling. To preserve
59 * functionality, any further overrides of these methods in
60 * subclasses must invoke superclass versions, which effectively
61 * disables additional task customization. However, this class
62 * provides the alternative protected extension method
63 * {@code decorateTask} (one version each for {@code Runnable} and
64 * {@code Callable}) that can be used to customize the concrete task
65 * types used to execute commands entered via {@code execute},
66 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
67 * and {@code scheduleWithFixedDelay}. By default, a
68 * {@code ScheduledThreadPoolExecutor} uses a task type extending
69 * {@link FutureTask}. However, this may be modified or replaced using
70 * subclasses of the form:
71 *
72 * <pre> {@code
73 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
74 *
75 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
76 *
77 * protected <V> RunnableScheduledFuture<V> decorateTask(
78 * Runnable r, RunnableScheduledFuture<V> task) {
79 * return new CustomTask<V>(r, task);
80 * }
81 *
82 * protected <V> RunnableScheduledFuture<V> decorateTask(
83 * Callable<V> c, RunnableScheduledFuture<V> task) {
84 * return new CustomTask<V>(c, task);
85 * }
86 * // ... add constructors, etc.
87 * }}</pre>
88 *
89 * @since 1.5
90 * @author Doug Lea
91 */
92 public class ScheduledThreadPoolExecutor
93 extends ThreadPoolExecutor
94 implements ScheduledExecutorService {
95
96 /*
97 * This class specializes ThreadPoolExecutor implementation by
98 *
99 * 1. Using a custom task type, ScheduledFutureTask for
100 * tasks, even those that don't require scheduling (i.e.,
101 * those submitted using ExecutorService execute, not
102 * ScheduledExecutorService methods) which are treated as
103 * delayed tasks with a delay of zero.
104 *
105 * 2. Using a custom queue (DelayedWorkQueue), a variant of
106 * unbounded DelayQueue. The lack of capacity constraint and
107 * the fact that corePoolSize and maximumPoolSize are
108 * effectively identical simplifies some execution mechanics
109 * (see delayedExecute) compared to ThreadPoolExecutor.
110 *
111 * 3. Supporting optional run-after-shutdown parameters, which
112 * leads to overrides of shutdown methods to remove and cancel
113 * tasks that should NOT be run after shutdown, as well as
114 * different recheck logic when task (re)submission overlaps
115 * with a shutdown.
116 *
117 * 4. Task decoration methods to allow interception and
118 * instrumentation, which are needed because subclasses cannot
119 * otherwise override submit methods to get this effect. These
120 * don't have any impact on pool control logic though.
121 */
122
123 /**
124 * False if should cancel/suppress periodic tasks on shutdown.
125 */
126 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
127
128 /**
129 * False if should cancel non-periodic tasks on shutdown.
130 */
131 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
132
133 /**
134 * True if ScheduledFutureTask.cancel should remove from queue.
135 */
136 private volatile boolean removeOnCancel = false;
137
138 /**
139 * Sequence number to break scheduling ties, and in turn to
140 * guarantee FIFO order among tied entries.
141 */
142 private static final AtomicLong sequencer = new AtomicLong();
143
144 /**
145 * Returns current nanosecond time.
146 */
147 final long now() {
148 return System.nanoTime();
149 }
150
151 private class ScheduledFutureTask<V>
152 extends FutureTask<V> implements RunnableScheduledFuture<V> {
153
154 /** Sequence number to break ties FIFO */
155 private final long sequenceNumber;
156
157 /** The time the task is enabled to execute in nanoTime units */
158 private long time;
159
160 /**
161 * Period in nanoseconds for repeating tasks.
162 * A positive value indicates fixed-rate execution.
163 * A negative value indicates fixed-delay execution.
164 * A value of 0 indicates a non-repeating (one-shot) task.
165 */
166 private final long period;
167
168 /** The actual task to be re-enqueued by reExecutePeriodic */
169 RunnableScheduledFuture<V> outerTask = this;
170
171 /**
172 * Index into delay queue, to support faster cancellation.
173 */
174 int heapIndex;
175
176 /**
177 * Creates a one-shot action with given nanoTime-based trigger time.
178 */
179 ScheduledFutureTask(Runnable r, V result, long triggerTime) {
180 super(r, result);
181 this.time = triggerTime;
182 this.period = 0;
183 this.sequenceNumber = sequencer.getAndIncrement();
184 }
185
186 /**
187 * Creates a periodic action with given nanoTime-based initial
188 * trigger time and period.
189 */
190 ScheduledFutureTask(Runnable r, V result, long triggerTime,
191 long period) {
192 super(r, result);
193 this.time = triggerTime;
194 this.period = period;
195 this.sequenceNumber = sequencer.getAndIncrement();
196 }
197
198 /**
199 * Creates a one-shot action with given nanoTime-based trigger time.
200 */
201 ScheduledFutureTask(Callable<V> callable, long triggerTime) {
202 super(callable);
203 this.time = triggerTime;
204 this.period = 0;
205 this.sequenceNumber = sequencer.getAndIncrement();
206 }
207
208 public long getDelay(TimeUnit unit) {
209 return unit.convert(time - now(), NANOSECONDS);
210 }
211
212 public int compareTo(Delayed other) {
213 if (other == this) // compare zero if same object
214 return 0;
215 if (other instanceof ScheduledFutureTask) {
216 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
217 long diff = time - x.time;
218 if (diff < 0)
219 return -1;
220 else if (diff > 0)
221 return 1;
222 else if (sequenceNumber < x.sequenceNumber)
223 return -1;
224 else
225 return 1;
226 }
227 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
228 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
229 }
230
231 /**
232 * Returns {@code true} if this is a periodic (not a one-shot) action.
233 *
234 * @return {@code true} if periodic
235 */
236 public boolean isPeriodic() {
237 return period != 0;
238 }
239
240 /**
241 * Sets the next time to run for a periodic task.
242 */
243 private void setNextRunTime() {
244 long p = period;
245 if (p > 0)
246 time += p;
247 else
248 time = triggerTime(-p);
249 }
250
251 public boolean cancel(boolean mayInterruptIfRunning) {
252 boolean cancelled = super.cancel(mayInterruptIfRunning);
253 if (cancelled && removeOnCancel && heapIndex >= 0)
254 remove(this);
255 return cancelled;
256 }
257
258 /**
259 * Overrides FutureTask version so as to reset/requeue if periodic.
260 */
261 public void run() {
262 boolean periodic = isPeriodic();
263 if (!canRunInCurrentRunState(periodic))
264 cancel(false);
265 else if (!periodic)
266 ScheduledFutureTask.super.run();
267 else if (ScheduledFutureTask.super.runAndReset()) {
268 setNextRunTime();
269 reExecutePeriodic(outerTask);
270 }
271 }
272 }
273
274 /**
275 * Returns true if can run a task given current run state
276 * and run-after-shutdown parameters.
277 *
278 * @param periodic true if this task is periodic, false if delayed
279 */
280 boolean canRunInCurrentRunState(boolean periodic) {
281 return isRunningOrShutdown(periodic ?
282 continueExistingPeriodicTasksAfterShutdown :
283 executeExistingDelayedTasksAfterShutdown);
284 }
285
286 /**
287 * Main execution method for delayed or periodic tasks. If pool
288 * is shut down, rejects the task. Otherwise adds task to queue
289 * and starts a thread, if necessary, to run it. (We cannot
290 * prestart the thread to run the task because the task (probably)
291 * shouldn't be run yet.) If the pool is shut down while the task
292 * is being added, cancel and remove it if required by state and
293 * run-after-shutdown parameters.
294 *
295 * @param task the task
296 */
297 private void delayedExecute(RunnableScheduledFuture<?> task) {
298 if (isShutdown())
299 reject(task);
300 else {
301 super.getQueue().add(task);
302 if (isShutdown() &&
303 !canRunInCurrentRunState(task.isPeriodic()) &&
304 remove(task))
305 task.cancel(false);
306 else
307 ensurePrestart();
308 }
309 }
310
311 /**
312 * Requeues a periodic task unless current run state precludes it.
313 * Same idea as delayedExecute except drops task rather than rejecting.
314 *
315 * @param task the task
316 */
317 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
318 if (canRunInCurrentRunState(true)) {
319 super.getQueue().add(task);
320 if (!canRunInCurrentRunState(true) && remove(task))
321 task.cancel(false);
322 else
323 ensurePrestart();
324 }
325 }
326
327 /**
328 * Cancels and clears the queue of all tasks that should not be run
329 * due to shutdown policy. Invoked within super.shutdown.
330 */
331 @Override void onShutdown() {
332 BlockingQueue<Runnable> q = super.getQueue();
333 boolean keepDelayed =
334 getExecuteExistingDelayedTasksAfterShutdownPolicy();
335 boolean keepPeriodic =
336 getContinueExistingPeriodicTasksAfterShutdownPolicy();
337 if (!keepDelayed && !keepPeriodic) {
338 for (Object e : q.toArray())
339 if (e instanceof RunnableScheduledFuture<?>)
340 ((RunnableScheduledFuture<?>) e).cancel(false);
341 q.clear();
342 }
343 else {
344 // Traverse snapshot to avoid iterator exceptions
345 for (Object e : q.toArray()) {
346 if (e instanceof RunnableScheduledFuture) {
347 RunnableScheduledFuture<?> t =
348 (RunnableScheduledFuture<?>)e;
349 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
350 t.isCancelled()) { // also remove if already cancelled
351 if (q.remove(t))
352 t.cancel(false);
353 }
354 }
355 }
356 }
357 tryTerminate();
358 }
359
360 /**
361 * Modifies or replaces the task used to execute a runnable.
362 * This method can be used to override the concrete
363 * class used for managing internal tasks.
364 * The default implementation simply returns the given task.
365 *
366 * @param runnable the submitted Runnable
367 * @param task the task created to execute the runnable
368 * @param <V> the type of the task's result
369 * @return a task that can execute the runnable
370 * @since 1.6
371 */
372 protected <V> RunnableScheduledFuture<V> decorateTask(
373 Runnable runnable, RunnableScheduledFuture<V> task) {
374 return task;
375 }
376
377 /**
378 * Modifies or replaces the task used to execute a callable.
379 * This method can be used to override the concrete
380 * class used for managing internal tasks.
381 * The default implementation simply returns the given task.
382 *
383 * @param callable the submitted Callable
384 * @param task the task created to execute the callable
385 * @param <V> the type of the task's result
386 * @return a task that can execute the callable
387 * @since 1.6
388 */
389 protected <V> RunnableScheduledFuture<V> decorateTask(
390 Callable<V> callable, RunnableScheduledFuture<V> task) {
391 return task;
392 }
393
394 /**
395 * The default keep-alive time for pool threads.
396 *
397 * Normally, this value is unused because all pool threads will be
398 * core threads, but if a user creates a pool with a corePoolSize
399 * of zero (against our advice), we keep a thread alive as long as
400 * there are queued tasks. If the keep alive time is zero (the
401 * historic value), we end up hot-spinning in getTask, wasting a
402 * CPU. But on the other hand, if we set the value too high, and
403 * users create a one-shot pool which they don't cleanly shutdown,
404 * the pool's non-daemon threads will prevent JVM termination. A
405 * small but non-zero value (relative to a JVM's lifetime) seems
406 * best.
407 */
408 private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
409
410 /**
411 * Creates a new {@code ScheduledThreadPoolExecutor} with the
412 * given core pool size.
413 *
414 * @param corePoolSize the number of threads to keep in the pool, even
415 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
416 * @throws IllegalArgumentException if {@code corePoolSize < 0}
417 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    // Integer.MAX_VALUE is passed as the maximum pool size and a fresh
    // DelayedWorkQueue as the work queue.
    super(
        corePoolSize,
        Integer.MAX_VALUE,
        DEFAULT_KEEPALIVE_MILLIS,
        MILLISECONDS,
        new DelayedWorkQueue());
}
423
424 /**
425 * Creates a new {@code ScheduledThreadPoolExecutor} with the
426 * given initial parameters.
427 *
428 * @param corePoolSize the number of threads to keep in the pool, even
429 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
430 * @param threadFactory the factory to use when the executor
431 * creates a new thread
432 * @throws IllegalArgumentException if {@code corePoolSize < 0}
433 * @throws NullPointerException if {@code threadFactory} is null
434 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    // Integer.MAX_VALUE is passed as the maximum pool size and a fresh
    // DelayedWorkQueue as the work queue.
    super(
        corePoolSize,
        Integer.MAX_VALUE,
        DEFAULT_KEEPALIVE_MILLIS,
        MILLISECONDS,
        new DelayedWorkQueue(),
        threadFactory);
}
441
442 /**
443 * Creates a new {@code ScheduledThreadPoolExecutor} with the
444 * given initial parameters.
445 *
446 * @param corePoolSize the number of threads to keep in the pool, even
447 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
448 * @param handler the handler to use when execution is blocked
449 * because the thread bounds and queue capacities are reached
450 * @throws IllegalArgumentException if {@code corePoolSize < 0}
451 * @throws NullPointerException if {@code handler} is null
452 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    // Integer.MAX_VALUE is passed as the maximum pool size and a fresh
    // DelayedWorkQueue as the work queue.
    super(
        corePoolSize,
        Integer.MAX_VALUE,
        DEFAULT_KEEPALIVE_MILLIS,
        MILLISECONDS,
        new DelayedWorkQueue(),
        handler);
}
459
460 /**
461 * Creates a new {@code ScheduledThreadPoolExecutor} with the
462 * given initial parameters.
463 *
464 * @param corePoolSize the number of threads to keep in the pool, even
465 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
466 * @param threadFactory the factory to use when the executor
467 * creates a new thread
468 * @param handler the handler to use when execution is blocked
469 * because the thread bounds and queue capacities are reached
470 * @throws IllegalArgumentException if {@code corePoolSize < 0}
471 * @throws NullPointerException if {@code threadFactory} or
472 * {@code handler} is null
473 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    // Integer.MAX_VALUE is passed as the maximum pool size and a fresh
    // DelayedWorkQueue as the work queue.
    super(
        corePoolSize,
        Integer.MAX_VALUE,
        DEFAULT_KEEPALIVE_MILLIS,
        MILLISECONDS,
        new DelayedWorkQueue(),
        threadFactory,
        handler);
}
481
482 /**
483 * Returns the nanoTime-based trigger time of a delayed action.
484 */
485 private long triggerTime(long delay, TimeUnit unit) {
486 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
487 }
488
489 /**
490 * Returns the nanoTime-based trigger time of a delayed action.
491 */
492 long triggerTime(long delay) {
493 return now() +
494 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
495 }
496
497 /**
498 * Constrains the values of all delays in the queue to be within
499 * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
500 * This may occur if a task is eligible to be dequeued, but has
501 * not yet been, while some other task is added with a delay of
502 * Long.MAX_VALUE.
503 */
504 private long overflowFree(long delay) {
505 Delayed head = (Delayed) super.getQueue().peek();
506 if (head != null) {
507 long headDelay = head.getDelay(NANOSECONDS);
508 if (headDelay < 0 && (delay - headDelay < 0))
509 delay = Long.MAX_VALUE + headDelay;
510 }
511 return delay;
512 }
513
514 /**
515 * @throws RejectedExecutionException {@inheritDoc}
516 * @throws NullPointerException {@inheritDoc}
517 */
518 public ScheduledFuture<?> schedule(Runnable command,
519 long delay,
520 TimeUnit unit) {
521 if (command == null || unit == null)
522 throw new NullPointerException();
523 RunnableScheduledFuture<Void> t = decorateTask(command,
524 new ScheduledFutureTask<Void>(command, null,
525 triggerTime(delay, unit)));
526 delayedExecute(t);
527 return t;
528 }
529
530 /**
531 * @throws RejectedExecutionException {@inheritDoc}
532 * @throws NullPointerException {@inheritDoc}
533 */
534 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
535 long delay,
536 TimeUnit unit) {
537 if (callable == null || unit == null)
538 throw new NullPointerException();
539 RunnableScheduledFuture<V> t = decorateTask(callable,
540 new ScheduledFutureTask<V>(callable,
541 triggerTime(delay, unit)));
542 delayedExecute(t);
543 return t;
544 }
545
546 /**
547 * @throws RejectedExecutionException {@inheritDoc}
548 * @throws NullPointerException {@inheritDoc}
549 * @throws IllegalArgumentException {@inheritDoc}
550 */
551 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
552 long initialDelay,
553 long period,
554 TimeUnit unit) {
555 if (command == null || unit == null)
556 throw new NullPointerException();
557 if (period <= 0)
558 throw new IllegalArgumentException();
559 ScheduledFutureTask<Void> sft =
560 new ScheduledFutureTask<Void>(command,
561 null,
562 triggerTime(initialDelay, unit),
563 unit.toNanos(period));
564 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
565 sft.outerTask = t;
566 delayedExecute(t);
567 return t;
568 }
569
570 /**
571 * @throws RejectedExecutionException {@inheritDoc}
572 * @throws NullPointerException {@inheritDoc}
573 * @throws IllegalArgumentException {@inheritDoc}
574 */
575 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
576 long initialDelay,
577 long delay,
578 TimeUnit unit) {
579 if (command == null || unit == null)
580 throw new NullPointerException();
581 if (delay <= 0)
582 throw new IllegalArgumentException();
583 ScheduledFutureTask<Void> sft =
584 new ScheduledFutureTask<Void>(command,
585 null,
586 triggerTime(initialDelay, unit),
587 unit.toNanos(-delay));
588 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
589 sft.outerTask = t;
590 delayedExecute(t);
591 return t;
592 }
593
594 /**
595 * Executes {@code command} with zero required delay.
596 * This has effect equivalent to
597 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
598 * Note that inspections of the queue and of the list returned by
599 * {@code shutdownNow} will access the zero-delayed
600 * {@link ScheduledFuture}, not the {@code command} itself.
601 *
602 * <p>A consequence of the use of {@code ScheduledFuture} objects is
603 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
604 * called with a null second {@code Throwable} argument, even if the
605 * {@code command} terminated abruptly. Instead, the {@code Throwable}
606 * thrown by such a task can be obtained via {@link Future#get}.
607 *
608 * @throws RejectedExecutionException at discretion of
609 * {@code RejectedExecutionHandler}, if the task
610 * cannot be accepted for execution because the
611 * executor has been shut down
612 * @throws NullPointerException {@inheritDoc}
613 */
614 public void execute(Runnable command) {
615 schedule(command, 0, NANOSECONDS);
616 }
617
618 // Override AbstractExecutorService methods
619
620 /**
621 * @throws RejectedExecutionException {@inheritDoc}
622 * @throws NullPointerException {@inheritDoc}
623 */
624 public Future<?> submit(Runnable task) {
625 return schedule(task, 0, NANOSECONDS);
626 }
627
628 /**
629 * @throws RejectedExecutionException {@inheritDoc}
630 * @throws NullPointerException {@inheritDoc}
631 */
632 public <T> Future<T> submit(Runnable task, T result) {
633 return schedule(Executors.callable(task, result), 0, NANOSECONDS);
634 }
635
636 /**
637 * @throws RejectedExecutionException {@inheritDoc}
638 * @throws NullPointerException {@inheritDoc}
639 */
640 public <T> Future<T> submit(Callable<T> task) {
641 return schedule(task, 0, NANOSECONDS);
642 }
643
644 /**
645 * Sets the policy on whether to continue executing existing
646 * periodic tasks even when this executor has been {@code shutdown}.
647 * In this case, these tasks will only terminate upon
648 * {@code shutdownNow} or after setting the policy to
649 * {@code false} when already shutdown.
650 * This value is by default {@code false}.
651 *
652 * @param value if {@code true}, continue after shutdown, else don't
653 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
654 */
655 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
656 continueExistingPeriodicTasksAfterShutdown = value;
657 if (!value && isShutdown())
658 onShutdown();
659 }
660
661 /**
662 * Gets the policy on whether to continue executing existing
663 * periodic tasks even when this executor has been {@code shutdown}.
664 * In this case, these tasks will only terminate upon
665 * {@code shutdownNow} or after setting the policy to
666 * {@code false} when already shutdown.
667 * This value is by default {@code false}.
668 *
669 * @return {@code true} if will continue after shutdown
670 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
671 */
672 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
673 return continueExistingPeriodicTasksAfterShutdown;
674 }
675
676 /**
677 * Sets the policy on whether to execute existing delayed
678 * tasks even when this executor has been {@code shutdown}.
679 * In this case, these tasks will only terminate upon
680 * {@code shutdownNow}, or after setting the policy to
681 * {@code false} when already shutdown.
682 * This value is by default {@code true}.
683 *
684 * @param value if {@code true}, execute after shutdown, else don't
685 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
686 */
687 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
688 executeExistingDelayedTasksAfterShutdown = value;
689 if (!value && isShutdown())
690 onShutdown();
691 }
692
693 /**
694 * Gets the policy on whether to execute existing delayed
695 * tasks even when this executor has been {@code shutdown}.
696 * In this case, these tasks will only terminate upon
697 * {@code shutdownNow}, or after setting the policy to
698 * {@code false} when already shutdown.
699 * This value is by default {@code true}.
700 *
701 * @return {@code true} if will execute after shutdown
702 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
703 */
704 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
705 return executeExistingDelayedTasksAfterShutdown;
706 }
707
708 /**
709 * Sets the policy on whether cancelled tasks should be immediately
710 * removed from the work queue at time of cancellation. This value is
711 * by default {@code false}.
712 *
713 * @param value if {@code true}, remove on cancellation, else don't
714 * @see #getRemoveOnCancelPolicy
715 * @since 1.7
716 */
717 public void setRemoveOnCancelPolicy(boolean value) {
718 removeOnCancel = value;
719 }
720
721 /**
722 * Gets the policy on whether cancelled tasks should be immediately
723 * removed from the work queue at time of cancellation. This value is
724 * by default {@code false}.
725 *
726 * @return {@code true} if cancelled tasks are immediately removed
727 * from the queue
728 * @see #setRemoveOnCancelPolicy
729 * @since 1.7
730 */
731 public boolean getRemoveOnCancelPolicy() {
732 return removeOnCancel;
733 }
734
735 /**
736 * Initiates an orderly shutdown in which previously submitted
737 * tasks are executed, but no new tasks will be accepted.
738 * Invocation has no additional effect if already shut down.
739 *
740 * <p>This method does not wait for previously submitted tasks to
741 * complete execution. Use {@link #awaitTermination awaitTermination}
742 * to do that.
743 *
744 * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
745 * has been set {@code false}, existing delayed tasks whose delays
746 * have not yet elapsed are cancelled. And unless the {@code
747 * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
748 * {@code true}, future executions of existing periodic tasks will
749 * be cancelled.
750 *
751 * @throws SecurityException {@inheritDoc}
752 */
753 public void shutdown() {
754 super.shutdown();
755 }
756
757 /**
758 * Attempts to stop all actively executing tasks, halts the
759 * processing of waiting tasks, and returns a list of the tasks
760 * that were awaiting execution.
761 *
762 * <p>This method does not wait for actively executing tasks to
763 * terminate. Use {@link #awaitTermination awaitTermination} to
764 * do that.
765 *
766 * <p>There are no guarantees beyond best-effort attempts to stop
767 * processing actively executing tasks. This implementation
768 * cancels tasks via {@link Thread#interrupt}, so any task that
769 * fails to respond to interrupts may never terminate.
770 *
771 * @return list of tasks that never commenced execution.
772 * Each element of this list is a {@link ScheduledFuture}.
773 * For tasks submitted via one of the {@code schedule}
774 * methods, the element will be identical to the returned
775 * {@code ScheduledFuture}. For tasks submitted using
776 * {@link #execute}, the element will be a zero-delay {@code
777 * ScheduledFuture}.
778 * @throws SecurityException {@inheritDoc}
779 */
780 public List<Runnable> shutdownNow() {
781 return super.shutdownNow();
782 }
783
784 /**
785 * Returns the task queue used by this executor.
786 * Each element of this queue is a {@link ScheduledFuture}.
787 * For tasks submitted via one of the {@code schedule} methods, the
788 * element will be identical to the returned {@code ScheduledFuture}.
789 * For tasks submitted using {@link #execute}, the element will be a
790 * zero-delay {@code ScheduledFuture}.
791 *
792 * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
793 * tasks in the order in which they will execute.
794 *
795 * @return the task queue
796 */
797 public BlockingQueue<Runnable> getQueue() {
798 return super.getQueue();
799 }
800
801 /**
802 * Specialized delay queue. To mesh with TPE declarations, this
803 * class must be declared as a BlockingQueue<Runnable> even though
804 * it can only hold RunnableScheduledFutures.
805 */
806 static class DelayedWorkQueue extends AbstractQueue<Runnable>
807 implements BlockingQueue<Runnable> {
808
809 /*
810 * A DelayedWorkQueue is based on a heap-based data structure
811 * like those in DelayQueue and PriorityQueue, except that
812 * every ScheduledFutureTask also records its index into the
813 * heap array. This eliminates the need to find a task upon
814 * cancellation, greatly speeding up removal (down from O(n)
815 * to O(log n)), and reducing garbage retention that would
816 * otherwise occur by waiting for the element to rise to top
817 * before clearing. But because the queue may also hold
818 * RunnableScheduledFutures that are not ScheduledFutureTasks,
819 * we are not guaranteed to have such indices available, in
820 * which case we fall back to linear search. (We expect that
821 * most tasks will not be decorated, and that the faster cases
822 * will be much more common.)
823 *
824 * All heap operations must record index changes -- mainly
825 * within siftUp and siftDown. Upon removal, a task's
826 * heapIndex is set to -1. Note that ScheduledFutureTasks can
827 * appear at most once in the queue (this need not be true for
828 * other kinds of tasks or work queues), so are uniquely
829 * identified by heapIndex.
830 */
831
832 private static final int INITIAL_CAPACITY = 16;
833 private RunnableScheduledFuture<?>[] queue =
834 new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
835 private final ReentrantLock lock = new ReentrantLock();
836 private int size = 0;
837
838 /**
839 * Thread designated to wait for the task at the head of the
840 * queue. This variant of the Leader-Follower pattern
841 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
842 * minimize unnecessary timed waiting. When a thread becomes
843 * the leader, it waits only for the next delay to elapse, but
844 * other threads await indefinitely. The leader thread must
845 * signal some other thread before returning from take() or
846 * poll(...), unless some other thread becomes leader in the
847 * interim. Whenever the head of the queue is replaced with a
848 * task with an earlier expiration time, the leader field is
849 * invalidated by being reset to null, and some waiting
850 * thread, but not necessarily the current leader, is
851 * signalled. So waiting threads must be prepared to acquire
852 * and lose leadership while waiting.
853 */
854 private Thread leader = null;
855
856 /**
857 * Condition signalled when a newer task becomes available at the
858 * head of the queue or a new thread may need to become leader.
859 */
860 private final Condition available = lock.newCondition();
861
862 /**
863 * Sets f's heapIndex if it is a ScheduledFutureTask.
864 */
865 private void setIndex(RunnableScheduledFuture<?> f, int idx) {
866 if (f instanceof ScheduledFutureTask)
867 ((ScheduledFutureTask)f).heapIndex = idx;
868 }
869
870 /**
871 * Sifts element added at bottom up to its heap-ordered spot.
872 * Call only when holding lock.
873 */
874 private void siftUp(int k, RunnableScheduledFuture<?> key) {
875 while (k > 0) {
876 int parent = (k - 1) >>> 1;
877 RunnableScheduledFuture<?> e = queue[parent];
878 if (key.compareTo(e) >= 0)
879 break;
880 queue[k] = e;
881 setIndex(e, k);
882 k = parent;
883 }
884 queue[k] = key;
885 setIndex(key, k);
886 }
887
888 /**
889 * Sifts element added at top down to its heap-ordered spot.
890 * Call only when holding lock.
891 */
892 private void siftDown(int k, RunnableScheduledFuture<?> key) {
893 int half = size >>> 1;
894 while (k < half) {
895 int child = (k << 1) + 1;
896 RunnableScheduledFuture<?> c = queue[child];
897 int right = child + 1;
898 if (right < size && c.compareTo(queue[right]) > 0)
899 c = queue[child = right];
900 if (key.compareTo(c) <= 0)
901 break;
902 queue[k] = c;
903 setIndex(c, k);
904 k = child;
905 }
906 queue[k] = key;
907 setIndex(key, k);
908 }
909
910 /**
911 * Resizes the heap array. Call only when holding lock.
912 */
913 private void grow() {
914 int oldCapacity = queue.length;
915 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
916 if (newCapacity < 0) // overflow
917 newCapacity = Integer.MAX_VALUE;
918 queue = Arrays.copyOf(queue, newCapacity);
919 }
920
921 /**
922 * Finds index of given object, or -1 if absent.
923 */
924 private int indexOf(Object x) {
925 if (x != null) {
926 if (x instanceof ScheduledFutureTask) {
927 int i = ((ScheduledFutureTask) x).heapIndex;
928 // Sanity check; x could conceivably be a
929 // ScheduledFutureTask from some other pool.
930 if (i >= 0 && i < size && queue[i] == x)
931 return i;
932 } else {
933 for (int i = 0; i < size; i++)
934 if (x.equals(queue[i]))
935 return i;
936 }
937 }
938 return -1;
939 }
940
941 public boolean contains(Object x) {
942 final ReentrantLock lock = this.lock;
943 lock.lock();
944 try {
945 return indexOf(x) != -1;
946 } finally {
947 lock.unlock();
948 }
949 }
950
951 public boolean remove(Object x) {
952 final ReentrantLock lock = this.lock;
953 lock.lock();
954 try {
955 int i = indexOf(x);
956 if (i < 0)
957 return false;
958
959 setIndex(queue[i], -1);
960 int s = --size;
961 RunnableScheduledFuture<?> replacement = queue[s];
962 queue[s] = null;
963 if (s != i) {
964 siftDown(i, replacement);
965 if (queue[i] == replacement)
966 siftUp(i, replacement);
967 }
968 return true;
969 } finally {
970 lock.unlock();
971 }
972 }
973
974 public int size() {
975 final ReentrantLock lock = this.lock;
976 lock.lock();
977 try {
978 return size;
979 } finally {
980 lock.unlock();
981 }
982 }
983
984 public boolean isEmpty() {
985 return size() == 0;
986 }
987
988 public int remainingCapacity() {
989 return Integer.MAX_VALUE;
990 }
991
992 public RunnableScheduledFuture<?> peek() {
993 final ReentrantLock lock = this.lock;
994 lock.lock();
995 try {
996 return queue[0];
997 } finally {
998 lock.unlock();
999 }
1000 }
1001
1002 public boolean offer(Runnable x) {
1003 if (x == null)
1004 throw new NullPointerException();
1005 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
1006 final ReentrantLock lock = this.lock;
1007 lock.lock();
1008 try {
1009 int i = size;
1010 if (i >= queue.length)
1011 grow();
1012 size = i + 1;
1013 if (i == 0) {
1014 queue[0] = e;
1015 setIndex(e, 0);
1016 } else {
1017 siftUp(i, e);
1018 }
1019 if (queue[0] == e) {
1020 leader = null;
1021 available.signal();
1022 }
1023 } finally {
1024 lock.unlock();
1025 }
1026 return true;
1027 }
1028
1029 public void put(Runnable e) {
1030 offer(e);
1031 }
1032
1033 public boolean add(Runnable e) {
1034 return offer(e);
1035 }
1036
1037 public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1038 return offer(e);
1039 }
1040
1041 /**
1042 * Performs common bookkeeping for poll and take: Replaces
1043 * first element with last and sifts it down. Call only when
1044 * holding lock.
1045 * @param f the task to remove and return
1046 */
1047 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1048 int s = --size;
1049 RunnableScheduledFuture<?> x = queue[s];
1050 queue[s] = null;
1051 if (s != 0)
1052 siftDown(0, x);
1053 setIndex(f, -1);
1054 return f;
1055 }
1056
1057 public RunnableScheduledFuture<?> poll() {
1058 final ReentrantLock lock = this.lock;
1059 lock.lock();
1060 try {
1061 RunnableScheduledFuture<?> first = queue[0];
1062 if (first == null || first.getDelay(NANOSECONDS) > 0)
1063 return null;
1064 else
1065 return finishPoll(first);
1066 } finally {
1067 lock.unlock();
1068 }
1069 }
1070
1071 public RunnableScheduledFuture<?> take() throws InterruptedException {
1072 final ReentrantLock lock = this.lock;
1073 lock.lockInterruptibly();
1074 try {
1075 for (;;) {
1076 RunnableScheduledFuture<?> first = queue[0];
1077 if (first == null)
1078 available.await();
1079 else {
1080 long delay = first.getDelay(NANOSECONDS);
1081 if (delay <= 0)
1082 return finishPoll(first);
1083 first = null; // don't retain ref while waiting
1084 if (leader != null)
1085 available.await();
1086 else {
1087 Thread thisThread = Thread.currentThread();
1088 leader = thisThread;
1089 try {
1090 available.awaitNanos(delay);
1091 } finally {
1092 if (leader == thisThread)
1093 leader = null;
1094 }
1095 }
1096 }
1097 }
1098 } finally {
1099 if (leader == null && queue[0] != null)
1100 available.signal();
1101 lock.unlock();
1102 }
1103 }
1104
1105 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1106 throws InterruptedException {
1107 long nanos = unit.toNanos(timeout);
1108 final ReentrantLock lock = this.lock;
1109 lock.lockInterruptibly();
1110 try {
1111 for (;;) {
1112 RunnableScheduledFuture<?> first = queue[0];
1113 if (first == null) {
1114 if (nanos <= 0)
1115 return null;
1116 else
1117 nanos = available.awaitNanos(nanos);
1118 } else {
1119 long delay = first.getDelay(NANOSECONDS);
1120 if (delay <= 0)
1121 return finishPoll(first);
1122 if (nanos <= 0)
1123 return null;
1124 first = null; // don't retain ref while waiting
1125 if (nanos < delay || leader != null)
1126 nanos = available.awaitNanos(nanos);
1127 else {
1128 Thread thisThread = Thread.currentThread();
1129 leader = thisThread;
1130 try {
1131 long timeLeft = available.awaitNanos(delay);
1132 nanos -= delay - timeLeft;
1133 } finally {
1134 if (leader == thisThread)
1135 leader = null;
1136 }
1137 }
1138 }
1139 }
1140 } finally {
1141 if (leader == null && queue[0] != null)
1142 available.signal();
1143 lock.unlock();
1144 }
1145 }
1146
1147 public void clear() {
1148 final ReentrantLock lock = this.lock;
1149 lock.lock();
1150 try {
1151 for (int i = 0; i < size; i++) {
1152 RunnableScheduledFuture<?> t = queue[i];
1153 if (t != null) {
1154 queue[i] = null;
1155 setIndex(t, -1);
1156 }
1157 }
1158 size = 0;
1159 } finally {
1160 lock.unlock();
1161 }
1162 }
1163
1164 /**
1165 * Returns first element only if it is expired.
1166 * Used only by drainTo. Call only when holding lock.
1167 */
1168 private RunnableScheduledFuture<?> peekExpired() {
1169 // assert lock.isHeldByCurrentThread();
1170 RunnableScheduledFuture<?> first = queue[0];
1171 return (first == null || first.getDelay(NANOSECONDS) > 0) ?
1172 null : first;
1173 }
1174
1175 public int drainTo(Collection<? super Runnable> c) {
1176 if (c == null)
1177 throw new NullPointerException();
1178 if (c == this)
1179 throw new IllegalArgumentException();
1180 final ReentrantLock lock = this.lock;
1181 lock.lock();
1182 try {
1183 RunnableScheduledFuture<?> first;
1184 int n = 0;
1185 while ((first = peekExpired()) != null) {
1186 c.add(first); // In this order, in case add() throws.
1187 finishPoll(first);
1188 ++n;
1189 }
1190 return n;
1191 } finally {
1192 lock.unlock();
1193 }
1194 }
1195
1196 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1197 if (c == null)
1198 throw new NullPointerException();
1199 if (c == this)
1200 throw new IllegalArgumentException();
1201 if (maxElements <= 0)
1202 return 0;
1203 final ReentrantLock lock = this.lock;
1204 lock.lock();
1205 try {
1206 RunnableScheduledFuture<?> first;
1207 int n = 0;
1208 while (n < maxElements && (first = peekExpired()) != null) {
1209 c.add(first); // In this order, in case add() throws.
1210 finishPoll(first);
1211 ++n;
1212 }
1213 return n;
1214 } finally {
1215 lock.unlock();
1216 }
1217 }
1218
1219 public Object[] toArray() {
1220 final ReentrantLock lock = this.lock;
1221 lock.lock();
1222 try {
1223 return Arrays.copyOf(queue, size, Object[].class);
1224 } finally {
1225 lock.unlock();
1226 }
1227 }
1228
1229 @SuppressWarnings("unchecked")
1230 public <T> T[] toArray(T[] a) {
1231 final ReentrantLock lock = this.lock;
1232 lock.lock();
1233 try {
1234 if (a.length < size)
1235 return (T[]) Arrays.copyOf(queue, size, a.getClass());
1236 System.arraycopy(queue, 0, a, 0, size);
1237 if (a.length > size)
1238 a[size] = null;
1239 return a;
1240 } finally {
1241 lock.unlock();
1242 }
1243 }
1244
1245 public Iterator<Runnable> iterator() {
1246 return new Itr(Arrays.copyOf(queue, size));
1247 }
1248
1249 /**
1250 * Snapshot iterator that works off copy of underlying q array.
1251 */
1252 private class Itr implements Iterator<Runnable> {
1253 final RunnableScheduledFuture<?>[] array;
1254 int cursor = 0; // index of next element to return
1255 int lastRet = -1; // index of last element, or -1 if no such
1256
1257 Itr(RunnableScheduledFuture<?>[] array) {
1258 this.array = array;
1259 }
1260
1261 public boolean hasNext() {
1262 return cursor < array.length;
1263 }
1264
1265 public Runnable next() {
1266 if (cursor >= array.length)
1267 throw new NoSuchElementException();
1268 lastRet = cursor;
1269 return array[cursor++];
1270 }
1271
1272 public void remove() {
1273 if (lastRet < 0)
1274 throw new IllegalStateException();
1275 DelayedWorkQueue.this.remove(array[lastRet]);
1276 lastRet = -1;
1277 }
1278 }
1279 }
1280 }