ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.91
Committed: Sun Sep 20 16:14:27 2015 UTC (8 years, 8 months ago) by jsr166
Branch: MAIN
Changes since 1.90: +2 -2 lines
Log Message:
kill mysterious bridge methods by s/ScheduledFutureTask.super/super/

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10 import static java.util.concurrent.TimeUnit.NANOSECONDS;
11
12 import java.util.AbstractQueue;
13 import java.util.Arrays;
14 import java.util.Collection;
15 import java.util.Iterator;
16 import java.util.List;
17 import java.util.NoSuchElementException;
18 import java.util.concurrent.atomic.AtomicLong;
19 import java.util.concurrent.locks.Condition;
20 import java.util.concurrent.locks.ReentrantLock;
21
22 /**
23 * A {@link ThreadPoolExecutor} that can additionally schedule
24 * commands to run after a given delay, or to execute periodically.
25 * This class is preferable to {@link java.util.Timer} when multiple
26 * worker threads are needed, or when the additional flexibility or
27 * capabilities of {@link ThreadPoolExecutor} (which this class
28 * extends) are required.
29 *
30 * <p>Delayed tasks execute no sooner than they are enabled, but
31 * without any real-time guarantees about when, after they are
32 * enabled, they will commence. Tasks scheduled for exactly the same
33 * execution time are enabled in first-in-first-out (FIFO) order of
34 * submission.
35 *
36 * <p>When a submitted task is cancelled before it is run, execution
37 * is suppressed. By default, such a cancelled task is not
38 * automatically removed from the work queue until its delay elapses.
39 * While this enables further inspection and monitoring, it may also
40 * cause unbounded retention of cancelled tasks. To avoid this, use
41 * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately
42 * removed from the work queue at time of cancellation.
43 *
44 * <p>Successive executions of a periodic task scheduled via
45 * {@link #scheduleAtFixedRate scheduleAtFixedRate} or
46 * {@link #scheduleWithFixedDelay scheduleWithFixedDelay}
47 * do not overlap. While different executions may be performed by
48 * different threads, the effects of prior executions
49 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
50 * those of subsequent ones.
51 *
52 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
53 * of the inherited tuning methods are not useful for it. In
54 * particular, because it acts as a fixed-sized pool using
55 * {@code corePoolSize} threads and an unbounded queue, adjustments
56 * to {@code maximumPoolSize} have no useful effect. Additionally, it
57 * is almost never a good idea to set {@code corePoolSize} to zero or
58 * use {@code allowCoreThreadTimeOut} because this may leave the pool
59 * without threads to handle tasks once they become eligible to run.
60 *
61 * <p><b>Extension notes:</b> This class overrides the
62 * {@link ThreadPoolExecutor#execute(Runnable) execute} and
63 * {@link AbstractExecutorService#submit(Runnable) submit}
64 * methods to generate internal {@link ScheduledFuture} objects to
65 * control per-task delays and scheduling. To preserve
66 * functionality, any further overrides of these methods in
67 * subclasses must invoke superclass versions, which effectively
68 * disables additional task customization. However, this class
69 * provides an alternative protected extension method
70 * {@code decorateTask} (one version each for {@code Runnable} and
71 * {@code Callable}) that can be used to customize the concrete task
72 * types used to execute commands entered via {@code execute},
73 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
74 * and {@code scheduleWithFixedDelay}. By default, a
75 * {@code ScheduledThreadPoolExecutor} uses a task type extending
76 * {@link FutureTask}. However, this may be modified or replaced using
77 * subclasses of the form:
78 *
79 * <pre> {@code
80 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
81 *
82 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
83 *
84 * protected <V> RunnableScheduledFuture<V> decorateTask(
85 * Runnable r, RunnableScheduledFuture<V> task) {
86 * return new CustomTask<V>(r, task);
87 * }
88 *
89 * protected <V> RunnableScheduledFuture<V> decorateTask(
90 * Callable<V> c, RunnableScheduledFuture<V> task) {
91 * return new CustomTask<V>(c, task);
92 * }
93 * // ... add constructors, etc.
94 * }}</pre>
95 *
96 * @since 1.5
97 * @author Doug Lea
98 */
99 public class ScheduledThreadPoolExecutor
100 extends ThreadPoolExecutor
101 implements ScheduledExecutorService {
102
103 /*
104 * This class specializes ThreadPoolExecutor implementation by
105 *
106 * 1. Using a custom task type, ScheduledFutureTask for
107 * tasks, even those that don't require scheduling (i.e.,
108 * those submitted using ExecutorService execute, not
109 * ScheduledExecutorService methods) which are treated as
110 * delayed tasks with a delay of zero.
111 *
112 * 2. Using a custom queue (DelayedWorkQueue), a variant of
113 * unbounded DelayQueue. The lack of capacity constraint and
114 * the fact that corePoolSize and maximumPoolSize are
115 * effectively identical simplifies some execution mechanics
116 * (see delayedExecute) compared to ThreadPoolExecutor.
117 *
118 * 3. Supporting optional run-after-shutdown parameters, which
119 * leads to overrides of shutdown methods to remove and cancel
120 * tasks that should NOT be run after shutdown, as well as
121 * different recheck logic when task (re)submission overlaps
122 * with a shutdown.
123 *
124 * 4. Task decoration methods to allow interception and
125 * instrumentation, which are needed because subclasses cannot
126 * otherwise override submit methods to get this effect. These
127 * don't have any impact on pool control logic though.
128 */
129
130 /**
131 * False if should cancel/suppress periodic tasks on shutdown.
132 */
133 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
134
135 /**
136 * False if should cancel non-periodic tasks on shutdown.
137 */
138 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
139
140 /**
141 * True if ScheduledFutureTask.cancel should remove from queue.
142 */
143 volatile boolean removeOnCancel;
144
145 /**
146 * Sequence number to break scheduling ties, and in turn to
147 * guarantee FIFO order among tied entries.
148 */
149 private static final AtomicLong sequencer = new AtomicLong();
150
151 /**
152 * Returns current nanosecond time.
153 */
154 static final long now() {
155 return System.nanoTime();
156 }
157
158 private class ScheduledFutureTask<V>
159 extends FutureTask<V> implements RunnableScheduledFuture<V> {
160
161 /** Sequence number to break ties FIFO */
162 private final long sequenceNumber;
163
164 /** The time the task is enabled to execute in nanoTime units */
165 private volatile long time;
166
167 /**
168 * Period in nanoseconds for repeating tasks.
169 * A positive value indicates fixed-rate execution.
170 * A negative value indicates fixed-delay execution.
171 * A value of 0 indicates a non-repeating (one-shot) task.
172 */
173 private final long period;
174
175 /** The actual task to be re-enqueued by reExecutePeriodic */
176 RunnableScheduledFuture<V> outerTask = this;
177
178 /**
179 * Index into delay queue, to support faster cancellation.
180 */
181 int heapIndex;
182
183 /**
184 * Creates a one-shot action with given nanoTime-based trigger time.
185 */
186 ScheduledFutureTask(Runnable r, V result, long triggerTime,
187 long sequenceNumber) {
188 super(r, result);
189 this.time = triggerTime;
190 this.period = 0;
191 this.sequenceNumber = sequenceNumber;
192 }
193
194 /**
195 * Creates a periodic action with given nanoTime-based initial
196 * trigger time and period.
197 */
198 ScheduledFutureTask(Runnable r, V result, long triggerTime,
199 long period, long sequenceNumber) {
200 super(r, result);
201 this.time = triggerTime;
202 this.period = period;
203 this.sequenceNumber = sequenceNumber;
204 }
205
206 /**
207 * Creates a one-shot action with given nanoTime-based trigger time.
208 */
209 ScheduledFutureTask(Callable<V> callable, long triggerTime,
210 long sequenceNumber) {
211 super(callable);
212 this.time = triggerTime;
213 this.period = 0;
214 this.sequenceNumber = sequenceNumber;
215 }
216
217 public long getDelay(TimeUnit unit) {
218 return unit.convert(time - now(), NANOSECONDS);
219 }
220
221 public int compareTo(Delayed other) {
222 if (other == this) // compare zero if same object
223 return 0;
224 if (other instanceof ScheduledFutureTask) {
225 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
226 long diff = time - x.time;
227 if (diff < 0)
228 return -1;
229 else if (diff > 0)
230 return 1;
231 else if (sequenceNumber < x.sequenceNumber)
232 return -1;
233 else
234 return 1;
235 }
236 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
237 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
238 }
239
240 /**
241 * Returns {@code true} if this is a periodic (not a one-shot) action.
242 *
243 * @return {@code true} if periodic
244 */
245 public boolean isPeriodic() {
246 return period != 0;
247 }
248
249 /**
250 * Sets the next time to run for a periodic task.
251 */
252 private void setNextRunTime() {
253 long p = period;
254 if (p > 0)
255 time += p;
256 else
257 time = triggerTime(-p);
258 }
259
260 public boolean cancel(boolean mayInterruptIfRunning) {
261 boolean cancelled = super.cancel(mayInterruptIfRunning);
262 if (cancelled && removeOnCancel && heapIndex >= 0)
263 remove(this);
264 return cancelled;
265 }
266
267 /**
268 * Overrides FutureTask version so as to reset/requeue if periodic.
269 */
270 public void run() {
271 boolean periodic = isPeriodic();
272 if (!canRunInCurrentRunState(periodic))
273 cancel(false);
274 else if (!periodic)
275 super.run();
276 else if (super.runAndReset()) {
277 setNextRunTime();
278 reExecutePeriodic(outerTask);
279 }
280 }
281 }
282
283 /**
284 * Returns true if can run a task given current run state
285 * and run-after-shutdown parameters.
286 *
287 * @param periodic true if this task is periodic, false if delayed
288 */
289 boolean canRunInCurrentRunState(boolean periodic) {
290 return isRunningOrShutdown(periodic ?
291 continueExistingPeriodicTasksAfterShutdown :
292 executeExistingDelayedTasksAfterShutdown);
293 }
294
295 /**
296 * Main execution method for delayed or periodic tasks. If pool
297 * is shut down, rejects the task. Otherwise adds task to queue
298 * and starts a thread, if necessary, to run it. (We cannot
299 * prestart the thread to run the task because the task (probably)
300 * shouldn't be run yet.) If the pool is shut down while the task
301 * is being added, cancel and remove it if required by state and
302 * run-after-shutdown parameters.
303 *
304 * @param task the task
305 */
306 private void delayedExecute(RunnableScheduledFuture<?> task) {
307 if (isShutdown())
308 reject(task);
309 else {
310 super.getQueue().add(task);
311 if (isShutdown() &&
312 !canRunInCurrentRunState(task.isPeriodic()) &&
313 remove(task))
314 task.cancel(false);
315 else
316 ensurePrestart();
317 }
318 }
319
320 /**
321 * Requeues a periodic task unless current run state precludes it.
322 * Same idea as delayedExecute except drops task rather than rejecting.
323 *
324 * @param task the task
325 */
326 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
327 if (canRunInCurrentRunState(true)) {
328 super.getQueue().add(task);
329 if (!canRunInCurrentRunState(true) && remove(task))
330 task.cancel(false);
331 else
332 ensurePrestart();
333 }
334 }
335
336 /**
337 * Cancels and clears the queue of all tasks that should not be run
338 * due to shutdown policy. Invoked within super.shutdown.
339 */
340 @Override void onShutdown() {
341 BlockingQueue<Runnable> q = super.getQueue();
342 boolean keepDelayed =
343 getExecuteExistingDelayedTasksAfterShutdownPolicy();
344 boolean keepPeriodic =
345 getContinueExistingPeriodicTasksAfterShutdownPolicy();
346 if (!keepDelayed && !keepPeriodic) {
347 for (Object e : q.toArray())
348 if (e instanceof RunnableScheduledFuture<?>)
349 ((RunnableScheduledFuture<?>) e).cancel(false);
350 q.clear();
351 }
352 else {
353 // Traverse snapshot to avoid iterator exceptions
354 for (Object e : q.toArray()) {
355 if (e instanceof RunnableScheduledFuture) {
356 RunnableScheduledFuture<?> t =
357 (RunnableScheduledFuture<?>)e;
358 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
359 t.isCancelled()) { // also remove if already cancelled
360 if (q.remove(t))
361 t.cancel(false);
362 }
363 }
364 }
365 }
366 tryTerminate();
367 }
368
369 /**
370 * Modifies or replaces the task used to execute a runnable.
371 * This method can be used to override the concrete
372 * class used for managing internal tasks.
373 * The default implementation simply returns the given task.
374 *
375 * @param runnable the submitted Runnable
376 * @param task the task created to execute the runnable
377 * @param <V> the type of the task's result
378 * @return a task that can execute the runnable
379 * @since 1.6
380 */
381 protected <V> RunnableScheduledFuture<V> decorateTask(
382 Runnable runnable, RunnableScheduledFuture<V> task) {
383 return task;
384 }
385
386 /**
387 * Modifies or replaces the task used to execute a callable.
388 * This method can be used to override the concrete
389 * class used for managing internal tasks.
390 * The default implementation simply returns the given task.
391 *
392 * @param callable the submitted Callable
393 * @param task the task created to execute the callable
394 * @param <V> the type of the task's result
395 * @return a task that can execute the callable
396 * @since 1.6
397 */
398 protected <V> RunnableScheduledFuture<V> decorateTask(
399 Callable<V> callable, RunnableScheduledFuture<V> task) {
400 return task;
401 }
402
403 /**
404 * The default keep-alive time for pool threads.
405 *
406 * Normally, this value is unused because all pool threads will be
407 * core threads, but if a user creates a pool with a corePoolSize
408 * of zero (against our advice), we keep a thread alive as long as
409 * there are queued tasks. If the keep alive time is zero (the
410 * historic value), we end up hot-spinning in getTask, wasting a
411 * CPU. But on the other hand, if we set the value too high, and
412 * users create a one-shot pool which they don't cleanly shutdown,
413 * the pool's non-daemon threads will prevent JVM termination. A
414 * small but non-zero value (relative to a JVM's lifetime) seems
415 * best.
416 */
417 private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
418
419 /**
420 * Creates a new {@code ScheduledThreadPoolExecutor} with the
421 * given core pool size.
422 *
423 * @param corePoolSize the number of threads to keep in the pool, even
424 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
425 * @throws IllegalArgumentException if {@code corePoolSize < 0}
426 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    // Delegates to ThreadPoolExecutor with a nominal maximum size, the
    // default keep-alive, and a fresh DelayedWorkQueue as the work queue.
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue());
}
432
433 /**
434 * Creates a new {@code ScheduledThreadPoolExecutor} with the
435 * given initial parameters.
436 *
437 * @param corePoolSize the number of threads to keep in the pool, even
438 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
439 * @param threadFactory the factory to use when the executor
440 * creates a new thread
441 * @throws IllegalArgumentException if {@code corePoolSize < 0}
442 * @throws NullPointerException if {@code threadFactory} is null
443 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    // Same defaults as the single-argument constructor, plus the
    // caller's thread factory.
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue(),
          threadFactory);
}
450
451 /**
452 * Creates a new {@code ScheduledThreadPoolExecutor} with the
453 * given initial parameters.
454 *
455 * @param corePoolSize the number of threads to keep in the pool, even
456 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
457 * @param handler the handler to use when execution is blocked
458 * because the thread bounds and queue capacities are reached
459 * @throws IllegalArgumentException if {@code corePoolSize < 0}
460 * @throws NullPointerException if {@code handler} is null
461 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    // Same defaults as the single-argument constructor, plus the
    // caller's rejection handler.
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue(),
          handler);
}
468
469 /**
470 * Creates a new {@code ScheduledThreadPoolExecutor} with the
471 * given initial parameters.
472 *
473 * @param corePoolSize the number of threads to keep in the pool, even
474 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
475 * @param threadFactory the factory to use when the executor
476 * creates a new thread
477 * @param handler the handler to use when execution is blocked
478 * because the thread bounds and queue capacities are reached
479 * @throws IllegalArgumentException if {@code corePoolSize < 0}
480 * @throws NullPointerException if {@code threadFactory} or
481 * {@code handler} is null
482 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    // Same defaults as the single-argument constructor, plus the
    // caller's thread factory and rejection handler.
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue(),
          threadFactory,
          handler);
}
490
491 /**
492 * Returns the nanoTime-based trigger time of a delayed action.
493 */
494 private long triggerTime(long delay, TimeUnit unit) {
495 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
496 }
497
498 /**
499 * Returns the nanoTime-based trigger time of a delayed action.
500 */
501 long triggerTime(long delay) {
502 return now() +
503 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
504 }
505
506 /**
507 * Constrains the values of all delays in the queue to be within
508 * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
509 * This may occur if a task is eligible to be dequeued, but has
510 * not yet been, while some other task is added with a delay of
511 * Long.MAX_VALUE.
512 */
513 private long overflowFree(long delay) {
514 Delayed head = (Delayed) super.getQueue().peek();
515 if (head != null) {
516 long headDelay = head.getDelay(NANOSECONDS);
517 if (headDelay < 0 && (delay - headDelay < 0))
518 delay = Long.MAX_VALUE + headDelay;
519 }
520 return delay;
521 }
522
523 /**
524 * @throws RejectedExecutionException {@inheritDoc}
525 * @throws NullPointerException {@inheritDoc}
526 */
527 public ScheduledFuture<?> schedule(Runnable command,
528 long delay,
529 TimeUnit unit) {
530 if (command == null || unit == null)
531 throw new NullPointerException();
532 RunnableScheduledFuture<Void> t = decorateTask(command,
533 new ScheduledFutureTask<Void>(command, null,
534 triggerTime(delay, unit),
535 sequencer.getAndIncrement()));
536 delayedExecute(t);
537 return t;
538 }
539
540 /**
541 * @throws RejectedExecutionException {@inheritDoc}
542 * @throws NullPointerException {@inheritDoc}
543 */
544 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
545 long delay,
546 TimeUnit unit) {
547 if (callable == null || unit == null)
548 throw new NullPointerException();
549 RunnableScheduledFuture<V> t = decorateTask(callable,
550 new ScheduledFutureTask<V>(callable,
551 triggerTime(delay, unit),
552 sequencer.getAndIncrement()));
553 delayedExecute(t);
554 return t;
555 }
556
557 /**
558 * @throws RejectedExecutionException {@inheritDoc}
559 * @throws NullPointerException {@inheritDoc}
560 * @throws IllegalArgumentException {@inheritDoc}
561 */
562 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
563 long initialDelay,
564 long period,
565 TimeUnit unit) {
566 if (command == null || unit == null)
567 throw new NullPointerException();
568 if (period <= 0)
569 throw new IllegalArgumentException();
570 ScheduledFutureTask<Void> sft =
571 new ScheduledFutureTask<Void>(command,
572 null,
573 triggerTime(initialDelay, unit),
574 unit.toNanos(period),
575 sequencer.getAndIncrement());
576 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
577 sft.outerTask = t;
578 delayedExecute(t);
579 return t;
580 }
581
582 /**
583 * @throws RejectedExecutionException {@inheritDoc}
584 * @throws NullPointerException {@inheritDoc}
585 * @throws IllegalArgumentException {@inheritDoc}
586 */
587 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
588 long initialDelay,
589 long delay,
590 TimeUnit unit) {
591 if (command == null || unit == null)
592 throw new NullPointerException();
593 if (delay <= 0)
594 throw new IllegalArgumentException();
595 ScheduledFutureTask<Void> sft =
596 new ScheduledFutureTask<Void>(command,
597 null,
598 triggerTime(initialDelay, unit),
599 unit.toNanos(-delay),
600 sequencer.getAndIncrement());
601 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
602 sft.outerTask = t;
603 delayedExecute(t);
604 return t;
605 }
606
607 /**
608 * Executes {@code command} with zero required delay.
609 * This has effect equivalent to
610 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
611 * Note that inspections of the queue and of the list returned by
612 * {@code shutdownNow} will access the zero-delayed
613 * {@link ScheduledFuture}, not the {@code command} itself.
614 *
615 * <p>A consequence of the use of {@code ScheduledFuture} objects is
616 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
617 * called with a null second {@code Throwable} argument, even if the
618 * {@code command} terminated abruptly. Instead, the {@code Throwable}
619 * thrown by such a task can be obtained via {@link Future#get}.
620 *
621 * @throws RejectedExecutionException at discretion of
622 * {@code RejectedExecutionHandler}, if the task
623 * cannot be accepted for execution because the
624 * executor has been shut down
625 * @throws NullPointerException {@inheritDoc}
626 */
627 public void execute(Runnable command) {
628 schedule(command, 0, NANOSECONDS);
629 }
630
631 // Override AbstractExecutorService methods
632
633 /**
634 * @throws RejectedExecutionException {@inheritDoc}
635 * @throws NullPointerException {@inheritDoc}
636 */
637 public Future<?> submit(Runnable task) {
638 return schedule(task, 0, NANOSECONDS);
639 }
640
641 /**
642 * @throws RejectedExecutionException {@inheritDoc}
643 * @throws NullPointerException {@inheritDoc}
644 */
645 public <T> Future<T> submit(Runnable task, T result) {
646 return schedule(Executors.callable(task, result), 0, NANOSECONDS);
647 }
648
649 /**
650 * @throws RejectedExecutionException {@inheritDoc}
651 * @throws NullPointerException {@inheritDoc}
652 */
653 public <T> Future<T> submit(Callable<T> task) {
654 return schedule(task, 0, NANOSECONDS);
655 }
656
657 /**
658 * Sets the policy on whether to continue executing existing
659 * periodic tasks even when this executor has been {@code shutdown}.
660 * In this case, these tasks will only terminate upon
661 * {@code shutdownNow} or after setting the policy to
662 * {@code false} when already shutdown.
663 * This value is by default {@code false}.
664 *
665 * @param value if {@code true}, continue after shutdown, else don't
666 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
667 */
668 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
669 continueExistingPeriodicTasksAfterShutdown = value;
670 if (!value && isShutdown())
671 onShutdown();
672 }
673
674 /**
675 * Gets the policy on whether to continue executing existing
676 * periodic tasks even when this executor has been {@code shutdown}.
677 * In this case, these tasks will only terminate upon
678 * {@code shutdownNow} or after setting the policy to
679 * {@code false} when already shutdown.
680 * This value is by default {@code false}.
681 *
682 * @return {@code true} if will continue after shutdown
683 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
684 */
685 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
686 return continueExistingPeriodicTasksAfterShutdown;
687 }
688
689 /**
690 * Sets the policy on whether to execute existing delayed
691 * tasks even when this executor has been {@code shutdown}.
692 * In this case, these tasks will only terminate upon
693 * {@code shutdownNow}, or after setting the policy to
694 * {@code false} when already shutdown.
695 * This value is by default {@code true}.
696 *
697 * @param value if {@code true}, execute after shutdown, else don't
698 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
699 */
700 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
701 executeExistingDelayedTasksAfterShutdown = value;
702 if (!value && isShutdown())
703 onShutdown();
704 }
705
706 /**
707 * Gets the policy on whether to execute existing delayed
708 * tasks even when this executor has been {@code shutdown}.
709 * In this case, these tasks will only terminate upon
710 * {@code shutdownNow}, or after setting the policy to
711 * {@code false} when already shutdown.
712 * This value is by default {@code true}.
713 *
714 * @return {@code true} if will execute after shutdown
715 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
716 */
717 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
718 return executeExistingDelayedTasksAfterShutdown;
719 }
720
721 /**
722 * Sets the policy on whether cancelled tasks should be immediately
723 * removed from the work queue at time of cancellation. This value is
724 * by default {@code false}.
725 *
726 * @param value if {@code true}, remove on cancellation, else don't
727 * @see #getRemoveOnCancelPolicy
728 * @since 1.7
729 */
730 public void setRemoveOnCancelPolicy(boolean value) {
731 removeOnCancel = value;
732 }
733
734 /**
735 * Gets the policy on whether cancelled tasks should be immediately
736 * removed from the work queue at time of cancellation. This value is
737 * by default {@code false}.
738 *
739 * @return {@code true} if cancelled tasks are immediately removed
740 * from the queue
741 * @see #setRemoveOnCancelPolicy
742 * @since 1.7
743 */
744 public boolean getRemoveOnCancelPolicy() {
745 return removeOnCancel;
746 }
747
748 /**
749 * Initiates an orderly shutdown in which previously submitted
750 * tasks are executed, but no new tasks will be accepted.
751 * Invocation has no additional effect if already shut down.
752 *
753 * <p>This method does not wait for previously submitted tasks to
754 * complete execution. Use {@link #awaitTermination awaitTermination}
755 * to do that.
756 *
757 * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
758 * has been set {@code false}, existing delayed tasks whose delays
759 * have not yet elapsed are cancelled. And unless the {@code
760 * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
761 * {@code true}, future executions of existing periodic tasks will
762 * be cancelled.
763 *
764 * @throws SecurityException {@inheritDoc}
765 */
766 public void shutdown() {
767 super.shutdown();
768 }
769
770 /**
771 * Attempts to stop all actively executing tasks, halts the
772 * processing of waiting tasks, and returns a list of the tasks
773 * that were awaiting execution.
774 *
775 * <p>This method does not wait for actively executing tasks to
776 * terminate. Use {@link #awaitTermination awaitTermination} to
777 * do that.
778 *
779 * <p>There are no guarantees beyond best-effort attempts to stop
780 * processing actively executing tasks. This implementation
781 * cancels tasks via {@link Thread#interrupt}, so any task that
782 * fails to respond to interrupts may never terminate.
783 *
784 * @return list of tasks that never commenced execution.
785 * Each element of this list is a {@link ScheduledFuture}.
786 * For tasks submitted via one of the {@code schedule}
787 * methods, the element will be identical to the returned
788 * {@code ScheduledFuture}. For tasks submitted using
789 * {@link #execute execute}, the element will be a
790 * zero-delay {@code ScheduledFuture}.
791 * @throws SecurityException {@inheritDoc}
792 */
793 public List<Runnable> shutdownNow() {
794 return super.shutdownNow();
795 }
796
797 /**
798 * Returns the task queue used by this executor.
799 * Each element of this list is a {@link ScheduledFuture}.
800 * For tasks submitted via one of the {@code schedule} methods, the
801 * element will be identical to the returned {@code ScheduledFuture}.
802 * For tasks submitted using {@link #execute execute}, the element
803 * will be a zero-delay {@code ScheduledFuture}.
804 *
805 * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
806 * tasks in the order in which they will execute.
807 *
808 * @return the task queue
809 */
810 public BlockingQueue<Runnable> getQueue() {
811 return super.getQueue();
812 }
813
814 /**
815 * Specialized delay queue. To mesh with TPE declarations, this
816 * class must be declared as a BlockingQueue<Runnable> even though
817 * it can only hold RunnableScheduledFutures.
818 */
819 static class DelayedWorkQueue extends AbstractQueue<Runnable>
820 implements BlockingQueue<Runnable> {
821
822 /*
823 * A DelayedWorkQueue is based on a heap-based data structure
824 * like those in DelayQueue and PriorityQueue, except that
825 * every ScheduledFutureTask also records its index into the
826 * heap array. This eliminates the need to find a task upon
827 * cancellation, greatly speeding up removal (down from O(n)
828 * to O(log n)), and reducing garbage retention that would
829 * otherwise occur by waiting for the element to rise to top
830 * before clearing. But because the queue may also hold
831 * RunnableScheduledFutures that are not ScheduledFutureTasks,
832 * we are not guaranteed to have such indices available, in
833 * which case we fall back to linear search. (We expect that
834 * most tasks will not be decorated, and that the faster cases
835 * will be much more common.)
836 *
837 * All heap operations must record index changes -- mainly
838 * within siftUp and siftDown. Upon removal, a task's
839 * heapIndex is set to -1. Note that ScheduledFutureTasks can
840 * appear at most once in the queue (this need not be true for
841 * other kinds of tasks or work queues), so are uniquely
842 * identified by heapIndex.
843 */
844
845 private static final int INITIAL_CAPACITY = 16;
846 private RunnableScheduledFuture<?>[] queue =
847 new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
848 private final ReentrantLock lock = new ReentrantLock();
849 private int size;
850
851 /**
852 * Thread designated to wait for the task at the head of the
853 * queue. This variant of the Leader-Follower pattern
854 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
855 * minimize unnecessary timed waiting. When a thread becomes
856 * the leader, it waits only for the next delay to elapse, but
857 * other threads await indefinitely. The leader thread must
858 * signal some other thread before returning from take() or
859 * poll(...), unless some other thread becomes leader in the
860 * interim. Whenever the head of the queue is replaced with a
861 * task with an earlier expiration time, the leader field is
862 * invalidated by being reset to null, and some waiting
863 * thread, but not necessarily the current leader, is
864 * signalled. So waiting threads must be prepared to acquire
865 * and lose leadership while waiting.
866 */
867 private Thread leader;
868
869 /**
870 * Condition signalled when a newer task becomes available at the
871 * head of the queue or a new thread may need to become leader.
872 */
873 private final Condition available = lock.newCondition();
874
875 /**
876 * Sets f's heapIndex if it is a ScheduledFutureTask.
877 */
878 private void setIndex(RunnableScheduledFuture<?> f, int idx) {
879 if (f instanceof ScheduledFutureTask)
880 ((ScheduledFutureTask)f).heapIndex = idx;
881 }
882
883 /**
884 * Sifts element added at bottom up to its heap-ordered spot.
885 * Call only when holding lock.
886 */
887 private void siftUp(int k, RunnableScheduledFuture<?> key) {
888 while (k > 0) {
889 int parent = (k - 1) >>> 1;
890 RunnableScheduledFuture<?> e = queue[parent];
891 if (key.compareTo(e) >= 0)
892 break;
893 queue[k] = e;
894 setIndex(e, k);
895 k = parent;
896 }
897 queue[k] = key;
898 setIndex(key, k);
899 }
900
901 /**
902 * Sifts element added at top down to its heap-ordered spot.
903 * Call only when holding lock.
904 */
905 private void siftDown(int k, RunnableScheduledFuture<?> key) {
906 int half = size >>> 1;
907 while (k < half) {
908 int child = (k << 1) + 1;
909 RunnableScheduledFuture<?> c = queue[child];
910 int right = child + 1;
911 if (right < size && c.compareTo(queue[right]) > 0)
912 c = queue[child = right];
913 if (key.compareTo(c) <= 0)
914 break;
915 queue[k] = c;
916 setIndex(c, k);
917 k = child;
918 }
919 queue[k] = key;
920 setIndex(key, k);
921 }
922
923 /**
924 * Resizes the heap array. Call only when holding lock.
925 */
926 private void grow() {
927 int oldCapacity = queue.length;
928 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
929 if (newCapacity < 0) // overflow
930 newCapacity = Integer.MAX_VALUE;
931 queue = Arrays.copyOf(queue, newCapacity);
932 }
933
934 /**
935 * Finds index of given object, or -1 if absent.
936 */
937 private int indexOf(Object x) {
938 if (x != null) {
939 if (x instanceof ScheduledFutureTask) {
940 int i = ((ScheduledFutureTask) x).heapIndex;
941 // Sanity check; x could conceivably be a
942 // ScheduledFutureTask from some other pool.
943 if (i >= 0 && i < size && queue[i] == x)
944 return i;
945 } else {
946 for (int i = 0; i < size; i++)
947 if (x.equals(queue[i]))
948 return i;
949 }
950 }
951 return -1;
952 }
953
954 public boolean contains(Object x) {
955 final ReentrantLock lock = this.lock;
956 lock.lock();
957 try {
958 return indexOf(x) != -1;
959 } finally {
960 lock.unlock();
961 }
962 }
963
964 public boolean remove(Object x) {
965 final ReentrantLock lock = this.lock;
966 lock.lock();
967 try {
968 int i = indexOf(x);
969 if (i < 0)
970 return false;
971
972 setIndex(queue[i], -1);
973 int s = --size;
974 RunnableScheduledFuture<?> replacement = queue[s];
975 queue[s] = null;
976 if (s != i) {
977 siftDown(i, replacement);
978 if (queue[i] == replacement)
979 siftUp(i, replacement);
980 }
981 return true;
982 } finally {
983 lock.unlock();
984 }
985 }
986
987 public int size() {
988 final ReentrantLock lock = this.lock;
989 lock.lock();
990 try {
991 return size;
992 } finally {
993 lock.unlock();
994 }
995 }
996
997 public boolean isEmpty() {
998 return size() == 0;
999 }
1000
1001 public int remainingCapacity() {
1002 return Integer.MAX_VALUE;
1003 }
1004
1005 public RunnableScheduledFuture<?> peek() {
1006 final ReentrantLock lock = this.lock;
1007 lock.lock();
1008 try {
1009 return queue[0];
1010 } finally {
1011 lock.unlock();
1012 }
1013 }
1014
1015 public boolean offer(Runnable x) {
1016 if (x == null)
1017 throw new NullPointerException();
1018 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
1019 final ReentrantLock lock = this.lock;
1020 lock.lock();
1021 try {
1022 int i = size;
1023 if (i >= queue.length)
1024 grow();
1025 size = i + 1;
1026 if (i == 0) {
1027 queue[0] = e;
1028 setIndex(e, 0);
1029 } else {
1030 siftUp(i, e);
1031 }
1032 if (queue[0] == e) {
1033 leader = null;
1034 available.signal();
1035 }
1036 } finally {
1037 lock.unlock();
1038 }
1039 return true;
1040 }
1041
1042 public void put(Runnable e) {
1043 offer(e);
1044 }
1045
1046 public boolean add(Runnable e) {
1047 return offer(e);
1048 }
1049
1050 public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1051 return offer(e);
1052 }
1053
1054 /**
1055 * Performs common bookkeeping for poll and take: Replaces
1056 * first element with last and sifts it down. Call only when
1057 * holding lock.
1058 * @param f the task to remove and return
1059 */
1060 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1061 int s = --size;
1062 RunnableScheduledFuture<?> x = queue[s];
1063 queue[s] = null;
1064 if (s != 0)
1065 siftDown(0, x);
1066 setIndex(f, -1);
1067 return f;
1068 }
1069
1070 public RunnableScheduledFuture<?> poll() {
1071 final ReentrantLock lock = this.lock;
1072 lock.lock();
1073 try {
1074 RunnableScheduledFuture<?> first = queue[0];
1075 return (first == null || first.getDelay(NANOSECONDS) > 0)
1076 ? null
1077 : finishPoll(first);
1078 } finally {
1079 lock.unlock();
1080 }
1081 }
1082
1083 public RunnableScheduledFuture<?> take() throws InterruptedException {
1084 final ReentrantLock lock = this.lock;
1085 lock.lockInterruptibly();
1086 try {
1087 for (;;) {
1088 RunnableScheduledFuture<?> first = queue[0];
1089 if (first == null)
1090 available.await();
1091 else {
1092 long delay = first.getDelay(NANOSECONDS);
1093 if (delay <= 0)
1094 return finishPoll(first);
1095 first = null; // don't retain ref while waiting
1096 if (leader != null)
1097 available.await();
1098 else {
1099 Thread thisThread = Thread.currentThread();
1100 leader = thisThread;
1101 try {
1102 available.awaitNanos(delay);
1103 } finally {
1104 if (leader == thisThread)
1105 leader = null;
1106 }
1107 }
1108 }
1109 }
1110 } finally {
1111 if (leader == null && queue[0] != null)
1112 available.signal();
1113 lock.unlock();
1114 }
1115 }
1116
1117 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1118 throws InterruptedException {
1119 long nanos = unit.toNanos(timeout);
1120 final ReentrantLock lock = this.lock;
1121 lock.lockInterruptibly();
1122 try {
1123 for (;;) {
1124 RunnableScheduledFuture<?> first = queue[0];
1125 if (first == null) {
1126 if (nanos <= 0)
1127 return null;
1128 else
1129 nanos = available.awaitNanos(nanos);
1130 } else {
1131 long delay = first.getDelay(NANOSECONDS);
1132 if (delay <= 0)
1133 return finishPoll(first);
1134 if (nanos <= 0)
1135 return null;
1136 first = null; // don't retain ref while waiting
1137 if (nanos < delay || leader != null)
1138 nanos = available.awaitNanos(nanos);
1139 else {
1140 Thread thisThread = Thread.currentThread();
1141 leader = thisThread;
1142 try {
1143 long timeLeft = available.awaitNanos(delay);
1144 nanos -= delay - timeLeft;
1145 } finally {
1146 if (leader == thisThread)
1147 leader = null;
1148 }
1149 }
1150 }
1151 }
1152 } finally {
1153 if (leader == null && queue[0] != null)
1154 available.signal();
1155 lock.unlock();
1156 }
1157 }
1158
1159 public void clear() {
1160 final ReentrantLock lock = this.lock;
1161 lock.lock();
1162 try {
1163 for (int i = 0; i < size; i++) {
1164 RunnableScheduledFuture<?> t = queue[i];
1165 if (t != null) {
1166 queue[i] = null;
1167 setIndex(t, -1);
1168 }
1169 }
1170 size = 0;
1171 } finally {
1172 lock.unlock();
1173 }
1174 }
1175
1176 /**
1177 * Returns first element only if it is expired.
1178 * Used only by drainTo. Call only when holding lock.
1179 */
1180 private RunnableScheduledFuture<?> peekExpired() {
1181 // assert lock.isHeldByCurrentThread();
1182 RunnableScheduledFuture<?> first = queue[0];
1183 return (first == null || first.getDelay(NANOSECONDS) > 0) ?
1184 null : first;
1185 }
1186
1187 public int drainTo(Collection<? super Runnable> c) {
1188 if (c == null)
1189 throw new NullPointerException();
1190 if (c == this)
1191 throw new IllegalArgumentException();
1192 final ReentrantLock lock = this.lock;
1193 lock.lock();
1194 try {
1195 RunnableScheduledFuture<?> first;
1196 int n = 0;
1197 while ((first = peekExpired()) != null) {
1198 c.add(first); // In this order, in case add() throws.
1199 finishPoll(first);
1200 ++n;
1201 }
1202 return n;
1203 } finally {
1204 lock.unlock();
1205 }
1206 }
1207
1208 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1209 if (c == null)
1210 throw new NullPointerException();
1211 if (c == this)
1212 throw new IllegalArgumentException();
1213 if (maxElements <= 0)
1214 return 0;
1215 final ReentrantLock lock = this.lock;
1216 lock.lock();
1217 try {
1218 RunnableScheduledFuture<?> first;
1219 int n = 0;
1220 while (n < maxElements && (first = peekExpired()) != null) {
1221 c.add(first); // In this order, in case add() throws.
1222 finishPoll(first);
1223 ++n;
1224 }
1225 return n;
1226 } finally {
1227 lock.unlock();
1228 }
1229 }
1230
1231 public Object[] toArray() {
1232 final ReentrantLock lock = this.lock;
1233 lock.lock();
1234 try {
1235 return Arrays.copyOf(queue, size, Object[].class);
1236 } finally {
1237 lock.unlock();
1238 }
1239 }
1240
1241 @SuppressWarnings("unchecked")
1242 public <T> T[] toArray(T[] a) {
1243 final ReentrantLock lock = this.lock;
1244 lock.lock();
1245 try {
1246 if (a.length < size)
1247 return (T[]) Arrays.copyOf(queue, size, a.getClass());
1248 System.arraycopy(queue, 0, a, 0, size);
1249 if (a.length > size)
1250 a[size] = null;
1251 return a;
1252 } finally {
1253 lock.unlock();
1254 }
1255 }
1256
1257 public Iterator<Runnable> iterator() {
1258 return new Itr(Arrays.copyOf(queue, size));
1259 }
1260
1261 /**
1262 * Snapshot iterator that works off a copy of the underlying queue array.
1263 */
1264 private class Itr implements Iterator<Runnable> {
1265 final RunnableScheduledFuture<?>[] array;
1266 int cursor; // index of next element to return; initially 0
1267 int lastRet = -1; // index of last element returned; -1 if no such
1268
1269 Itr(RunnableScheduledFuture<?>[] array) {
1270 this.array = array;
1271 }
1272
1273 public boolean hasNext() {
1274 return cursor < array.length;
1275 }
1276
1277 public Runnable next() {
1278 if (cursor >= array.length)
1279 throw new NoSuchElementException();
1280 lastRet = cursor;
1281 return array[cursor++];
1282 }
1283
1284 public void remove() {
1285 if (lastRet < 0)
1286 throw new IllegalStateException();
1287 DelayedWorkQueue.this.remove(array[lastRet]);
1288 lastRet = -1;
1289 }
1290 }
1291 }
1292 }