26 |
|
* of the inherited tuning methods are not useful for it. In |
27 |
|
* particular, because it acts as a fixed-sized pool using |
28 |
|
* <tt>corePoolSize</tt> threads and an unbounded queue, adjustments |
29 |
< |
* to <tt>maximumPoolSize</tt> have no useful effect. |
29 |
> |
* to <tt>maximumPoolSize</tt> have no useful effect. Additionally, it |
30 |
> |
* is almost never a good idea to set <tt>corePoolSize</tt> to zero or |
31 |
> |
* use <tt>allowCoreThreadTimeOut</tt> because this may leave the pool |
32 |
> |
* without threads to handle tasks once they become eligible to run. |
33 |
|
* |
34 |
|
* <p><b>Extension notes:</b> This class overrides {@link |
35 |
|
* AbstractExecutorService} <tt>submit</tt> methods to generate |
71 |
|
extends ThreadPoolExecutor |
72 |
|
implements ScheduledExecutorService { |
73 |
|
|
74 |
+ |
/* |
75 |
+ |
* This class specializes ThreadPoolExecutor implementation by |
76 |
+ |
* |
77 |
+ |
* 1. Using a custom task type, ScheduledFutureTask for |
78 |
+ |
* tasks, even those that don't require scheduling (i.e., |
79 |
+ |
* those submitted using ExecutorService execute, not |
80 |
+ |
* ScheduledExecutorService methods) which are treated as |
81 |
+ |
* delayed tasks with a delay of zero. |
82 |
+ |
* |
83 |
+ |
* 2. Using a custom queue (DelayedWorkQueue) based on an |
84 |
+ |
* unbounded DelayQueue. The lack of capacity constraint and |
85 |
+ |
* the fact that corePoolSize and maximumPoolSize are |
86 |
+ |
* effectively identical simplifies some execution mechanics |
87 |
+ |
* (see delayedExecute) compared to ThreadPoolExecutor |
88 |
+ |
* version. |
89 |
+ |
* |
90 |
+ |
* The DelayedWorkQueue class is defined below for the sake of |
91 |
+ |
* ensuring that all elements are instances of |
92 |
+ |
* RunnableScheduledFuture. Since DelayQueue otherwise |
93 |
+ |
* requires type be Delayed, but not necessarily Runnable, and |
94 |
+ |
* the workQueue requires the opposite, we need to explicitly |
95 |
+ |
* define a class that requires both to ensure that users don't |
96 |
+ |
* add objects that aren't RunnableScheduledFutures via |
97 |
+ |
* getQueue().add() etc. |
98 |
+ |
* |
99 |
+ |
* 3. Supporting optional run-after-shutdown parameters, which |
100 |
+ |
* leads to overrides of shutdown methods to remove and cancel |
101 |
+ |
* tasks that should NOT be run after shutdown, as well as |
102 |
+ |
* different recheck logic when task (re)submission overlaps |
103 |
+ |
* with a shutdown. |
104 |
+ |
* |
105 |
+ |
* 4. Task decoration methods to allow interception and |
106 |
+ |
* instrumentation, which are needed because subclasses cannot |
107 |
+ |
* otherwise override submit methods to get this effect. These |
108 |
+ |
* don't have any impact on pool control logic though. |
109 |
+ |
*/ |
110 |
+ |
|
111 |
|
/** |
112 |
|
* False if should cancel/suppress periodic tasks on shutdown. |
113 |
|
*/ |
214 |
|
} |
215 |
|
|
216 |
|
/** |
217 |
< |
* Runs a periodic task. |
217 |
> |
* Sets the next time to run for a periodic task |
218 |
|
*/ |
219 |
< |
private void runPeriodic() { |
220 |
< |
boolean ok = ScheduledFutureTask.super.runAndReset(); |
221 |
< |
boolean down = isShutdown(); |
222 |
< |
// Reschedule if not cancelled and not shutdown or policy allows |
223 |
< |
if (ok && (!down || |
224 |
< |
(getContinueExistingPeriodicTasksAfterShutdownPolicy() && |
185 |
< |
!isTerminating()))) { |
186 |
< |
long p = period; |
187 |
< |
if (p > 0) |
188 |
< |
time += p; |
189 |
< |
else |
190 |
< |
time = now() - p; |
191 |
< |
ScheduledThreadPoolExecutor.super.getQueue().add(this); |
192 |
< |
} |
193 |
< |
// This might have been the final executed delayed |
194 |
< |
// task. Wake up threads to check. |
195 |
< |
else if (down) |
196 |
< |
interruptIdleWorkers(); |
219 |
> |
private void setNextRunTime() { |
220 |
> |
long p = period; |
221 |
> |
if (p > 0) |
222 |
> |
time += p; |
223 |
> |
else |
224 |
> |
time = now() - p; |
225 |
|
} |
226 |
|
|
227 |
|
/** |
228 |
|
* Overrides FutureTask version so as to reset/requeue if periodic. |
229 |
|
*/ |
230 |
|
public void run() { |
231 |
< |
if (isPeriodic()) |
232 |
< |
runPeriodic(); |
233 |
< |
else |
231 |
> |
boolean periodic = isPeriodic(); |
232 |
> |
if (!canRunInCurrentRunState(periodic)) |
233 |
> |
cancel(false); |
234 |
> |
else if (!periodic) |
235 |
|
ScheduledFutureTask.super.run(); |
236 |
+ |
else if (ScheduledFutureTask.super.runAndReset()) { |
237 |
+ |
setNextRunTime(); |
238 |
+ |
reExecutePeriodic(this); |
239 |
+ |
} |
240 |
|
} |
241 |
|
} |
242 |
|
|
243 |
|
/** |
244 |
< |
* Specialized variant of ThreadPoolExecutor.execute for delayed tasks. |
245 |
< |
*/ |
246 |
< |
private void delayedExecute(Runnable command) { |
247 |
< |
if (isShutdown()) { |
248 |
< |
reject(command); |
249 |
< |
return; |
250 |
< |
} |
251 |
< |
// Prestart a thread if necessary. We cannot prestart it |
252 |
< |
// running the task because the task (probably) shouldn't be |
253 |
< |
// run yet, so thread will just idle until delay elapses. |
254 |
< |
if (getPoolSize() < getCorePoolSize()) |
244 |
> |
* Returns true if can run a task given current run state |
245 |
> |
* and run-after-shutdown parameters |
246 |
> |
* @param periodic true if this task periodic, false if delayed |
247 |
> |
*/ |
248 |
> |
boolean canRunInCurrentRunState(boolean periodic) { |
249 |
> |
return isRunningOrShutdown(periodic? |
250 |
> |
continueExistingPeriodicTasksAfterShutdown : |
251 |
> |
executeExistingDelayedTasksAfterShutdown); |
252 |
> |
} |
253 |
> |
|
254 |
> |
/** |
255 |
> |
* Main execution method for delayed or periodic tasks. If pool |
256 |
> |
* is shut down, rejects the task. Otherwise adds task to queue |
257 |
> |
* and starts a thread, if necessary, to run it. (We cannot |
258 |
> |
* prestart the thread to run the task because the task (probably) |
259 |
> |
* shouldn't be run yet,) If the pool is shut down while the task |
260 |
> |
* is being added, cancel and remove it if required by state and |
261 |
> |
* run-after-shutdown parameters |
262 |
> |
* @param task the task |
263 |
> |
*/ |
264 |
> |
private void delayedExecute(RunnableScheduledFuture<?> task) { |
265 |
> |
if (isShutdown()) |
266 |
> |
reject(task); |
267 |
> |
else { |
268 |
> |
super.getQueue().add(task); |
269 |
> |
if (isShutdown() && |
270 |
> |
!canRunInCurrentRunState(task.isPeriodic()) && |
271 |
> |
remove(task)) |
272 |
> |
task.cancel(false); |
273 |
|
prestartCoreThread(); |
274 |
+ |
} |
275 |
+ |
} |
276 |
|
|
277 |
< |
super.getQueue().add(command); |
277 |
> |
/** |
278 |
> |
* Requeues a periodic task unless current run state precludes |
279 |
> |
* it. Same idea as delayedExecute except drops task rather than |
280 |
> |
* rejecting. |
281 |
> |
* @param task the task |
282 |
> |
*/ |
283 |
> |
void reExecutePeriodic(RunnableScheduledFuture<?> task) { |
284 |
> |
if (canRunInCurrentRunState(true)) { |
285 |
> |
super.getQueue().add(task); |
286 |
> |
if (!canRunInCurrentRunState(true) && remove(task)) |
287 |
> |
task.cancel(false); |
288 |
> |
prestartCoreThread(); |
289 |
> |
} |
290 |
|
} |
291 |
|
|
292 |
|
/** |
293 |
|
* Cancels and clears the queue of all tasks that should not be run |
294 |
< |
* due to shutdown policy. |
294 |
> |
* due to shutdown policy. Invoked within super.shutdown. |
295 |
|
*/ |
296 |
< |
private void cancelUnwantedTasks() { |
297 |
< |
boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); |
298 |
< |
boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); |
296 |
> |
@Override void onShutdown() { |
297 |
> |
BlockingQueue<Runnable> q = super.getQueue(); |
298 |
> |
boolean keepDelayed = |
299 |
> |
getExecuteExistingDelayedTasksAfterShutdownPolicy(); |
300 |
> |
boolean keepPeriodic = |
301 |
> |
getContinueExistingPeriodicTasksAfterShutdownPolicy(); |
302 |
|
if (!keepDelayed && !keepPeriodic) |
303 |
< |
super.getQueue().clear(); |
304 |
< |
else if (keepDelayed || keepPeriodic) { |
305 |
< |
Object[] entries = super.getQueue().toArray(); |
303 |
> |
q.clear(); |
304 |
> |
else { |
305 |
> |
// Traverse snapshot to avoid iterator exceptions |
306 |
> |
Object[] entries = q.toArray(); |
307 |
|
for (int i = 0; i < entries.length; ++i) { |
308 |
|
Object e = entries[i]; |
309 |
|
if (e instanceof RunnableScheduledFuture) { |
310 |
< |
RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e; |
311 |
< |
if (t.isPeriodic()? !keepPeriodic : !keepDelayed) |
312 |
< |
t.cancel(false); |
310 |
> |
RunnableScheduledFuture<?> t = |
311 |
> |
(RunnableScheduledFuture<?>)e; |
312 |
> |
if ((t.isPeriodic()? !keepPeriodic : !keepDelayed) || |
313 |
> |
t.isCancelled()) { // also remove if already cancelled |
314 |
> |
if (q.remove(t)) |
315 |
> |
t.cancel(false); |
316 |
> |
} |
317 |
|
} |
318 |
|
} |
246 |
– |
entries = null; |
247 |
– |
purge(); |
319 |
|
} |
320 |
|
} |
321 |
|
|
251 |
– |
public boolean remove(Runnable task) { |
252 |
– |
if (!(task instanceof RunnableScheduledFuture)) |
253 |
– |
return false; |
254 |
– |
return getQueue().remove(task); |
255 |
– |
} |
256 |
– |
|
322 |
|
/** |
323 |
|
* Modifies or replaces the task used to execute a runnable. |
324 |
|
* This method can be used to override the concrete |
356 |
|
* pool size. |
357 |
|
* |
358 |
|
* @param corePoolSize the number of threads to keep in the pool, |
359 |
< |
* even if they are idle |
359 |
> |
* even if they are idle, unless allowCoreThreadTimeOut is set |
360 |
|
* @throws IllegalArgumentException if <tt>corePoolSize < 0</tt> |
361 |
|
*/ |
362 |
|
public ScheduledThreadPoolExecutor(int corePoolSize) { |
369 |
|
* initial parameters. |
370 |
|
* |
371 |
|
* @param corePoolSize the number of threads to keep in the pool, |
372 |
< |
* even if they are idle |
372 |
> |
* even if they are idle, unless allowCoreThreadTimeOut is set |
373 |
|
* @param threadFactory the factory to use when the executor |
374 |
|
* creates a new thread |
375 |
|
* @throws IllegalArgumentException if <tt>corePoolSize < 0</tt> |
386 |
|
* initial parameters. |
387 |
|
* |
388 |
|
* @param corePoolSize the number of threads to keep in the pool, |
389 |
< |
* even if they are idle |
389 |
> |
* even if they are idle, unless allowCoreThreadTimeOut is set |
390 |
|
* @param handler the handler to use when execution is blocked |
391 |
|
* because the thread bounds and queue capacities are reached |
392 |
|
* @throws IllegalArgumentException if <tt>corePoolSize < 0</tt> |
403 |
|
* initial parameters. |
404 |
|
* |
405 |
|
* @param corePoolSize the number of threads to keep in the pool, |
406 |
< |
* even if they are idle |
406 |
> |
* even if they are idle, unless allowCoreThreadTimeOut is set |
407 |
|
* @param threadFactory the factory to use when the executor |
408 |
|
* creates a new thread |
409 |
|
* @param handler the handler to use when execution is blocked |
418 |
|
new DelayedWorkQueue(), threadFactory, handler); |
419 |
|
} |
420 |
|
|
421 |
+ |
/** |
422 |
+ |
* @throws RejectedExecutionException {@inheritDoc} |
423 |
+ |
* @throws NullPointerException {@inheritDoc} |
424 |
+ |
*/ |
425 |
|
public ScheduledFuture<?> schedule(Runnable command, |
426 |
|
long delay, |
427 |
|
TimeUnit unit) { |
428 |
|
if (command == null || unit == null) |
429 |
|
throw new NullPointerException(); |
430 |
+ |
if (delay < 0) delay = 0; |
431 |
|
long triggerTime = now() + unit.toNanos(delay); |
432 |
|
RunnableScheduledFuture<?> t = decorateTask(command, |
433 |
|
new ScheduledFutureTask<Boolean>(command, null, triggerTime)); |
435 |
|
return t; |
436 |
|
} |
437 |
|
|
438 |
+ |
/** |
439 |
+ |
* @throws RejectedExecutionException {@inheritDoc} |
440 |
+ |
* @throws NullPointerException {@inheritDoc} |
441 |
+ |
*/ |
442 |
|
public <V> ScheduledFuture<V> schedule(Callable<V> callable, |
443 |
|
long delay, |
444 |
|
TimeUnit unit) { |
452 |
|
return t; |
453 |
|
} |
454 |
|
|
455 |
+ |
/** |
456 |
+ |
* @throws RejectedExecutionException {@inheritDoc} |
457 |
+ |
* @throws NullPointerException {@inheritDoc} |
458 |
+ |
* @throws IllegalArgumentException {@inheritDoc} |
459 |
+ |
*/ |
460 |
|
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, |
461 |
|
long initialDelay, |
462 |
|
long period, |
476 |
|
return t; |
477 |
|
} |
478 |
|
|
479 |
+ |
/** |
480 |
+ |
* @throws RejectedExecutionException {@inheritDoc} |
481 |
+ |
* @throws NullPointerException {@inheritDoc} |
482 |
+ |
* @throws IllegalArgumentException {@inheritDoc} |
483 |
+ |
*/ |
484 |
|
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, |
485 |
|
long initialDelay, |
486 |
|
long delay, |
500 |
|
return t; |
501 |
|
} |
502 |
|
|
419 |
– |
|
503 |
|
/** |
504 |
|
* Executes command with zero required delay. This has effect |
505 |
|
* equivalent to <tt>schedule(command, 0, anyUnit)</tt>. Note |
521 |
|
|
522 |
|
// Override AbstractExecutorService methods |
523 |
|
|
524 |
+ |
/** |
525 |
+ |
* @throws RejectedExecutionException {@inheritDoc} |
526 |
+ |
* @throws NullPointerException {@inheritDoc} |
527 |
+ |
*/ |
528 |
|
public Future<?> submit(Runnable task) { |
529 |
|
return schedule(task, 0, TimeUnit.NANOSECONDS); |
530 |
|
} |
531 |
|
|
532 |
+ |
/** |
533 |
+ |
* @throws RejectedExecutionException {@inheritDoc} |
534 |
+ |
* @throws NullPointerException {@inheritDoc} |
535 |
+ |
*/ |
536 |
|
public <T> Future<T> submit(Runnable task, T result) { |
537 |
|
return schedule(Executors.callable(task, result), |
538 |
|
0, TimeUnit.NANOSECONDS); |
539 |
|
} |
540 |
|
|
541 |
+ |
/** |
542 |
+ |
* @throws RejectedExecutionException {@inheritDoc} |
543 |
+ |
* @throws NullPointerException {@inheritDoc} |
544 |
+ |
*/ |
545 |
|
public <T> Future<T> submit(Callable<T> task) { |
546 |
|
return schedule(task, 0, TimeUnit.NANOSECONDS); |
547 |
|
} |
548 |
|
|
549 |
|
/** |
550 |
< |
* Sets the policy on whether to continue executing existing periodic |
551 |
< |
* tasks even when this executor has been <tt>shutdown</tt>. In |
552 |
< |
* this case, these tasks will only terminate upon |
553 |
< |
* <tt>shutdownNow</tt>, or after setting the policy to |
554 |
< |
* <tt>false</tt> when already shutdown. This value is by default |
555 |
< |
* false. |
550 |
> |
* Sets the policy on whether to continue executing existing |
551 |
> |
* periodic tasks even when this executor has been |
552 |
> |
* <tt>shutdown</tt>. In this case, these tasks will only |
553 |
> |
* terminate upon <tt>shutdownNow</tt>, or after setting the |
554 |
> |
* policy to <tt>false</tt> when already shutdown. This value is |
555 |
> |
* by default false. |
556 |
|
* |
557 |
|
* @param value if true, continue after shutdown, else don't. |
558 |
|
* @see #getContinueExistingPeriodicTasksAfterShutdownPolicy |
559 |
|
*/ |
560 |
|
public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) { |
561 |
|
continueExistingPeriodicTasksAfterShutdown = value; |
562 |
< |
if (!value && isShutdown()) |
563 |
< |
cancelUnwantedTasks(); |
562 |
> |
if (!value && isShutdown()) { |
563 |
> |
onShutdown(); |
564 |
> |
tryTerminate(); |
565 |
> |
} |
566 |
|
} |
567 |
|
|
568 |
|
/** |
593 |
|
*/ |
594 |
|
public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) { |
595 |
|
executeExistingDelayedTasksAfterShutdown = value; |
596 |
< |
if (!value && isShutdown()) |
597 |
< |
cancelUnwantedTasks(); |
596 |
> |
if (!value && isShutdown()) { |
597 |
> |
onShutdown(); |
598 |
> |
tryTerminate(); |
599 |
> |
} |
600 |
|
} |
601 |
|
|
602 |
|
/** |
614 |
|
return executeExistingDelayedTasksAfterShutdown; |
615 |
|
} |
616 |
|
|
518 |
– |
|
617 |
|
/** |
618 |
|
* Initiates an orderly shutdown in which previously submitted |
619 |
|
* tasks are executed, but no new tasks will be accepted. If the |
625 |
|
* tasks will be cancelled. |
626 |
|
*/ |
627 |
|
public void shutdown() { |
530 |
– |
cancelUnwantedTasks(); |
628 |
|
super.shutdown(); |
629 |
|
} |
630 |
|
|