ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ScheduledThreadPoolExecutor.java
Revision: 1.39
Committed: Tue Jan 30 03:43:07 2007 UTC (17 years, 4 months ago) by jsr166
Branch: MAIN
Changes since 1.38: +125 -119 lines
Log Message:
TPE/STPE review rework

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.*;
10
11 /**
12 * A {@link ThreadPoolExecutor} that can additionally schedule
13 * commands to run after a given delay, or to execute
14 * periodically. This class is preferable to {@link java.util.Timer}
15 * when multiple worker threads are needed, or when the additional
16 * flexibility or capabilities of {@link ThreadPoolExecutor} (which
17 * this class extends) are required.
18 *
19 * <p> Delayed tasks execute no sooner than they are enabled, but
20 * without any real-time guarantees about when, after they are
21 * enabled, they will commence. Tasks scheduled for exactly the same
22 * execution time are enabled in first-in-first-out (FIFO) order of
23 * submission.
24 *
25 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
26 * of the inherited tuning methods are not useful for it. In
27 * particular, because it acts as a fixed-sized pool using
28 * {@code corePoolSize} threads and an unbounded queue, adjustments
29 * to {@code maximumPoolSize} have no useful effect. Additionally, it
30 * is almost never a good idea to set {@code corePoolSize} to zero or
31 * use {@code allowCoreThreadTimeOut} because this may leave the pool
32 * without threads to handle tasks once they become eligible to run.
33 *
34 * <p><b>Extension notes:</b> This class overrides the
35 * {@link ThreadPoolExecutor#execute execute} and
36 * {@link AbstractExecutorService#submit(Runnable) submit}
37 * methods to generate internal {@link ScheduledFuture} objects to
38 * control per-task delays and scheduling. To preserve
39 * functionality, any further overrides of these methods in
40 * subclasses must invoke superclass versions, which effectively
41 * disables additional task customization. However, this class
42 * provides alternative protected extension method
43 * {@code decorateTask} (one version each for {@code Runnable} and
44 * {@code Callable}) that can be used to customize the concrete task
45 * types used to execute commands entered via {@code execute},
46 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
47 * and {@code scheduleWithFixedDelay}. By default, a
48 * {@code ScheduledThreadPoolExecutor} uses a task type extending
49 * {@link FutureTask}. However, this may be modified or replaced using
50 * subclasses of the form:
51 *
52 * <pre> {@code
53 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
54 *
55 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
56 *
57 * protected <V> RunnableScheduledFuture<V> decorateTask(
58 * Runnable r, RunnableScheduledFuture<V> task) {
59 * return new CustomTask<V>(r, task);
60 * }
61 *
62 * protected <V> RunnableScheduledFuture<V> decorateTask(
63 * Callable<V> c, RunnableScheduledFuture<V> task) {
64 * return new CustomTask<V>(c, task);
65 * }
66 * // ... add constructors, etc.
67 * }}</pre>
68 *
69 * @since 1.5
70 * @author Doug Lea
71 */
72 public class ScheduledThreadPoolExecutor
73 extends ThreadPoolExecutor
74 implements ScheduledExecutorService {
75
76 /*
77 * This class specializes ThreadPoolExecutor implementation by
78 *
79 * 1. Using a custom task type, ScheduledFutureTask for
80 * tasks, even those that don't require scheduling (i.e.,
81 * those submitted using ExecutorService execute, not
82 * ScheduledExecutorService methods) which are treated as
83 * delayed tasks with a delay of zero.
84 *
85 * 2. Using a custom queue (DelayedWorkQueue) based on an
86 * unbounded DelayQueue. The lack of capacity constraint and
87 * the fact that corePoolSize and maximumPoolSize are
88 * effectively identical simplifies some execution mechanics
89 * (see delayedExecute) compared to ThreadPoolExecutor
90 * version.
91 *
92 * The DelayedWorkQueue class is defined below for the sake of
93 * ensuring that all elements are instances of
94 * RunnableScheduledFuture. Since DelayQueue otherwise
95 * requires type be Delayed, but not necessarily Runnable, and
96 * the workQueue requires the opposite, we need to explicitly
97 * define a class that requires both to ensure that users don't
98 * add objects that aren't RunnableScheduledFutures via
99 * getQueue().add() etc.
100 *
101 * 3. Supporting optional run-after-shutdown parameters, which
102 * leads to overrides of shutdown methods to remove and cancel
103 * tasks that should NOT be run after shutdown, as well as
104 * different recheck logic when task (re)submission overlaps
105 * with a shutdown.
106 *
107 * 4. Task decoration methods to allow interception and
108 * instrumentation, which are needed because subclasses cannot
109 * otherwise override submit methods to get this effect. These
110 * don't have any impact on pool control logic though.
111 */
112
113 /**
114 * False if should cancel/suppress periodic tasks on shutdown.
115 */
116 private volatile boolean continueExistingPeriodicTasksAfterShutdown;
117
118 /**
119 * False if should cancel non-periodic tasks on shutdown.
120 */
121 private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
122
123 /**
124 * Sequence number to break scheduling ties, and in turn to
125 * guarantee FIFO order among tied entries.
126 */
127 private static final AtomicLong sequencer = new AtomicLong(0);
128
129 /**
130 * Returns current nanosecond time.
131 */
132 final long now() {
133 return System.nanoTime();
134 }
135
136 private class ScheduledFutureTask<V>
137 extends FutureTask<V> implements RunnableScheduledFuture<V> {
138
139 /** Sequence number to break ties FIFO */
140 private final long sequenceNumber;
141 /** The time the task is enabled to execute in nanoTime units */
142 private long time;
143 /**
144 * Period in nanoseconds for repeating tasks. A positive
145 * value indicates fixed-rate execution. A negative value
146 * indicates fixed-delay execution. A value of 0 indicates a
147 * non-repeating task.
148 */
149 private final long period;
150
151 /**
152 * Creates a one-shot action with given nanoTime-based trigger time.
153 */
154 ScheduledFutureTask(Runnable r, V result, long ns) {
155 super(r, result);
156 this.time = ns;
157 this.period = 0;
158 this.sequenceNumber = sequencer.getAndIncrement();
159 }
160
161 /**
162 * Creates a periodic action with given nano time and period.
163 */
164 ScheduledFutureTask(Runnable r, V result, long ns, long period) {
165 super(r, result);
166 this.time = ns;
167 this.period = period;
168 this.sequenceNumber = sequencer.getAndIncrement();
169 }
170
171 /**
172 * Creates a one-shot action with given nanoTime-based trigger.
173 */
174 ScheduledFutureTask(Callable<V> callable, long ns) {
175 super(callable);
176 this.time = ns;
177 this.period = 0;
178 this.sequenceNumber = sequencer.getAndIncrement();
179 }
180
181 public long getDelay(TimeUnit unit) {
182 long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);
183 return d;
184 }
185
186 public int compareTo(Delayed other) {
187 if (other == this) // compare zero ONLY if same object
188 return 0;
189 if (other instanceof ScheduledFutureTask) {
190 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
191 long diff = time - x.time;
192 if (diff < 0)
193 return -1;
194 else if (diff > 0)
195 return 1;
196 else if (sequenceNumber < x.sequenceNumber)
197 return -1;
198 else
199 return 1;
200 }
201 long d = (getDelay(TimeUnit.NANOSECONDS) -
202 other.getDelay(TimeUnit.NANOSECONDS));
203 return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
204 }
205
206 /**
207 * Returns true if this is a periodic (not a one-shot) action.
208 *
209 * @return true if periodic
210 */
211 public boolean isPeriodic() {
212 return period != 0;
213 }
214
215 /**
216 * Sets the next time to run for a periodic task.
217 */
218 private void setNextRunTime() {
219 long p = period;
220 if (p > 0)
221 time += p;
222 else
223 time = now() - p;
224 }
225
226 /**
227 * Overrides FutureTask version so as to reset/requeue if periodic.
228 */
229 public void run() {
230 boolean periodic = isPeriodic();
231 if (!canRunInCurrentRunState(periodic))
232 cancel(false);
233 else if (!periodic)
234 ScheduledFutureTask.super.run();
235 else if (ScheduledFutureTask.super.runAndReset()) {
236 setNextRunTime();
237 reExecutePeriodic(this);
238 }
239 }
240 }
241
242 /**
243 * Returns true if can run a task given current run state
244 * and run-after-shutdown parameters.
245 *
246 * @param periodic true if this task periodic, false if delayed
247 */
248 boolean canRunInCurrentRunState(boolean periodic) {
249 return isRunningOrShutdown(periodic ?
250 continueExistingPeriodicTasksAfterShutdown :
251 executeExistingDelayedTasksAfterShutdown);
252 }
253
254 /**
255 * Main execution method for delayed or periodic tasks. If pool
256 * is shut down, rejects the task. Otherwise adds task to queue
257 * and starts a thread, if necessary, to run it. (We cannot
258 * prestart the thread to run the task because the task (probably)
259 * shouldn't be run yet,) If the pool is shut down while the task
260 * is being added, cancel and remove it if required by state and
261 * run-after-shutdown parameters.
262 *
263 * @param task the task
264 */
265 private void delayedExecute(RunnableScheduledFuture<?> task) {
266 if (isShutdown())
267 reject(task);
268 else {
269 super.getQueue().add(task);
270 if (isShutdown() &&
271 !canRunInCurrentRunState(task.isPeriodic()) &&
272 remove(task))
273 task.cancel(false);
274 else
275 prestartCoreThread();
276 }
277 }
278
279 /**
280 * Requeues a periodic task unless current run state precludes it.
281 * Same idea as delayedExecute except drops task rather than rejecting.
282 *
283 * @param task the task
284 */
285 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
286 if (canRunInCurrentRunState(true)) {
287 super.getQueue().add(task);
288 if (!canRunInCurrentRunState(true) && remove(task))
289 task.cancel(false);
290 else
291 prestartCoreThread();
292 }
293 }
294
295 /**
296 * Cancels and clears the queue of all tasks that should not be run
297 * due to shutdown policy. Invoked within super.shutdown.
298 */
299 @Override void onShutdown() {
300 BlockingQueue<Runnable> q = super.getQueue();
301 boolean keepDelayed =
302 getExecuteExistingDelayedTasksAfterShutdownPolicy();
303 boolean keepPeriodic =
304 getContinueExistingPeriodicTasksAfterShutdownPolicy();
305 if (!keepDelayed && !keepPeriodic)
306 q.clear();
307 else {
308 // Traverse snapshot to avoid iterator exceptions
309 for (Object e : q.toArray()) {
310 if (e instanceof RunnableScheduledFuture) {
311 RunnableScheduledFuture<?> t =
312 (RunnableScheduledFuture<?>)e;
313 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
314 t.isCancelled()) { // also remove if already cancelled
315 if (q.remove(t))
316 t.cancel(false);
317 }
318 }
319 }
320 }
321 tryTerminate();
322 }
323
324 /**
325 * Modifies or replaces the task used to execute a runnable.
326 * This method can be used to override the concrete
327 * class used for managing internal tasks.
328 * The default implementation simply returns the given task.
329 *
330 * @param runnable the submitted Runnable
331 * @param task the task created to execute the runnable
332 * @return a task that can execute the runnable
333 * @since 1.6
334 */
335 protected <V> RunnableScheduledFuture<V> decorateTask(
336 Runnable runnable, RunnableScheduledFuture<V> task) {
337 return task;
338 }
339
340 /**
341 * Modifies or replaces the task used to execute a callable.
342 * This method can be used to override the concrete
343 * class used for managing internal tasks.
344 * The default implementation simply returns the given task.
345 *
346 * @param callable the submitted Callable
347 * @param task the task created to execute the callable
348 * @return a task that can execute the callable
349 * @since 1.6
350 */
351 protected <V> RunnableScheduledFuture<V> decorateTask(
352 Callable<V> callable, RunnableScheduledFuture<V> task) {
353 return task;
354 }
355
356 /**
357 * Creates a new {@code ScheduledThreadPoolExecutor} with the
358 * given core pool size.
359 *
360 * @param corePoolSize the number of threads to keep in the pool, even
361 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
362 * @throws IllegalArgumentException if {@code corePoolSize < 0}
363 */
364 public ScheduledThreadPoolExecutor(int corePoolSize) {
365 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
366 new DelayedWorkQueue());
367 }
368
369 /**
370 * Creates a new {@code ScheduledThreadPoolExecutor} with the
371 * given initial parameters.
372 *
373 * @param corePoolSize the number of threads to keep in the pool, even
374 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
375 * @param threadFactory the factory to use when the executor
376 * creates a new thread
377 * @throws IllegalArgumentException if {@code corePoolSize < 0}
378 * @throws NullPointerException if {@code threadFactory} is null
379 */
380 public ScheduledThreadPoolExecutor(int corePoolSize,
381 ThreadFactory threadFactory) {
382 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
383 new DelayedWorkQueue(), threadFactory);
384 }
385
386 /**
387 * Creates a new ScheduledThreadPoolExecutor with the given
388 * initial parameters.
389 *
390 * @param corePoolSize the number of threads to keep in the pool, even
391 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
392 * @param handler the handler to use when execution is blocked
393 * because the thread bounds and queue capacities are reached
394 * @throws IllegalArgumentException if {@code corePoolSize < 0}
395 * @throws NullPointerException if {@code handler} is null
396 */
397 public ScheduledThreadPoolExecutor(int corePoolSize,
398 RejectedExecutionHandler handler) {
399 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
400 new DelayedWorkQueue(), handler);
401 }
402
403 /**
404 * Creates a new ScheduledThreadPoolExecutor with the given
405 * initial parameters.
406 *
407 * @param corePoolSize the number of threads to keep in the pool, even
408 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
409 * @param threadFactory the factory to use when the executor
410 * creates a new thread
411 * @param handler the handler to use when execution is blocked
412 * because the thread bounds and queue capacities are reached
413 * @throws IllegalArgumentException if {@code corePoolSize < 0}
414 * @throws NullPointerException if {@code threadFactory} or
415 * {@code handler} is null
416 */
417 public ScheduledThreadPoolExecutor(int corePoolSize,
418 ThreadFactory threadFactory,
419 RejectedExecutionHandler handler) {
420 super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
421 new DelayedWorkQueue(), threadFactory, handler);
422 }
423
424 /**
425 * @throws RejectedExecutionException {@inheritDoc}
426 * @throws NullPointerException {@inheritDoc}
427 */
428 public ScheduledFuture<?> schedule(Runnable command,
429 long delay,
430 TimeUnit unit) {
431 if (command == null || unit == null)
432 throw new NullPointerException();
433 if (delay < 0) delay = 0;
434 long triggerTime = now() + unit.toNanos(delay);
435 RunnableScheduledFuture<?> t = decorateTask(command,
436 new ScheduledFutureTask<Boolean>(command, null, triggerTime));
437 delayedExecute(t);
438 return t;
439 }
440
441 /**
442 * @throws RejectedExecutionException {@inheritDoc}
443 * @throws NullPointerException {@inheritDoc}
444 */
445 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
446 long delay,
447 TimeUnit unit) {
448 if (callable == null || unit == null)
449 throw new NullPointerException();
450 if (delay < 0) delay = 0;
451 long triggerTime = now() + unit.toNanos(delay);
452 RunnableScheduledFuture<V> t = decorateTask(callable,
453 new ScheduledFutureTask<V>(callable, triggerTime));
454 delayedExecute(t);
455 return t;
456 }
457
458 /**
459 * @throws RejectedExecutionException {@inheritDoc}
460 * @throws NullPointerException {@inheritDoc}
461 * @throws IllegalArgumentException {@inheritDoc}
462 */
463 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
464 long initialDelay,
465 long period,
466 TimeUnit unit) {
467 if (command == null || unit == null)
468 throw new NullPointerException();
469 if (period <= 0)
470 throw new IllegalArgumentException();
471 if (initialDelay < 0) initialDelay = 0;
472 long triggerTime = now() + unit.toNanos(initialDelay);
473 RunnableScheduledFuture<?> t = decorateTask(command,
474 new ScheduledFutureTask<Object>(command,
475 null,
476 triggerTime,
477 unit.toNanos(period)));
478 delayedExecute(t);
479 return t;
480 }
481
482 /**
483 * @throws RejectedExecutionException {@inheritDoc}
484 * @throws NullPointerException {@inheritDoc}
485 * @throws IllegalArgumentException {@inheritDoc}
486 */
487 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
488 long initialDelay,
489 long delay,
490 TimeUnit unit) {
491 if (command == null || unit == null)
492 throw new NullPointerException();
493 if (delay <= 0)
494 throw new IllegalArgumentException();
495 if (initialDelay < 0) initialDelay = 0;
496 long triggerTime = now() + unit.toNanos(initialDelay);
497 RunnableScheduledFuture<?> t = decorateTask(command,
498 new ScheduledFutureTask<Boolean>(command,
499 null,
500 triggerTime,
501 unit.toNanos(-delay)));
502 delayedExecute(t);
503 return t;
504 }
505
506 /**
507 * Executes {@code command} with zero required delay.
508 * This has effect equivalent to
509 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
510 * Note that inspections of the queue and of the list returned by
511 * {@code shutdownNow} will access the zero-delayed
512 * {@link ScheduledFuture}, not the {@code command} itself.
513 *
514 * <p>A consequence of the use of {@code ScheduledFuture} objects is
515 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
516 * called with a null second {@code Throwable} argument, even if the
517 * {@code command} terminated abruptly. Instead, the {@code Throwable}
518 * thrown by such a task can be obtained via {@link Future#get}.
519 *
520 * @throws RejectedExecutionException at discretion of
521 * {@code RejectedExecutionHandler}, if the task
522 * cannot be accepted for execution because the
523 * executor has been shut down
524 * @throws NullPointerException {@inheritDoc}
525 */
526 public void execute(Runnable command) {
527 schedule(command, 0, TimeUnit.NANOSECONDS);
528 }
529
530 // Override AbstractExecutorService methods
531
532 /**
533 * @throws RejectedExecutionException {@inheritDoc}
534 * @throws NullPointerException {@inheritDoc}
535 */
536 public Future<?> submit(Runnable task) {
537 return schedule(task, 0, TimeUnit.NANOSECONDS);
538 }
539
540 /**
541 * @throws RejectedExecutionException {@inheritDoc}
542 * @throws NullPointerException {@inheritDoc}
543 */
544 public <T> Future<T> submit(Runnable task, T result) {
545 return schedule(Executors.callable(task, result),
546 0, TimeUnit.NANOSECONDS);
547 }
548
549 /**
550 * @throws RejectedExecutionException {@inheritDoc}
551 * @throws NullPointerException {@inheritDoc}
552 */
553 public <T> Future<T> submit(Callable<T> task) {
554 return schedule(task, 0, TimeUnit.NANOSECONDS);
555 }
556
557 /**
558 * Sets the policy on whether to continue executing existing
559 * periodic tasks even when this executor has been {@code shutdown}.
560 * In this case, these tasks will only terminate upon
561 * {@code shutdownNow} or after setting the policy to
562 * {@code false} when already shutdown.
563 * This value is by default {@code false}.
564 *
565 * @param value if {@code true}, continue after shutdown, else don't.
566 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
567 */
568 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
569 continueExistingPeriodicTasksAfterShutdown = value;
570 if (!value && isShutdown())
571 onShutdown();
572 }
573
574 /**
575 * Gets the policy on whether to continue executing existing
576 * periodic tasks even when this executor has been {@code shutdown}.
577 * In this case, these tasks will only terminate upon
578 * {@code shutdownNow} or after setting the policy to
579 * {@code false} when already shutdown.
580 * This value is by default {@code false}.
581 *
582 * @return {@code true} if will continue after shutdown
583 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
584 */
585 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
586 return continueExistingPeriodicTasksAfterShutdown;
587 }
588
589 /**
590 * Sets the policy on whether to execute existing delayed
591 * tasks even when this executor has been {@code shutdown}.
592 * In this case, these tasks will only terminate upon
593 * {@code shutdownNow}, or after setting the policy to
594 * {@code false} when already shutdown.
595 * This value is by default {@code true}.
596 *
597 * @param value if {@code true}, execute after shutdown, else don't.
598 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
599 */
600 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
601 executeExistingDelayedTasksAfterShutdown = value;
602 if (!value && isShutdown())
603 onShutdown();
604 }
605
606 /**
607 * Gets the policy on whether to execute existing delayed
608 * tasks even when this executor has been {@code shutdown}.
609 * In this case, these tasks will only terminate upon
610 * {@code shutdownNow}, or after setting the policy to
611 * {@code false} when already shutdown.
612 * This value is by default {@code true}.
613 *
614 * @return {@code true} if will execute after shutdown
615 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
616 */
617 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
618 return executeExistingDelayedTasksAfterShutdown;
619 }
620
621 /**
622 * Initiates an orderly shutdown in which previously submitted
623 * tasks are executed, but no new tasks will be accepted. If the
624 * {@code ExecuteExistingDelayedTasksAfterShutdownPolicy} has
625 * been set {@code false}, existing delayed tasks whose delays
626 * have not yet elapsed are cancelled. And unless the
627 * {@code ContinueExistingPeriodicTasksAfterShutdownPolicy} has
628 * been set {@code true}, future executions of existing periodic
629 * tasks will be cancelled.
630 *
631 * @throws SecurityException {@inheritDoc}
632 */
633 public void shutdown() {
634 super.shutdown();
635 }
636
637 /**
638 * Attempts to stop all actively executing tasks, halts the
639 * processing of waiting tasks, and returns a list of the tasks
640 * that were awaiting execution.
641 *
642 * <p>There are no guarantees beyond best-effort attempts to stop
643 * processing actively executing tasks. This implementation
644 * cancels tasks via {@link Thread#interrupt}, so any task that
645 * fails to respond to interrupts may never terminate.
646 *
647 * @return list of tasks that never commenced execution.
648 * Each element of this list is a {@link ScheduledFuture},
649 * including those tasks submitted using {@code execute},
650 * which are for scheduling purposes used as the basis of a
651 * zero-delay {@code ScheduledFuture}.
652 * @throws SecurityException {@inheritDoc}
653 */
654 public List<Runnable> shutdownNow() {
655 return super.shutdownNow();
656 }
657
658 /**
659 * Returns the task queue used by this executor. Each element of
660 * this queue is a {@link ScheduledFuture}, including those
661 * tasks submitted using {@code execute} which are for scheduling
662 * purposes used as the basis of a zero-delay
663 * {@code ScheduledFuture}. Iteration over this queue is
664 * <em>not</em> guaranteed to traverse tasks in the order in
665 * which they will execute.
666 *
667 * @return the task queue
668 */
669 public BlockingQueue<Runnable> getQueue() {
670 return super.getQueue();
671 }
672
673 /**
674 * An annoying wrapper class to convince javac to use a
675 * DelayQueue<RunnableScheduledFuture> as a BlockingQueue<Runnable>
676 */
677 private static class DelayedWorkQueue
678 extends AbstractCollection<Runnable>
679 implements BlockingQueue<Runnable> {
680
681 private final DelayQueue<RunnableScheduledFuture> dq = new DelayQueue<RunnableScheduledFuture>();
682 public Runnable poll() { return dq.poll(); }
683 public Runnable peek() { return dq.peek(); }
684 public Runnable take() throws InterruptedException { return dq.take(); }
685 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
686 return dq.poll(timeout, unit);
687 }
688
689 public boolean add(Runnable x) {
690 return dq.add((RunnableScheduledFuture)x);
691 }
692 public boolean offer(Runnable x) {
693 return dq.offer((RunnableScheduledFuture)x);
694 }
695 public void put(Runnable x) {
696 dq.put((RunnableScheduledFuture)x);
697 }
698 public boolean offer(Runnable x, long timeout, TimeUnit unit) {
699 return dq.offer((RunnableScheduledFuture)x, timeout, unit);
700 }
701
702 public Runnable remove() { return dq.remove(); }
703 public Runnable element() { return dq.element(); }
704 public void clear() { dq.clear(); }
705 public int drainTo(Collection<? super Runnable> c) { return dq.drainTo(c); }
706 public int drainTo(Collection<? super Runnable> c, int maxElements) {
707 return dq.drainTo(c, maxElements);
708 }
709
710 public int remainingCapacity() { return dq.remainingCapacity(); }
711 public boolean remove(Object x) { return dq.remove(x); }
712 public boolean contains(Object x) { return dq.contains(x); }
713 public int size() { return dq.size(); }
714 public boolean isEmpty() { return dq.isEmpty(); }
715 public Object[] toArray() { return dq.toArray(); }
716 public <T> T[] toArray(T[] array) { return dq.toArray(array); }
717 public Iterator<Runnable> iterator() {
718 return new Iterator<Runnable>() {
719 private Iterator<RunnableScheduledFuture> it = dq.iterator();
720 public boolean hasNext() { return it.hasNext(); }
721 public Runnable next() { return it.next(); }
722 public void remove() { it.remove(); }
723 };
724 }
725 }
726 }