ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.50
Committed: Sat Jul 19 15:38:49 2008 UTC (15 years, 10 months ago) by dl
Branch: MAIN
Changes since 1.49: +31 -12 lines
Log Message:
Avoid numerical overflow for trigger times

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13 * A {@link ThreadPoolExecutor} that can additionally schedule
14 * commands to run after a given delay, or to execute
15 * periodically. This class is preferable to {@link java.util.Timer}
16 * when multiple worker threads are needed, or when the additional
17 * flexibility or capabilities of {@link ThreadPoolExecutor} (which
18 * this class extends) are required.
19 *
20 * <p>Delayed tasks execute no sooner than they are enabled, but
21 * without any real-time guarantees about when, after they are
22 * enabled, they will commence. Tasks scheduled for exactly the same
23 * execution time are enabled in first-in-first-out (FIFO) order of
24 * submission.
25 *
26 * <p>When a submitted task is cancelled before it is run, execution
27 * is suppressed. By default, such a cancelled task is not
28 * automatically removed from the work queue until its delay
29 * elapses. While this enables further inspection and monitoring, it
30 * may also cause unbounded retention of cancelled tasks. To avoid
31 * this, set {@link #setRemoveOnCancelPolicy} to {@code true}, which
32 * causes tasks to be immediately removed from the work queue at
33 * time of cancellation.
34 *
35 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
36 * of the inherited tuning methods are not useful for it. In
37 * particular, because it acts as a fixed-sized pool using
38 * {@code corePoolSize} threads and an unbounded queue, adjustments
39 * to {@code maximumPoolSize} have no useful effect. Additionally, it
40 * is almost never a good idea to set {@code corePoolSize} to zero or
41 * use {@code allowCoreThreadTimeOut} because this may leave the pool
42 * without threads to handle tasks once they become eligible to run.
43 *
44 * <p><b>Extension notes:</b> This class overrides the
45 * {@link ThreadPoolExecutor#execute execute} and
46 * {@link AbstractExecutorService#submit(Runnable) submit}
47 * methods to generate internal {@link ScheduledFuture} objects to
48 * control per-task delays and scheduling. To preserve
49 * functionality, any further overrides of these methods in
50 * subclasses must invoke superclass versions, which effectively
51 * disables additional task customization. However, this class
52 * provides alternative protected extension method
53 * {@code decorateTask} (one version each for {@code Runnable} and
54 * {@code Callable}) that can be used to customize the concrete task
55 * types used to execute commands entered via {@code execute},
56 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
57 * and {@code scheduleWithFixedDelay}. By default, a
58 * {@code ScheduledThreadPoolExecutor} uses a task type extending
59 * {@link FutureTask}. However, this may be modified or replaced using
60 * subclasses of the form:
61 *
62 * <pre> {@code
63 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
64 *
65 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
66 *
67 * protected <V> RunnableScheduledFuture<V> decorateTask(
68 * Runnable r, RunnableScheduledFuture<V> task) {
69 * return new CustomTask<V>(r, task);
70 * }
71 *
72 * protected <V> RunnableScheduledFuture<V> decorateTask(
73 * Callable<V> c, RunnableScheduledFuture<V> task) {
74 * return new CustomTask<V>(c, task);
75 * }
76 * // ... add constructors, etc.
77 * }}</pre>
78 *
79 * @since 1.5
80 * @author Doug Lea
81 */
82 public class ScheduledThreadPoolExecutor
83 extends ThreadPoolExecutor
84 implements ScheduledExecutorService {
85
86 /*
87 * This class specializes ThreadPoolExecutor implementation by
88 *
89 * 1. Using a custom task type, ScheduledFutureTask for
90 * tasks, even those that don't require scheduling (i.e.,
91 * those submitted using ExecutorService execute, not
92 * ScheduledExecutorService methods) which are treated as
93 * delayed tasks with a delay of zero.
94 *
95 * 2. Using a custom queue (DelayedWorkQueue), a variant of
96 * unbounded DelayQueue. The lack of capacity constraint and
97 * the fact that corePoolSize and maximumPoolSize are
98 * effectively identical simplifies some execution mechanics
99 * (see delayedExecute) compared to ThreadPoolExecutor.
100 *
101 * 3. Supporting optional run-after-shutdown parameters, which
102 * leads to overrides of shutdown methods to remove and cancel
103 * tasks that should NOT be run after shutdown, as well as
104 * different recheck logic when task (re)submission overlaps
105 * with a shutdown.
106 *
107 * 4. Task decoration methods to allow interception and
108 * instrumentation, which are needed because subclasses cannot
109 * otherwise override submit methods to get this effect. These
110 * don't have any impact on pool control logic though.
111 */
112
113 /**
114 * False if should cancel/suppress periodic tasks on shutdown.
115 */
116 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
117
118 /**
119 * False if should cancel non-periodic tasks on shutdown.
120 */
121 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
122
123 /**
124 * True if ScheduledFutureTask.cancel should remove from queue
125 */
126 private volatile boolean removeOnCancel = false;
127
128 /**
129 * Sequence number to break scheduling ties, and in turn to
130 * guarantee FIFO order among tied entries.
131 */
132 private static final AtomicLong sequencer = new AtomicLong(0);
133
134
135 /**
136 * Value of System.nanoTime upon static initialization. This is
137 * used as an offset by now() to avoid wraparound of time values
138 * that would make them appear negative.
139 */
140 static final long initialNanoTime = System.nanoTime();
141
142 /**
143 * Returns current nanosecond time.
144 */
145 static long now() {
146 return System.nanoTime() - initialNanoTime;
147 }
148
149 private class ScheduledFutureTask<V>
150 extends FutureTask<V> implements RunnableScheduledFuture<V> {
151
152 /** Sequence number to break ties FIFO */
153 private final long sequenceNumber;
154
155 /** The time the task is enabled to execute in nanoTime units */
156 private long time;
157
158 /**
159 * Period in nanoseconds for repeating tasks. A positive
160 * value indicates fixed-rate execution. A negative value
161 * indicates fixed-delay execution. A value of 0 indicates a
162 * non-repeating task.
163 */
164 private final long period;
165
166 /** The actual task to be re-enqueued by reExecutePeriodic */
167 RunnableScheduledFuture<V> outerTask = this;
168
169 /**
170 * Index into delay queue, to support faster cancellation.
171 */
172 int heapIndex;
173
174 /**
175 * Creates a one-shot action with given nanoTime-based trigger time.
176 */
177 ScheduledFutureTask(Runnable r, V result, long ns) {
178 super(r, result);
179 this.time = ns;
180 this.period = 0;
181 this.sequenceNumber = sequencer.getAndIncrement();
182 }
183
184 /**
185 * Creates a periodic action with given nano time and period.
186 */
187 ScheduledFutureTask(Runnable r, V result, long ns, long period) {
188 super(r, result);
189 this.time = ns;
190 this.period = period;
191 this.sequenceNumber = sequencer.getAndIncrement();
192 }
193
194 /**
195 * Creates a one-shot action with given nanoTime-based trigger.
196 */
197 ScheduledFutureTask(Callable<V> callable, long ns) {
198 super(callable);
199 this.time = ns;
200 this.period = 0;
201 this.sequenceNumber = sequencer.getAndIncrement();
202 }
203
204 public long getDelay(TimeUnit unit) {
205 long d = time - now();
206 return d<=0? 0 : unit.convert(d, TimeUnit.NANOSECONDS);
207 }
208
209 public int compareTo(Delayed other) {
210 if (other == this) // compare zero ONLY if same object
211 return 0;
212 if (other instanceof ScheduledFutureTask) {
213 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
214 long diff = time - x.time;
215 if (diff < 0)
216 return -1;
217 else if (diff > 0)
218 return 1;
219 else if (sequenceNumber < x.sequenceNumber)
220 return -1;
221 else
222 return 1;
223 }
224 long d = (getDelay(TimeUnit.NANOSECONDS) -
225 other.getDelay(TimeUnit.NANOSECONDS));
226 return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
227 }
228
229 /**
230 * Returns true if this is a periodic (not a one-shot) action.
231 *
232 * @return true if periodic
233 */
234 public boolean isPeriodic() {
235 return period != 0;
236 }
237
238 /**
239 * Sets the next time to run for a periodic task.
240 */
241 private void setNextRunTime() {
242 long p = period;
243 if (p > 0)
244 time += p;
245 else
246 time = now() - p;
247 }
248
249 public boolean cancel(boolean mayInterruptIfRunning) {
250 boolean cancelled = super.cancel(mayInterruptIfRunning);
251 if (cancelled && removeOnCancel && heapIndex >= 0)
252 remove(this);
253 return cancelled;
254 }
255
256 /**
257 * Overrides FutureTask version so as to reset/requeue if periodic.
258 */
259 public void run() {
260 boolean periodic = isPeriodic();
261 if (!canRunInCurrentRunState(periodic))
262 cancel(false);
263 else if (!periodic)
264 ScheduledFutureTask.super.run();
265 else if (ScheduledFutureTask.super.runAndReset()) {
266 setNextRunTime();
267 reExecutePeriodic(outerTask);
268 }
269 }
270 }
271
272 /**
273 * Returns true if can run a task given current run state
274 * and run-after-shutdown parameters.
275 *
276 * @param periodic true if this task is periodic, false if delayed
277 */
278 boolean canRunInCurrentRunState(boolean periodic) {
279 return isRunningOrShutdown(periodic ?
280 continueExistingPeriodicTasksAfterShutdown :
281 executeExistingDelayedTasksAfterShutdown);
282 }
283
284 /**
285 * Main execution method for delayed or periodic tasks. If pool
286 * is shut down, rejects the task. Otherwise adds task to queue
287 * and starts a thread, if necessary, to run it. (We cannot
288 * prestart the thread to run the task because the task (probably)
289 * shouldn't be run yet.) If the pool is shut down while the task
290 * is being added, cancel and remove it if required by state and
291 * run-after-shutdown parameters.
292 *
293 * @param task the task
294 */
295 private void delayedExecute(RunnableScheduledFuture<?> task) {
296 if (isShutdown())
297 reject(task);
298 else {
299 super.getQueue().add(task);
300 if (isShutdown() &&
301 !canRunInCurrentRunState(task.isPeriodic()) &&
302 remove(task))
303 task.cancel(false);
304 else
305 prestartCoreThread();
306 }
307 }
308
309 /**
310 * Requeues a periodic task unless current run state precludes it.
311 * Same idea as delayedExecute except drops task rather than rejecting.
312 *
313 * @param task the task
314 */
315 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
316 if (canRunInCurrentRunState(true)) {
317 super.getQueue().add(task);
318 if (!canRunInCurrentRunState(true) && remove(task))
319 task.cancel(false);
320 else
321 prestartCoreThread();
322 }
323 }
324
325 /**
326 * Cancels and clears the queue of all tasks that should not be run
327 * due to shutdown policy. Invoked within super.shutdown.
328 */
329 @Override void onShutdown() {
330 BlockingQueue<Runnable> q = super.getQueue();
331 boolean keepDelayed =
332 getExecuteExistingDelayedTasksAfterShutdownPolicy();
333 boolean keepPeriodic =
334 getContinueExistingPeriodicTasksAfterShutdownPolicy();
335 if (!keepDelayed && !keepPeriodic)
336 q.clear();
337 else {
338 // Traverse snapshot to avoid iterator exceptions
339 for (Object e : q.toArray()) {
340 if (e instanceof RunnableScheduledFuture) {
341 RunnableScheduledFuture<?> t =
342 (RunnableScheduledFuture<?>)e;
343 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
344 t.isCancelled()) { // also remove if already cancelled
345 if (q.remove(t))
346 t.cancel(false);
347 }
348 }
349 }
350 }
351 tryTerminate();
352 }
353
354 /**
355 * Modifies or replaces the task used to execute a runnable.
356 * This method can be used to override the concrete
357 * class used for managing internal tasks.
358 * The default implementation simply returns the given task.
359 *
360 * @param runnable the submitted Runnable
361 * @param task the task created to execute the runnable
362 * @return a task that can execute the runnable
363 * @since 1.6
364 */
365 protected <V> RunnableScheduledFuture<V> decorateTask(
366 Runnable runnable, RunnableScheduledFuture<V> task) {
367 return task;
368 }
369
370 /**
371 * Modifies or replaces the task used to execute a callable.
372 * This method can be used to override the concrete
373 * class used for managing internal tasks.
374 * The default implementation simply returns the given task.
375 *
376 * @param callable the submitted Callable
377 * @param task the task created to execute the callable
378 * @return a task that can execute the callable
379 * @since 1.6
380 */
381 protected <V> RunnableScheduledFuture<V> decorateTask(
382 Callable<V> callable, RunnableScheduledFuture<V> task) {
383 return task;
384 }
385
386 /**
387 * Creates a new {@code ScheduledThreadPoolExecutor} with the
388 * given core pool size.
389 *
390 * @param corePoolSize the number of threads to keep in the pool, even
391 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
392 * @throws IllegalArgumentException if {@code corePoolSize < 0}
393 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    // Integer.MAX_VALUE maximum, zero keep-alive, fresh DelayedWorkQueue.
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue());
}
398
399 /**
400 * Creates a new {@code ScheduledThreadPoolExecutor} with the
401 * given initial parameters.
402 *
403 * @param corePoolSize the number of threads to keep in the pool, even
404 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
405 * @param threadFactory the factory to use when the executor
406 * creates a new thread
407 * @throws IllegalArgumentException if {@code corePoolSize < 0}
408 * @throws NullPointerException if {@code threadFactory} is null
409 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    // Integer.MAX_VALUE maximum, zero keep-alive, fresh DelayedWorkQueue.
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          threadFactory);
}
415
416 /**
417 * Creates a new ScheduledThreadPoolExecutor with the given
418 * initial parameters.
419 *
420 * @param corePoolSize the number of threads to keep in the pool, even
421 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
422 * @param handler the handler to use when execution is blocked
423 * because the thread bounds and queue capacities are reached
424 * @throws IllegalArgumentException if {@code corePoolSize < 0}
425 * @throws NullPointerException if {@code handler} is null
426 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    // Integer.MAX_VALUE maximum, zero keep-alive, fresh DelayedWorkQueue.
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          handler);
}
432
433 /**
434 * Creates a new ScheduledThreadPoolExecutor with the given
435 * initial parameters.
436 *
437 * @param corePoolSize the number of threads to keep in the pool, even
438 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
439 * @param threadFactory the factory to use when the executor
440 * creates a new thread
441 * @param handler the handler to use when execution is blocked
442 * because the thread bounds and queue capacities are reached
443 * @throws IllegalArgumentException if {@code corePoolSize < 0}
444 * @throws NullPointerException if {@code threadFactory} or
445 * {@code handler} is null
446 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    // Integer.MAX_VALUE maximum, zero keep-alive, fresh DelayedWorkQueue.
    super(corePoolSize,
          Integer.MAX_VALUE,
          0L,
          TimeUnit.NANOSECONDS,
          new DelayedWorkQueue(),
          threadFactory,
          handler);
}
453
454 /**
455 * Returns the trigger time of a delayed action
456 */
457 private static long nextTriggerTime(long delay, TimeUnit unit) {
458 long triggerTime;
459 long now = now();
460 if (delay <= 0)
461 return now; // avoid negative trigger times
462 else if ((triggerTime = now + unit.toNanos(delay)) < 0)
463 return Long.MAX_VALUE; // avoid numerical overflow
464 else
465 return triggerTime;
466 }
467
468 /**
469 * @throws RejectedExecutionException {@inheritDoc}
470 * @throws NullPointerException {@inheritDoc}
471 */
472 public ScheduledFuture<?> schedule(Runnable command,
473 long delay,
474 TimeUnit unit) {
475 if (command == null || unit == null)
476 throw new NullPointerException();
477 long triggerTime = nextTriggerTime(delay, unit);
478 RunnableScheduledFuture<?> t = decorateTask(command,
479 new ScheduledFutureTask<Void>(command, null, triggerTime));
480 delayedExecute(t);
481 return t;
482 }
483
484 /**
485 * @throws RejectedExecutionException {@inheritDoc}
486 * @throws NullPointerException {@inheritDoc}
487 */
488 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
489 long delay,
490 TimeUnit unit) {
491 if (callable == null || unit == null)
492 throw new NullPointerException();
493 long triggerTime = nextTriggerTime(delay, unit);
494 RunnableScheduledFuture<V> t = decorateTask(callable,
495 new ScheduledFutureTask<V>(callable, triggerTime));
496 delayedExecute(t);
497 return t;
498 }
499
500 /**
501 * @throws RejectedExecutionException {@inheritDoc}
502 * @throws NullPointerException {@inheritDoc}
503 * @throws IllegalArgumentException {@inheritDoc}
504 */
505 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
506 long initialDelay,
507 long period,
508 TimeUnit unit) {
509 if (command == null || unit == null)
510 throw new NullPointerException();
511 if (period <= 0)
512 throw new IllegalArgumentException();
513 if (initialDelay < 0) initialDelay = 0;
514 long triggerTime = nextTriggerTime(initialDelay, unit);
515 ScheduledFutureTask<Void> sft =
516 new ScheduledFutureTask<Void>(command,
517 null,
518 triggerTime,
519 unit.toNanos(period));
520 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
521 sft.outerTask = t;
522 delayedExecute(t);
523 return t;
524 }
525
526 /**
527 * @throws RejectedExecutionException {@inheritDoc}
528 * @throws NullPointerException {@inheritDoc}
529 * @throws IllegalArgumentException {@inheritDoc}
530 */
531 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
532 long initialDelay,
533 long delay,
534 TimeUnit unit) {
535 if (command == null || unit == null)
536 throw new NullPointerException();
537 if (delay <= 0)
538 throw new IllegalArgumentException();
539 long triggerTime = nextTriggerTime(initialDelay, unit);
540 ScheduledFutureTask<Void> sft =
541 new ScheduledFutureTask<Void>(command,
542 null,
543 triggerTime,
544 unit.toNanos(-delay));
545 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
546 sft.outerTask = t;
547 delayedExecute(t);
548 return t;
549 }
550
551 /**
552 * Executes {@code command} with zero required delay.
553 * This has effect equivalent to
554 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
555 * Note that inspections of the queue and of the list returned by
556 * {@code shutdownNow} will access the zero-delayed
557 * {@link ScheduledFuture}, not the {@code command} itself.
558 *
559 * <p>A consequence of the use of {@code ScheduledFuture} objects is
560 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
561 * called with a null second {@code Throwable} argument, even if the
562 * {@code command} terminated abruptly. Instead, the {@code Throwable}
563 * thrown by such a task can be obtained via {@link Future#get}.
564 *
565 * @throws RejectedExecutionException at discretion of
566 * {@code RejectedExecutionHandler}, if the task
567 * cannot be accepted for execution because the
568 * executor has been shut down
569 * @throws NullPointerException {@inheritDoc}
570 */
571 public void execute(Runnable command) {
572 schedule(command, 0, TimeUnit.NANOSECONDS);
573 }
574
575 // Override AbstractExecutorService methods
576
577 /**
578 * @throws RejectedExecutionException {@inheritDoc}
579 * @throws NullPointerException {@inheritDoc}
580 */
581 public Future<?> submit(Runnable task) {
582 return schedule(task, 0, TimeUnit.NANOSECONDS);
583 }
584
585 /**
586 * @throws RejectedExecutionException {@inheritDoc}
587 * @throws NullPointerException {@inheritDoc}
588 */
589 public <T> Future<T> submit(Runnable task, T result) {
590 return schedule(Executors.callable(task, result),
591 0, TimeUnit.NANOSECONDS);
592 }
593
594 /**
595 * @throws RejectedExecutionException {@inheritDoc}
596 * @throws NullPointerException {@inheritDoc}
597 */
598 public <T> Future<T> submit(Callable<T> task) {
599 return schedule(task, 0, TimeUnit.NANOSECONDS);
600 }
601
602 /**
603 * Sets the policy on whether to continue executing existing
604 * periodic tasks even when this executor has been {@code shutdown}.
605 * In this case, these tasks will only terminate upon
606 * {@code shutdownNow} or after setting the policy to
607 * {@code false} when already shutdown.
608 * This value is by default {@code false}.
609 *
610 * @param value if {@code true}, continue after shutdown, else don't.
611 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
612 */
613 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
614 continueExistingPeriodicTasksAfterShutdown = value;
615 if (!value && isShutdown())
616 onShutdown();
617 }
618
619 /**
620 * Gets the policy on whether to continue executing existing
621 * periodic tasks even when this executor has been {@code shutdown}.
622 * In this case, these tasks will only terminate upon
623 * {@code shutdownNow} or after setting the policy to
624 * {@code false} when already shutdown.
625 * This value is by default {@code false}.
626 *
627 * @return {@code true} if will continue after shutdown
628 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
629 */
630 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
631 return continueExistingPeriodicTasksAfterShutdown;
632 }
633
634 /**
635 * Sets the policy on whether to execute existing delayed
636 * tasks even when this executor has been {@code shutdown}.
637 * In this case, these tasks will only terminate upon
638 * {@code shutdownNow}, or after setting the policy to
639 * {@code false} when already shutdown.
640 * This value is by default {@code true}.
641 *
642 * @param value if {@code true}, execute after shutdown, else don't.
643 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
644 */
645 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
646 executeExistingDelayedTasksAfterShutdown = value;
647 if (!value && isShutdown())
648 onShutdown();
649 }
650
651 /**
652 * Gets the policy on whether to execute existing delayed
653 * tasks even when this executor has been {@code shutdown}.
654 * In this case, these tasks will only terminate upon
655 * {@code shutdownNow}, or after setting the policy to
656 * {@code false} when already shutdown.
657 * This value is by default {@code true}.
658 *
659 * @return {@code true} if will execute after shutdown
660 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
661 */
662 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
663 return executeExistingDelayedTasksAfterShutdown;
664 }
665
666 /**
667 * Sets the policy on whether cancelled tasks should be immediately
668 * removed from the work queue at time of cancellation. This value is
669 * by default {@code false}.
670 *
671 * @param value if {@code true}, remove on cancellation, else don't
672 * @see #getRemoveOnCancelPolicy
673 * @since 1.7
674 */
675 public void setRemoveOnCancelPolicy(boolean value) {
676 removeOnCancel = value;
677 }
678
679 /**
680 * Gets the policy on whether cancelled tasks should be immediately
681 * removed from the work queue at time of cancellation. This value is
682 * by default {@code false}.
683 *
684 * @return {@code true} if cancelled tasks are immediately removed
685 * from the queue
686 * @see #setRemoveOnCancelPolicy
687 * @since 1.7
688 */
689 public boolean getRemoveOnCancelPolicy() {
690 return removeOnCancel;
691 }
692
693 /**
694 * Initiates an orderly shutdown in which previously submitted
695 * tasks are executed, but no new tasks will be accepted.
696 * Invocation has no additional effect if already shut down.
697 *
698 * <p>This method does not wait for previously submitted tasks to
699 * complete execution. Use {@link #awaitTermination awaitTermination}
700 * to do that.
701 *
702 * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
703 * has been set {@code false}, existing delayed tasks whose delays
704 * have not yet elapsed are cancelled. And unless the {@code
705 * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
706 * {@code true}, future executions of existing periodic tasks will
707 * be cancelled.
708 *
709 * @throws SecurityException {@inheritDoc}
710 */
711 public void shutdown() {
712 super.shutdown();
713 }
714
715 /**
716 * Attempts to stop all actively executing tasks, halts the
717 * processing of waiting tasks, and returns a list of the tasks
718 * that were awaiting execution.
719 *
720 * <p>This method does not wait for actively executing tasks to
721 * terminate. Use {@link #awaitTermination awaitTermination} to
722 * do that.
723 *
724 * <p>There are no guarantees beyond best-effort attempts to stop
725 * processing actively executing tasks. This implementation
726 * cancels tasks via {@link Thread#interrupt}, so any task that
727 * fails to respond to interrupts may never terminate.
728 *
729 * @return list of tasks that never commenced execution.
730 * Each element of this list is a {@link ScheduledFuture},
731 * including those tasks submitted using {@code execute},
732 * which are for scheduling purposes used as the basis of a
733 * zero-delay {@code ScheduledFuture}.
734 * @throws SecurityException {@inheritDoc}
735 */
736 public List<Runnable> shutdownNow() {
737 return super.shutdownNow();
738 }
739
740 /**
741 * Returns the task queue used by this executor. Each element of
742 * this queue is a {@link ScheduledFuture}, including those
743 * tasks submitted using {@code execute} which are for scheduling
744 * purposes used as the basis of a zero-delay
745 * {@code ScheduledFuture}. Iteration over this queue is
746 * <em>not</em> guaranteed to traverse tasks in the order in
747 * which they will execute.
748 *
749 * @return the task queue
750 */
751 public BlockingQueue<Runnable> getQueue() {
752 return super.getQueue();
753 }
754
755 /**
756 * Specialized delay queue. To mesh with TPE declarations, this
757 * class must be declared as a BlockingQueue<Runnable> even though
758 * it can only hold RunnableScheduledFutures.
759 */
760 static class DelayedWorkQueue extends AbstractQueue<Runnable>
761 implements BlockingQueue<Runnable> {
762
763 /*
764 * A DelayedWorkQueue is based on a heap-based data structure
765 * like those in DelayQueue and PriorityQueue, except that
766 * every ScheduledFutureTask also records its index into the
767 * heap array. This eliminates the need to find a task upon
768 * cancellation, greatly speeding up removal (down from O(n)
769 * to O(log n)), and reducing garbage retention that would
770 * otherwise occur by waiting for the element to rise to top
771 * before clearing. But because the queue may also hold
772 * RunnableScheduledFutures that are not ScheduledFutureTasks,
773 * we are not guaranteed to have such indices available, in
774 * which case we fall back to linear search. (We expect that
775 * most tasks will not be decorated, and that the faster cases
776 * will be much more common.)
777 *
778 * All heap operations must record index changes -- mainly
779 * within siftUp and siftDown. Upon removal, a task's
780 * heapIndex is set to -1. Note that ScheduledFutureTasks can
781 * appear at most once in the queue (this need not be true for
782 * other kinds of tasks or work queues), so are uniquely
783 * identified by heapIndex.
784 */
785
786 private static final int INITIAL_CAPACITY = 16;
787 private RunnableScheduledFuture[] queue =
788 new RunnableScheduledFuture[INITIAL_CAPACITY];
789 private final ReentrantLock lock = new ReentrantLock();
790 private int size = 0;
791
792 /**
793 * Thread designated to wait for the task at the head of the
794 * queue. This variant of the Leader-Follower pattern
795 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
796 * minimize unnecessary timed waiting. When a thread becomes
797 * the leader, it waits only for the next delay to elapse, but
798 * other threads await indefinitely. The leader thread must
799 * signal some other thread before returning from take() or
800 * poll(...), unless some other thread becomes leader in the
801 * interim. Whenever the head of the queue is replaced with a
802 * task with an earlier expiration time, the leader field is
803 * invalidated by being reset to null, and some waiting
804 * thread, but not necessarily the current leader, is
805 * signalled. So waiting threads must be prepared to acquire
806 * and lose leadership while waiting.
807 */
808 private Thread leader = null;
809
810 /**
811 * Condition signalled when a newer task becomes available at the
812 * head of the queue or a new thread may need to become leader.
813 */
814 private final Condition available = lock.newCondition();
815
816 /**
817 * Set f's heapIndex if it is a ScheduledFutureTask.
818 */
819 private void setIndex(RunnableScheduledFuture f, int idx) {
820 if (f instanceof ScheduledFutureTask)
821 ((ScheduledFutureTask)f).heapIndex = idx;
822 }
823
824 /**
825 * Sift element added at bottom up to its heap-ordered spot.
826 * Call only when holding lock.
827 */
828 private void siftUp(int k, RunnableScheduledFuture key) {
829 while (k > 0) {
830 int parent = (k - 1) >>> 1;
831 RunnableScheduledFuture e = queue[parent];
832 if (key.compareTo(e) >= 0)
833 break;
834 queue[k] = e;
835 setIndex(e, k);
836 k = parent;
837 }
838 queue[k] = key;
839 setIndex(key, k);
840 }
841
842 /**
843 * Sift element added at top down to its heap-ordered spot.
844 * Call only when holding lock.
845 */
846 private void siftDown(int k, RunnableScheduledFuture key) {
847 int half = size >>> 1;
848 while (k < half) {
849 int child = (k << 1) + 1;
850 RunnableScheduledFuture c = queue[child];
851 int right = child + 1;
852 if (right < size && c.compareTo(queue[right]) > 0)
853 c = queue[child = right];
854 if (key.compareTo(c) <= 0)
855 break;
856 queue[k] = c;
857 setIndex(c, k);
858 k = child;
859 }
860 queue[k] = key;
861 setIndex(key, k);
862 }
863
864 /**
865 * Resize the heap array. Call only when holding lock.
866 */
867 private void grow() {
868 int oldCapacity = queue.length;
869 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
870 if (newCapacity < 0) // overflow
871 newCapacity = Integer.MAX_VALUE;
872 queue = Arrays.copyOf(queue, newCapacity);
873 }
874
875 /**
876 * Find index of given object, or -1 if absent
877 */
878 private int indexOf(Object x) {
879 if (x != null) {
880 if (x instanceof ScheduledFutureTask) {
881 int i = ((ScheduledFutureTask) x).heapIndex;
882 // Sanity check; x could conceivably be a
883 // ScheduledFutureTask from some other pool.
884 if (i >= 0 && i < size && queue[i] == x)
885 return i;
886 } else {
887 for (int i = 0; i < size; i++)
888 if (x.equals(queue[i]))
889 return i;
890 }
891 }
892 return -1;
893 }
894
895 public boolean contains(Object x) {
896 final ReentrantLock lock = this.lock;
897 lock.lock();
898 try {
899 return indexOf(x) != -1;
900 } finally {
901 lock.unlock();
902 }
903 }
904
905 public boolean remove(Object x) {
906 final ReentrantLock lock = this.lock;
907 lock.lock();
908 try {
909 int i = indexOf(x);
910 if (i < 0)
911 return false;
912
913 setIndex(queue[i], -1);
914 int s = --size;
915 RunnableScheduledFuture replacement = queue[s];
916 queue[s] = null;
917 if (s != i) {
918 siftDown(i, replacement);
919 if (queue[i] == replacement)
920 siftUp(i, replacement);
921 }
922 return true;
923 } finally {
924 lock.unlock();
925 }
926 }
927
928 public int size() {
929 final ReentrantLock lock = this.lock;
930 lock.lock();
931 try {
932 return size;
933 } finally {
934 lock.unlock();
935 }
936 }
937
938 public boolean isEmpty() {
939 return size() == 0;
940 }
941
942 public int remainingCapacity() {
943 return Integer.MAX_VALUE;
944 }
945
946 public RunnableScheduledFuture peek() {
947 final ReentrantLock lock = this.lock;
948 lock.lock();
949 try {
950 return queue[0];
951 } finally {
952 lock.unlock();
953 }
954 }
955
956 public boolean offer(Runnable x) {
957 if (x == null)
958 throw new NullPointerException();
959 RunnableScheduledFuture e = (RunnableScheduledFuture)x;
960 final ReentrantLock lock = this.lock;
961 lock.lock();
962 try {
963 int i = size;
964 if (i >= queue.length)
965 grow();
966 size = i + 1;
967 if (i == 0) {
968 queue[0] = e;
969 setIndex(e, 0);
970 } else {
971 siftUp(i, e);
972 }
973 if (queue[0] == e) {
974 leader = null;
975 available.signal();
976 }
977 } finally {
978 lock.unlock();
979 }
980 return true;
981 }
982
983 public void put(Runnable e) {
984 offer(e);
985 }
986
987 public boolean add(Runnable e) {
988 return offer(e);
989 }
990
991 public boolean offer(Runnable e, long timeout, TimeUnit unit) {
992 return offer(e);
993 }
994
995 /**
996 * Performs common bookkeeping for poll and take: Replaces
997 * first element with last and sifts it down. Call only when
998 * holding lock.
999 * @param f the task to remove and return
1000 */
1001 private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
1002 int s = --size;
1003 RunnableScheduledFuture x = queue[s];
1004 queue[s] = null;
1005 if (s != 0)
1006 siftDown(0, x);
1007 setIndex(f, -1);
1008 return f;
1009 }
1010
1011 public RunnableScheduledFuture poll() {
1012 final ReentrantLock lock = this.lock;
1013 lock.lock();
1014 try {
1015 RunnableScheduledFuture first = queue[0];
1016 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
1017 return null;
1018 else
1019 return finishPoll(first);
1020 } finally {
1021 lock.unlock();
1022 }
1023 }
1024
1025 public RunnableScheduledFuture take() throws InterruptedException {
1026 final ReentrantLock lock = this.lock;
1027 lock.lockInterruptibly();
1028 try {
1029 for (;;) {
1030 RunnableScheduledFuture first = queue[0];
1031 if (first == null)
1032 available.await();
1033 else {
1034 long delay = first.getDelay(TimeUnit.NANOSECONDS);
1035 if (delay <= 0)
1036 return finishPoll(first);
1037 else if (leader != null)
1038 available.await();
1039 else {
1040 Thread thisThread = Thread.currentThread();
1041 leader = thisThread;
1042 try {
1043 available.awaitNanos(delay);
1044 } finally {
1045 if (leader == thisThread)
1046 leader = null;
1047 }
1048 }
1049 }
1050 }
1051 } finally {
1052 if (leader == null && queue[0] != null)
1053 available.signal();
1054 lock.unlock();
1055 }
1056 }
1057
1058 public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
1059 throws InterruptedException {
1060 long nanos = unit.toNanos(timeout);
1061 final ReentrantLock lock = this.lock;
1062 lock.lockInterruptibly();
1063 try {
1064 for (;;) {
1065 RunnableScheduledFuture first = queue[0];
1066 if (first == null) {
1067 if (nanos <= 0)
1068 return null;
1069 else
1070 nanos = available.awaitNanos(nanos);
1071 } else {
1072 long delay = first.getDelay(TimeUnit.NANOSECONDS);
1073 if (delay <= 0)
1074 return finishPoll(first);
1075 if (nanos <= 0)
1076 return null;
1077 if (nanos < delay || leader != null)
1078 nanos = available.awaitNanos(nanos);
1079 else {
1080 Thread thisThread = Thread.currentThread();
1081 leader = thisThread;
1082 try {
1083 long timeLeft = available.awaitNanos(delay);
1084 nanos -= delay - timeLeft;
1085 } finally {
1086 if (leader == thisThread)
1087 leader = null;
1088 }
1089 }
1090 }
1091 }
1092 } finally {
1093 if (leader == null && queue[0] != null)
1094 available.signal();
1095 lock.unlock();
1096 }
1097 }
1098
1099 public void clear() {
1100 final ReentrantLock lock = this.lock;
1101 lock.lock();
1102 try {
1103 for (int i = 0; i < size; i++) {
1104 RunnableScheduledFuture t = queue[i];
1105 if (t != null) {
1106 queue[i] = null;
1107 setIndex(t, -1);
1108 }
1109 }
1110 size = 0;
1111 } finally {
1112 lock.unlock();
1113 }
1114 }
1115
1116 /**
1117 * Return and remove first element only if it is expired.
1118 * Used only by drainTo. Call only when holding lock.
1119 */
1120 private RunnableScheduledFuture pollExpired() {
1121 RunnableScheduledFuture first = queue[0];
1122 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
1123 return null;
1124 return finishPoll(first);
1125 }
1126
1127 public int drainTo(Collection<? super Runnable> c) {
1128 if (c == null)
1129 throw new NullPointerException();
1130 if (c == this)
1131 throw new IllegalArgumentException();
1132 final ReentrantLock lock = this.lock;
1133 lock.lock();
1134 try {
1135 RunnableScheduledFuture first;
1136 int n = 0;
1137 while ((first = pollExpired()) != null) {
1138 c.add(first);
1139 ++n;
1140 }
1141 return n;
1142 } finally {
1143 lock.unlock();
1144 }
1145 }
1146
1147 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1148 if (c == null)
1149 throw new NullPointerException();
1150 if (c == this)
1151 throw new IllegalArgumentException();
1152 if (maxElements <= 0)
1153 return 0;
1154 final ReentrantLock lock = this.lock;
1155 lock.lock();
1156 try {
1157 RunnableScheduledFuture first;
1158 int n = 0;
1159 while (n < maxElements && (first = pollExpired()) != null) {
1160 c.add(first);
1161 ++n;
1162 }
1163 return n;
1164 } finally {
1165 lock.unlock();
1166 }
1167 }
1168
1169 public Object[] toArray() {
1170 final ReentrantLock lock = this.lock;
1171 lock.lock();
1172 try {
1173 return Arrays.copyOf(queue, size, Object[].class);
1174 } finally {
1175 lock.unlock();
1176 }
1177 }
1178
1179 @SuppressWarnings("unchecked")
1180 public <T> T[] toArray(T[] a) {
1181 final ReentrantLock lock = this.lock;
1182 lock.lock();
1183 try {
1184 if (a.length < size)
1185 return (T[]) Arrays.copyOf(queue, size, a.getClass());
1186 System.arraycopy(queue, 0, a, 0, size);
1187 if (a.length > size)
1188 a[size] = null;
1189 return a;
1190 } finally {
1191 lock.unlock();
1192 }
1193 }
1194
1195 public Iterator<Runnable> iterator() {
1196 return new Itr(Arrays.copyOf(queue, size));
1197 }
1198
1199 /**
1200 * Snapshot iterator that works off copy of underlying q array.
1201 */
1202 private class Itr implements Iterator<Runnable> {
1203 final RunnableScheduledFuture[] array;
1204 int cursor = 0; // index of next element to return
1205 int lastRet = -1; // index of last element, or -1 if no such
1206
1207 Itr(RunnableScheduledFuture[] array) {
1208 this.array = array;
1209 }
1210
1211 public boolean hasNext() {
1212 return cursor < array.length;
1213 }
1214
1215 public Runnable next() {
1216 if (cursor >= array.length)
1217 throw new NoSuchElementException();
1218 lastRet = cursor;
1219 return array[cursor++];
1220 }
1221
1222 public void remove() {
1223 if (lastRet < 0)
1224 throw new IllegalStateException();
1225 DelayedWorkQueue.this.remove(array[lastRet]);
1226 lastRet = -1;
1227 }
1228 }
1229 }
1230 }