6 |
|
|
7 |
|
package java.util.concurrent; |
8 |
|
|
9 |
– |
import static java.util.concurrent.TimeUnit.NANOSECONDS; |
9 |
|
import static java.util.concurrent.TimeUnit.MILLISECONDS; |
10 |
+ |
import static java.util.concurrent.TimeUnit.NANOSECONDS; |
11 |
+ |
|
12 |
+ |
import java.util.AbstractQueue; |
13 |
+ |
import java.util.Arrays; |
14 |
+ |
import java.util.Collection; |
15 |
+ |
import java.util.Iterator; |
16 |
+ |
import java.util.List; |
17 |
+ |
import java.util.NoSuchElementException; |
18 |
|
import java.util.concurrent.atomic.AtomicLong; |
19 |
|
import java.util.concurrent.locks.Condition; |
20 |
|
import java.util.concurrent.locks.ReentrantLock; |
14 |
– |
import java.util.*; |
21 |
|
|
22 |
|
/** |
23 |
|
* A {@link ThreadPoolExecutor} that can additionally schedule |
42 |
|
* removed from the work queue at time of cancellation. |
43 |
|
* |
44 |
|
* <p>Successive executions of a periodic task scheduled via |
45 |
< |
* {@link #scheduleAtFixedRate} or |
46 |
< |
* {@link #scheduleWithFixedDelay} do not overlap. While different |
47 |
< |
* executions may be performed by different threads, the effects of |
48 |
< |
* prior executions <a |
49 |
< |
* href="package-summary.html#MemoryVisibility"><i>happen-before</i></a> |
45 |
> |
* {@link #scheduleAtFixedRate scheduleAtFixedRate} or |
46 |
> |
* {@link #scheduleWithFixedDelay scheduleWithFixedDelay} |
47 |
> |
* do not overlap. While different executions may be performed by |
48 |
> |
* different threads, the effects of prior executions |
49 |
> |
* <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a> |
50 |
|
* those of subsequent ones. |
51 |
|
* |
52 |
|
* <p>While this class inherits from {@link ThreadPoolExecutor}, a few |
76 |
|
* {@link FutureTask}. However, this may be modified or replaced using |
77 |
|
* subclasses of the form: |
78 |
|
* |
79 |
< |
* <pre> {@code |
79 |
> |
* <pre> {@code |
80 |
|
* public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor { |
81 |
|
* |
82 |
|
* static class CustomTask<V> implements RunnableScheduledFuture<V> { ... } |
140 |
|
/** |
141 |
|
* True if ScheduledFutureTask.cancel should remove from queue. |
142 |
|
*/ |
143 |
< |
private volatile boolean removeOnCancel = false; |
143 |
> |
volatile boolean removeOnCancel; |
144 |
|
|
145 |
|
/** |
146 |
|
* Sequence number to break scheduling ties, and in turn to |
148 |
|
*/ |
149 |
|
private static final AtomicLong sequencer = new AtomicLong(); |
150 |
|
|
145 |
– |
/** |
146 |
– |
* Returns current nanosecond time. |
147 |
– |
*/ |
148 |
– |
final long now() { |
149 |
– |
return System.nanoTime(); |
150 |
– |
} |
151 |
– |
|
151 |
|
private class ScheduledFutureTask<V> |
152 |
|
extends FutureTask<V> implements RunnableScheduledFuture<V> { |
153 |
|
|
155 |
|
private final long sequenceNumber; |
156 |
|
|
157 |
|
/** The time the task is enabled to execute in nanoTime units */ |
158 |
< |
private long time; |
158 |
> |
private volatile long time; |
159 |
|
|
160 |
|
/** |
161 |
|
* Period in nanoseconds for repeating tasks. |
176 |
|
/** |
177 |
|
* Creates a one-shot action with given nanoTime-based trigger time. |
178 |
|
*/ |
179 |
< |
ScheduledFutureTask(Runnable r, V result, long triggerTime) { |
179 |
> |
ScheduledFutureTask(Runnable r, V result, long triggerTime, |
180 |
> |
long sequenceNumber) { |
181 |
|
super(r, result); |
182 |
|
this.time = triggerTime; |
183 |
|
this.period = 0; |
184 |
< |
this.sequenceNumber = sequencer.getAndIncrement(); |
184 |
> |
this.sequenceNumber = sequenceNumber; |
185 |
|
} |
186 |
|
|
187 |
|
/** |
189 |
|
* trigger time and period. |
190 |
|
*/ |
191 |
|
ScheduledFutureTask(Runnable r, V result, long triggerTime, |
192 |
< |
long period) { |
192 |
> |
long period, long sequenceNumber) { |
193 |
|
super(r, result); |
194 |
|
this.time = triggerTime; |
195 |
|
this.period = period; |
196 |
< |
this.sequenceNumber = sequencer.getAndIncrement(); |
196 |
> |
this.sequenceNumber = sequenceNumber; |
197 |
|
} |
198 |
|
|
199 |
|
/** |
200 |
|
* Creates a one-shot action with given nanoTime-based trigger time. |
201 |
|
*/ |
202 |
< |
ScheduledFutureTask(Callable<V> callable, long triggerTime) { |
202 |
> |
ScheduledFutureTask(Callable<V> callable, long triggerTime, |
203 |
> |
long sequenceNumber) { |
204 |
|
super(callable); |
205 |
|
this.time = triggerTime; |
206 |
|
this.period = 0; |
207 |
< |
this.sequenceNumber = sequencer.getAndIncrement(); |
207 |
> |
this.sequenceNumber = sequenceNumber; |
208 |
|
} |
209 |
|
|
210 |
|
public long getDelay(TimeUnit unit) { |
211 |
< |
return unit.convert(time - now(), NANOSECONDS); |
211 |
> |
return unit.convert(time - System.nanoTime(), NANOSECONDS); |
212 |
|
} |
213 |
|
|
214 |
|
public int compareTo(Delayed other) { |
265 |
|
if (!canRunInCurrentRunState(periodic)) |
266 |
|
cancel(false); |
267 |
|
else if (!periodic) |
268 |
< |
ScheduledFutureTask.super.run(); |
269 |
< |
else if (ScheduledFutureTask.super.runAndReset()) { |
268 |
> |
super.run(); |
269 |
> |
else if (super.runAndReset()) { |
270 |
|
setNextRunTime(); |
271 |
|
reExecutePeriodic(outerTask); |
272 |
|
} |
492 |
|
* Returns the nanoTime-based trigger time of a delayed action. |
493 |
|
*/ |
494 |
|
long triggerTime(long delay) { |
495 |
< |
return now() + |
495 |
> |
return System.nanoTime() + |
496 |
|
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); |
497 |
|
} |
498 |
|
|
524 |
|
throw new NullPointerException(); |
525 |
|
RunnableScheduledFuture<Void> t = decorateTask(command, |
526 |
|
new ScheduledFutureTask<Void>(command, null, |
527 |
< |
triggerTime(delay, unit))); |
527 |
> |
triggerTime(delay, unit), |
528 |
> |
sequencer.getAndIncrement())); |
529 |
|
delayedExecute(t); |
530 |
|
return t; |
531 |
|
} |
541 |
|
throw new NullPointerException(); |
542 |
|
RunnableScheduledFuture<V> t = decorateTask(callable, |
543 |
|
new ScheduledFutureTask<V>(callable, |
544 |
< |
triggerTime(delay, unit))); |
544 |
> |
triggerTime(delay, unit), |
545 |
> |
sequencer.getAndIncrement())); |
546 |
|
delayedExecute(t); |
547 |
|
return t; |
548 |
|
} |
558 |
|
TimeUnit unit) { |
559 |
|
if (command == null || unit == null) |
560 |
|
throw new NullPointerException(); |
561 |
< |
if (period <= 0) |
561 |
> |
if (period <= 0L) |
562 |
|
throw new IllegalArgumentException(); |
563 |
|
ScheduledFutureTask<Void> sft = |
564 |
|
new ScheduledFutureTask<Void>(command, |
565 |
|
null, |
566 |
|
triggerTime(initialDelay, unit), |
567 |
< |
unit.toNanos(period)); |
567 |
> |
unit.toNanos(period), |
568 |
> |
sequencer.getAndIncrement()); |
569 |
|
RunnableScheduledFuture<Void> t = decorateTask(command, sft); |
570 |
|
sft.outerTask = t; |
571 |
|
delayedExecute(t); |
583 |
|
TimeUnit unit) { |
584 |
|
if (command == null || unit == null) |
585 |
|
throw new NullPointerException(); |
586 |
< |
if (delay <= 0) |
586 |
> |
if (delay <= 0L) |
587 |
|
throw new IllegalArgumentException(); |
588 |
|
ScheduledFutureTask<Void> sft = |
589 |
|
new ScheduledFutureTask<Void>(command, |
590 |
|
null, |
591 |
|
triggerTime(initialDelay, unit), |
592 |
< |
unit.toNanos(-delay)); |
592 |
> |
-unit.toNanos(delay), |
593 |
> |
sequencer.getAndIncrement()); |
594 |
|
RunnableScheduledFuture<Void> t = decorateTask(command, sft); |
595 |
|
sft.outerTask = t; |
596 |
|
delayedExecute(t); |
763 |
|
/** |
764 |
|
* Attempts to stop all actively executing tasks, halts the |
765 |
|
* processing of waiting tasks, and returns a list of the tasks |
766 |
< |
* that were awaiting execution. |
766 |
> |
* that were awaiting execution. These tasks are drained (removed) |
767 |
> |
* from the task queue upon return from this method. |
768 |
|
* |
769 |
|
* <p>This method does not wait for actively executing tasks to |
770 |
|
* terminate. Use {@link #awaitTermination awaitTermination} to |
772 |
|
* |
773 |
|
* <p>There are no guarantees beyond best-effort attempts to stop |
774 |
|
* processing actively executing tasks. This implementation |
775 |
< |
* cancels tasks via {@link Thread#interrupt}, so any task that |
775 |
> |
* interrupts tasks via {@link Thread#interrupt}; any task that |
776 |
|
* fails to respond to interrupts may never terminate. |
777 |
|
* |
778 |
|
* @return list of tasks that never commenced execution. |
780 |
|
* For tasks submitted via one of the {@code schedule} |
781 |
|
* methods, the element will be identical to the returned |
782 |
|
* {@code ScheduledFuture}. For tasks submitted using |
783 |
< |
* {@link #execute}, the element will be a zero-delay {@code |
784 |
< |
* ScheduledFuture}. |
783 |
> |
* {@link #execute execute}, the element will be a |
784 |
> |
* zero-delay {@code ScheduledFuture}. |
785 |
|
* @throws SecurityException {@inheritDoc} |
786 |
|
*/ |
787 |
|
public List<Runnable> shutdownNow() { |
789 |
|
} |
790 |
|
|
791 |
|
/** |
792 |
< |
* Returns the task queue used by this executor. |
793 |
< |
* Each element of this list is a {@link ScheduledFuture}. |
792 |
> |
* Returns the task queue used by this executor. Access to the |
793 |
> |
* task queue is intended primarily for debugging and monitoring. |
794 |
> |
* This queue may be in active use. Retrieving the task queue |
795 |
> |
* does not prevent queued tasks from executing. |
796 |
> |
* |
797 |
> |
* <p>Each element of this queue is a {@link ScheduledFuture}. |
798 |
|
* For tasks submitted via one of the {@code schedule} methods, the |
799 |
|
* element will be identical to the returned {@code ScheduledFuture}. |
800 |
< |
* For tasks submitted using {@link #execute}, the element will be a |
801 |
< |
* zero-delay {@code ScheduledFuture}. |
800 |
> |
* For tasks submitted using {@link #execute execute}, the element |
801 |
> |
* will be a zero-delay {@code ScheduledFuture}. |
802 |
|
* |
803 |
|
* <p>Iteration over this queue is <em>not</em> guaranteed to traverse |
804 |
|
* tasks in the order in which they will execute. |
1070 |
|
lock.lock(); |
1071 |
|
try { |
1072 |
|
RunnableScheduledFuture<?> first = queue[0]; |
1073 |
< |
if (first == null || first.getDelay(NANOSECONDS) > 0) |
1074 |
< |
return null; |
1075 |
< |
else |
1066 |
< |
return finishPoll(first); |
1073 |
> |
return (first == null || first.getDelay(NANOSECONDS) > 0) |
1074 |
> |
? null |
1075 |
> |
: finishPoll(first); |
1076 |
|
} finally { |
1077 |
|
lock.unlock(); |
1078 |
|
} |
1088 |
|
available.await(); |
1089 |
|
else { |
1090 |
|
long delay = first.getDelay(NANOSECONDS); |
1091 |
< |
if (delay <= 0) |
1091 |
> |
if (delay <= 0L) |
1092 |
|
return finishPoll(first); |
1093 |
|
first = null; // don't retain ref while waiting |
1094 |
|
if (leader != null) |
1121 |
|
for (;;) { |
1122 |
|
RunnableScheduledFuture<?> first = queue[0]; |
1123 |
|
if (first == null) { |
1124 |
< |
if (nanos <= 0) |
1124 |
> |
if (nanos <= 0L) |
1125 |
|
return null; |
1126 |
|
else |
1127 |
|
nanos = available.awaitNanos(nanos); |
1128 |
|
} else { |
1129 |
|
long delay = first.getDelay(NANOSECONDS); |
1130 |
< |
if (delay <= 0) |
1130 |
> |
if (delay <= 0L) |
1131 |
|
return finishPoll(first); |
1132 |
< |
if (nanos <= 0) |
1132 |
> |
if (nanos <= 0L) |
1133 |
|
return null; |
1134 |
|
first = null; // don't retain ref while waiting |
1135 |
|
if (nanos < delay || leader != null) |
1261 |
|
*/ |
1262 |
|
private class Itr implements Iterator<Runnable> { |
1263 |
|
final RunnableScheduledFuture<?>[] array; |
1264 |
< |
int cursor = 0; // index of next element to return |
1265 |
< |
int lastRet = -1; // index of last element, or -1 if no such |
1264 |
> |
int cursor; // index of next element to return; initially 0 |
1265 |
> |
int lastRet = -1; // index of last element returned; -1 if no such |
1266 |
|
|
1267 |
|
Itr(RunnableScheduledFuture<?>[] array) { |
1268 |
|
this.array = array; |