96 |
|
this.sequenceNumber = sequencer.getAndIncrement(); |
97 |
|
} |
98 |
|
|
99 |
– |
|
99 |
|
public long getDelay(TimeUnit unit) { |
100 |
|
long d = unit.convert(time - System.nanoTime(), |
101 |
|
TimeUnit.NANOSECONDS); |
135 |
|
} |
136 |
|
|
137 |
|
/** |
138 |
+ |
* RUn a periodic task |
139 |
+ |
*/ |
140 |
+ |
private void runPeriodic() { |
141 |
+ |
boolean ok = ScheduledFutureTask.super.runAndReset(); |
142 |
+ |
boolean down = isShutdown(); |
143 |
+ |
// Reschedule if not cancelled and not shutdown or policy allows |
144 |
+ |
if (ok && (!down || |
145 |
+ |
(getContinueExistingPeriodicTasksAfterShutdownPolicy() && |
146 |
+ |
!isTerminating()))) { |
147 |
+ |
time = period + (rateBased ? time : System.nanoTime()); |
148 |
+ |
ScheduledThreadPoolExecutor.super.getQueue().add(this); |
149 |
+ |
} |
150 |
+ |
// This might have been the final executed delayed |
151 |
+ |
// task. Wake up threads to check. |
152 |
+ |
else if (down) |
153 |
+ |
interruptIdleWorkers(); |
154 |
+ |
} |
155 |
+ |
|
156 |
+ |
/** |
157 |
|
* Overrides FutureTask version so as to reset/requeue if periodic. |
158 |
|
*/ |
159 |
|
public void run() { |
160 |
< |
if (!isPeriodic()) |
160 |
> |
if (isPeriodic()) |
161 |
> |
runPeriodic(); |
162 |
> |
else |
163 |
|
ScheduledFutureTask.super.run(); |
144 |
– |
else { |
145 |
– |
boolean ok = (ScheduledFutureTask.super.runAndReset()); |
146 |
– |
boolean down = isShutdown(); |
147 |
– |
if (ok && (!down || |
148 |
– |
(getContinueExistingPeriodicTasksAfterShutdownPolicy() && |
149 |
– |
!isTerminating()))) { |
150 |
– |
time = period + (rateBased ? time : System.nanoTime()); |
151 |
– |
ScheduledThreadPoolExecutor.super.getQueue().add(this); |
152 |
– |
} |
153 |
– |
// This might have been the final executed delayed |
154 |
– |
// task. Wake up threads to check. |
155 |
– |
else if (down) |
156 |
– |
interruptIdleWorkers(); |
157 |
– |
} |
164 |
|
} |
165 |
|
} |
166 |
|
|
167 |
|
/** |
168 |
< |
* An annoying wrapper class to convince generics compiler to |
169 |
< |
* use a DelayQueue<ScheduledFutureTask> as a BlockingQueue<Runnable> |
170 |
< |
*/ |
171 |
< |
private static class DelayedWorkQueue |
172 |
< |
extends AbstractCollection<Runnable> implements BlockingQueue<Runnable> { |
173 |
< |
|
168 |
< |
private final DelayQueue<ScheduledFutureTask> dq = new DelayQueue<ScheduledFutureTask>(); |
169 |
< |
public Runnable poll() { return dq.poll(); } |
170 |
< |
public Runnable peek() { return dq.peek(); } |
171 |
< |
public Runnable take() throws InterruptedException { return dq.take(); } |
172 |
< |
public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { |
173 |
< |
return dq.poll(timeout, unit); |
174 |
< |
} |
175 |
< |
|
176 |
< |
public boolean add(Runnable x) { return dq.add((ScheduledFutureTask)x); } |
177 |
< |
public boolean offer(Runnable x) { return dq.offer((ScheduledFutureTask)x); } |
178 |
< |
public void put(Runnable x) { |
179 |
< |
dq.put((ScheduledFutureTask)x); |
180 |
< |
} |
181 |
< |
public boolean offer(Runnable x, long timeout, TimeUnit unit) { |
182 |
< |
return dq.offer((ScheduledFutureTask)x, timeout, unit); |
183 |
< |
} |
184 |
< |
|
185 |
< |
public Runnable remove() { return dq.remove(); } |
186 |
< |
public Runnable element() { return dq.element(); } |
187 |
< |
public void clear() { dq.clear(); } |
188 |
< |
public int drainTo(Collection<? super Runnable> c) { return dq.drainTo(c); } |
189 |
< |
public int drainTo(Collection<? super Runnable> c, int maxElements) { |
190 |
< |
return dq.drainTo(c, maxElements); |
168 |
> |
* Specialized variant of ThreadPoolExecutor.execute for delayed tasks. |
169 |
> |
*/ |
170 |
> |
private void delayedExecute(Runnable command) { |
171 |
> |
if (isShutdown()) { |
172 |
> |
reject(command); |
173 |
> |
return; |
174 |
|
} |
175 |
+ |
// Prestart a thread if necessary. We cannot prestart it |
176 |
+ |
// running the task because the task (probably) shouldn't be |
177 |
+ |
// run yet, so thread will just idle until delay elapses. |
178 |
+ |
if (getPoolSize() < getCorePoolSize()) |
179 |
+ |
prestartCoreThread(); |
180 |
+ |
|
181 |
+ |
super.getQueue().add(command); |
182 |
+ |
} |
183 |
|
|
184 |
< |
public int remainingCapacity() { return dq.remainingCapacity(); } |
185 |
< |
public boolean remove(Object x) { return dq.remove(x); } |
186 |
< |
public boolean contains(Object x) { return dq.contains(x); } |
187 |
< |
public int size() { return dq.size(); } |
188 |
< |
public boolean isEmpty() { return dq.isEmpty(); } |
189 |
< |
public Object[] toArray() { return dq.toArray(); } |
190 |
< |
public <T> T[] toArray(T[] array) { return dq.toArray(array); } |
191 |
< |
public Iterator<Runnable> iterator() { |
192 |
< |
return new Iterator<Runnable>() { |
193 |
< |
private Iterator<ScheduledFutureTask> it = dq.iterator(); |
194 |
< |
public boolean hasNext() { return it.hasNext(); } |
195 |
< |
public Runnable next() { return it.next(); } |
196 |
< |
public void remove() { it.remove(); } |
197 |
< |
}; |
184 |
> |
/** |
185 |
> |
* Cancel and clear the queue of all tasks that should not be run |
186 |
> |
* due to shutdown policy. |
187 |
> |
*/ |
188 |
> |
private void cancelUnwantedTasks() { |
189 |
> |
boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); |
190 |
> |
boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); |
191 |
> |
if (!keepDelayed && !keepPeriodic) |
192 |
> |
super.getQueue().clear(); |
193 |
> |
else if (keepDelayed || keepPeriodic) { |
194 |
> |
Object[] entries = super.getQueue().toArray(); |
195 |
> |
for (int i = 0; i < entries.length; ++i) { |
196 |
> |
Object e = entries[i]; |
197 |
> |
if (e instanceof ScheduledFutureTask) { |
198 |
> |
ScheduledFutureTask<?> t = (ScheduledFutureTask<?>)e; |
199 |
> |
if (t.isPeriodic()? !keepPeriodic : !keepDelayed) |
200 |
> |
t.cancel(false); |
201 |
> |
} |
202 |
> |
} |
203 |
> |
entries = null; |
204 |
> |
purge(); |
205 |
|
} |
206 |
|
} |
207 |
|
|
208 |
|
/** |
209 |
< |
* Creates a new ScheduledThreadPoolExecutor with the given core pool size. |
209 |
> |
* Creates a new ScheduledThreadPoolExecutor with the given core |
210 |
> |
* pool size. |
211 |
|
* |
212 |
|
* @param corePoolSize the number of threads to keep in the pool, |
213 |
|
* even if they are idle. |
220 |
|
} |
221 |
|
|
222 |
|
/** |
223 |
< |
* Creates a new ScheduledThreadPoolExecutor with the given initial parameters. |
223 |
> |
* Creates a new ScheduledThreadPoolExecutor with the given |
224 |
> |
* initial parameters. |
225 |
|
* |
226 |
|
* @param corePoolSize the number of threads to keep in the pool, |
227 |
|
* even if they are idle. |
236 |
|
} |
237 |
|
|
238 |
|
/** |
239 |
< |
* Creates a new ScheduledThreadPoolExecutor with the given initial parameters. |
239 |
> |
* Creates a new ScheduledThreadPoolExecutor with the given |
240 |
> |
* initial parameters. |
241 |
|
* |
242 |
|
* @param corePoolSize the number of threads to keep in the pool, |
243 |
|
* even if they are idle. |
252 |
|
} |
253 |
|
|
254 |
|
/** |
255 |
< |
* Creates a new ScheduledThreadPoolExecutor with the given initial parameters. |
255 |
> |
* Creates a new ScheduledThreadPoolExecutor with the given |
256 |
> |
* initial parameters. |
257 |
|
* |
258 |
|
* @param corePoolSize the number of threads to keep in the pool, |
259 |
|
* even if they are idle. |
270 |
|
new DelayedWorkQueue(), threadFactory, handler); |
271 |
|
} |
272 |
|
|
273 |
< |
/** |
274 |
< |
* Specialized variant of ThreadPoolExecutor.execute for delayed tasks. |
275 |
< |
*/ |
274 |
< |
private void delayedExecute(Runnable command) { |
275 |
< |
if (isShutdown()) { |
276 |
< |
reject(command); |
277 |
< |
return; |
278 |
< |
} |
279 |
< |
// Prestart a thread if necessary. We cannot prestart it |
280 |
< |
// running the task because the task (probably) shouldn't be |
281 |
< |
// run yet, so thread will just idle until delay elapses. |
282 |
< |
if (getPoolSize() < getCorePoolSize()) |
283 |
< |
prestartCoreThread(); |
284 |
< |
|
285 |
< |
super.getQueue().add(command); |
286 |
< |
} |
287 |
< |
|
288 |
< |
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { |
273 |
> |
public ScheduledFuture<?> schedule(Runnable command, |
274 |
> |
long delay, |
275 |
> |
TimeUnit unit) { |
276 |
|
if (command == null || unit == null) |
277 |
|
throw new NullPointerException(); |
278 |
|
long triggerTime = System.nanoTime() + unit.toNanos(delay); |
279 |
< |
ScheduledFutureTask<?> t = new ScheduledFutureTask<Boolean>(command, null, triggerTime); |
279 |
> |
ScheduledFutureTask<?> t = |
280 |
> |
new ScheduledFutureTask<Boolean>(command, null, triggerTime); |
281 |
|
delayedExecute(t); |
282 |
|
return t; |
283 |
|
} |
284 |
|
|
285 |
< |
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { |
285 |
> |
public <V> ScheduledFuture<V> schedule(Callable<V> callable, |
286 |
> |
long delay, |
287 |
> |
TimeUnit unit) { |
288 |
|
if (callable == null || unit == null) |
289 |
|
throw new NullPointerException(); |
290 |
|
long triggerTime = System.nanoTime() + unit.toNanos(delay); |
291 |
< |
ScheduledFutureTask<V> t = new ScheduledFutureTask<V>(callable, triggerTime); |
291 |
> |
ScheduledFutureTask<V> t = |
292 |
> |
new ScheduledFutureTask<V>(callable, triggerTime); |
293 |
|
delayedExecute(t); |
294 |
|
return t; |
295 |
|
} |
296 |
|
|
297 |
< |
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { |
297 |
> |
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, |
298 |
> |
long initialDelay, |
299 |
> |
long period, |
300 |
> |
TimeUnit unit) { |
301 |
|
if (command == null || unit == null) |
302 |
|
throw new NullPointerException(); |
303 |
|
if (period <= 0) |
304 |
|
throw new IllegalArgumentException(); |
305 |
|
long triggerTime = System.nanoTime() + unit.toNanos(initialDelay); |
306 |
< |
ScheduledFutureTask<?> t = new ScheduledFutureTask<Object> |
307 |
< |
(command, null, |
308 |
< |
triggerTime, |
309 |
< |
unit.toNanos(period), |
310 |
< |
true); |
306 |
> |
ScheduledFutureTask<?> t = |
307 |
> |
new ScheduledFutureTask<Object>(command, |
308 |
> |
null, |
309 |
> |
triggerTime, |
310 |
> |
unit.toNanos(period), |
311 |
> |
true); |
312 |
|
delayedExecute(t); |
313 |
|
return t; |
314 |
|
} |
315 |
|
|
316 |
< |
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { |
316 |
> |
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, |
317 |
> |
long initialDelay, |
318 |
> |
long delay, |
319 |
> |
TimeUnit unit) { |
320 |
|
if (command == null || unit == null) |
321 |
|
throw new NullPointerException(); |
322 |
|
if (delay <= 0) |
323 |
|
throw new IllegalArgumentException(); |
324 |
|
long triggerTime = System.nanoTime() + unit.toNanos(initialDelay); |
325 |
< |
ScheduledFutureTask<?> t = new ScheduledFutureTask<Boolean> |
326 |
< |
(command, |
327 |
< |
null, |
328 |
< |
triggerTime, |
329 |
< |
unit.toNanos(delay), |
330 |
< |
false); |
325 |
> |
ScheduledFutureTask<?> t = |
326 |
> |
new ScheduledFutureTask<Boolean>(command, |
327 |
> |
null, |
328 |
> |
triggerTime, |
329 |
> |
unit.toNanos(delay), |
330 |
> |
false); |
331 |
|
delayedExecute(t); |
332 |
|
return t; |
333 |
|
} |
352 |
|
schedule(command, 0, TimeUnit.NANOSECONDS); |
353 |
|
} |
354 |
|
|
355 |
+ |
// Override AbstractExecutorService methods |
356 |
+ |
|
357 |
|
public Future<?> submit(Runnable task) { |
358 |
|
return schedule(task, 0, TimeUnit.NANOSECONDS); |
359 |
|
} |
360 |
|
|
361 |
|
public <T> Future<T> submit(Runnable task, T result) { |
362 |
< |
return schedule(Executors.callable(task, result), 0, TimeUnit.NANOSECONDS); |
362 |
> |
return schedule(Executors.callable(task, result), |
363 |
> |
0, TimeUnit.NANOSECONDS); |
364 |
|
} |
365 |
|
|
366 |
|
public <T> Future<T> submit(Callable<T> task) { |
423 |
|
return executeExistingDelayedTasksAfterShutdown; |
424 |
|
} |
425 |
|
|
425 |
– |
/** |
426 |
– |
* Cancel and clear the queue of all tasks that should not be run |
427 |
– |
* due to shutdown policy. |
428 |
– |
*/ |
429 |
– |
private void cancelUnwantedTasks() { |
430 |
– |
boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); |
431 |
– |
boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); |
432 |
– |
if (!keepDelayed && !keepPeriodic) |
433 |
– |
super.getQueue().clear(); |
434 |
– |
else if (keepDelayed || keepPeriodic) { |
435 |
– |
Object[] entries = super.getQueue().toArray(); |
436 |
– |
for (int i = 0; i < entries.length; ++i) { |
437 |
– |
Object e = entries[i]; |
438 |
– |
if (e instanceof ScheduledFutureTask) { |
439 |
– |
ScheduledFutureTask<?> t = (ScheduledFutureTask<?>)e; |
440 |
– |
if (t.isPeriodic()? !keepPeriodic : !keepDelayed) |
441 |
– |
t.cancel(false); |
442 |
– |
} |
443 |
– |
} |
444 |
– |
entries = null; |
445 |
– |
purge(); |
446 |
– |
} |
447 |
– |
} |
426 |
|
|
427 |
|
/** |
428 |
|
* Initiates an orderly shutdown in which previously submitted |
459 |
|
return super.shutdownNow(); |
460 |
|
} |
461 |
|
|
484 |
– |
|
462 |
|
/** |
463 |
|
* Returns the task queue used by this executor. Each element of |
464 |
|
* this queue is a {@link ScheduledFuture}, including those |
474 |
|
return super.getQueue(); |
475 |
|
} |
476 |
|
|
477 |
+ |
/** |
478 |
+ |
* An annoying wrapper class to convince generics compiler to |
479 |
+ |
* use a DelayQueue<ScheduledFutureTask> as a BlockingQueue<Runnable> |
480 |
+ |
*/ |
481 |
+ |
private static class DelayedWorkQueue |
482 |
+ |
extends AbstractCollection<Runnable> |
483 |
+ |
implements BlockingQueue<Runnable> { |
484 |
+ |
|
485 |
+ |
private final DelayQueue<ScheduledFutureTask> dq = new DelayQueue<ScheduledFutureTask>(); |
486 |
+ |
public Runnable poll() { return dq.poll(); } |
487 |
+ |
public Runnable peek() { return dq.peek(); } |
488 |
+ |
public Runnable take() throws InterruptedException { return dq.take(); } |
489 |
+ |
public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { |
490 |
+ |
return dq.poll(timeout, unit); |
491 |
+ |
} |
492 |
+ |
|
493 |
+ |
public boolean add(Runnable x) { return dq.add((ScheduledFutureTask)x); } |
494 |
+ |
public boolean offer(Runnable x) { return dq.offer((ScheduledFutureTask)x); } |
495 |
+ |
public void put(Runnable x) { |
496 |
+ |
dq.put((ScheduledFutureTask)x); |
497 |
+ |
} |
498 |
+ |
public boolean offer(Runnable x, long timeout, TimeUnit unit) { |
499 |
+ |
return dq.offer((ScheduledFutureTask)x, timeout, unit); |
500 |
+ |
} |
501 |
+ |
|
502 |
+ |
public Runnable remove() { return dq.remove(); } |
503 |
+ |
public Runnable element() { return dq.element(); } |
504 |
+ |
public void clear() { dq.clear(); } |
505 |
+ |
public int drainTo(Collection<? super Runnable> c) { return dq.drainTo(c); } |
506 |
+ |
public int drainTo(Collection<? super Runnable> c, int maxElements) { |
507 |
+ |
return dq.drainTo(c, maxElements); |
508 |
+ |
} |
509 |
+ |
|
510 |
+ |
public int remainingCapacity() { return dq.remainingCapacity(); } |
511 |
+ |
public boolean remove(Object x) { return dq.remove(x); } |
512 |
+ |
public boolean contains(Object x) { return dq.contains(x); } |
513 |
+ |
public int size() { return dq.size(); } |
514 |
+ |
public boolean isEmpty() { return dq.isEmpty(); } |
515 |
+ |
public Object[] toArray() { return dq.toArray(); } |
516 |
+ |
public <T> T[] toArray(T[] array) { return dq.toArray(array); } |
517 |
+ |
public Iterator<Runnable> iterator() { |
518 |
+ |
return new Iterator<Runnable>() { |
519 |
+ |
private Iterator<ScheduledFutureTask> it = dq.iterator(); |
520 |
+ |
public boolean hasNext() { return it.hasNext(); } |
521 |
+ |
public Runnable next() { return it.next(); } |
522 |
+ |
public void remove() { it.remove(); } |
523 |
+ |
}; |
524 |
+ |
} |
525 |
+ |
} |
526 |
|
} |