17 |
|
* flexibility or capabilities of {@link ThreadPoolExecutor} (which |
18 |
|
* this class extends) are required. |
19 |
|
* |
20 |
< |
* <p> Delayed tasks execute no sooner than they are enabled, but |
20 |
> |
* <p>Delayed tasks execute no sooner than they are enabled, but |
21 |
|
* without any real-time guarantees about when, after they are |
22 |
|
* enabled, they will commence. Tasks scheduled for exactly the same |
23 |
|
* execution time are enabled in first-in-first-out (FIFO) order of |
24 |
< |
* submission. If {@link #setRemoveOnCancelPolicy} is set {@code true}, |
25 |
< |
* cancelled tasks are automatically removed from the work queue. |
24 |
> |
* submission. |
25 |
> |
* |
26 |
> |
* <p>When a submitted task is cancelled before it is run, execution |
27 |
> |
* is suppressed. By default, such a cancelled task is not |
28 |
> |
* automatically removed from the work queue until its delay |
29 |
> |
* elapses. While this enables further inspection and monitoring, it |
30 |
> |
* may also cause unbounded retention of cancelled tasks. To avoid |
31 |
> |
* this, set {@link #setRemoveOnCancelPolicy} to {@code true}, which |
32 |
> |
* causes tasks to be immediately removed from the work queue at |
33 |
> |
* time of cancellation. |
34 |
|
* |
35 |
|
* <p>While this class inherits from {@link ThreadPoolExecutor}, a few |
36 |
|
* of the inherited tuning methods are not useful for it. In |
92 |
|
* ScheduledExecutorService methods) which are treated as |
93 |
|
* delayed tasks with a delay of zero. |
94 |
|
* |
95 |
< |
* 2. Using a custom queue (DelayedWorkQueue) based on an |
95 |
> |
* 2. Using a custom queue (DelayedWorkQueue), a variant of |
96 |
|
* unbounded DelayQueue. The lack of capacity constraint and |
97 |
|
* the fact that corePoolSize and maximumPoolSize are |
98 |
|
* effectively identical simplifies some execution mechanics |
99 |
< |
* (see delayedExecute) compared to ThreadPoolExecutor |
92 |
< |
* version. |
93 |
< |
* |
94 |
< |
* The DelayedWorkQueue class is defined below for the sake of |
95 |
< |
* ensuring that all elements are instances of |
96 |
< |
* RunnableScheduledFuture. Since DelayQueue otherwise |
97 |
< |
* requires type be Delayed, but not necessarily Runnable, and |
98 |
< |
* the workQueue requires the opposite, we need to explicitly |
99 |
< |
* define a class that requires both to ensure that users don't |
100 |
< |
* add objects that aren't RunnableScheduledFutures via |
101 |
< |
* getQueue().add() etc. |
99 |
> |
* (see delayedExecute) compared to ThreadPoolExecutor. |
100 |
|
* |
101 |
|
* 3. Supporting optional run-after-shutdown parameters, which |
102 |
|
* leads to overrides of shutdown methods to remove and cancel |
645 |
|
} |
646 |
|
|
647 |
|
/** |
648 |
< |
* Sets the policy on whether cancellation of a task should remove |
649 |
< |
* it from the work queue. This value is by default {@code false}. |
648 |
> |
* Sets the policy on whether cancelled tasks should be immediately |
649 |
> |
* removed from the work queue at time of cancellation. This value is |
650 |
> |
* by default {@code false}. |
651 |
|
* |
652 |
|
* @param value if {@code true}, remove on cancellation, else don't |
653 |
|
* @see #getRemoveOnCancelPolicy |
658 |
|
} |
659 |
|
|
660 |
|
/** |
661 |
< |
* Gets the policy on whether cancellation of a task should remove |
662 |
< |
* it from the work queue. This value is by default {@code false}. |
661 |
> |
* Gets the policy on whether cancelled tasks should be immediately |
662 |
> |
* removed from the work queue at time of cancellation. This value is |
663 |
> |
* by default {@code false}. |
664 |
|
* |
665 |
< |
* @return {@code true} if cancelled tasks are removed from the queue |
665 |
> |
* @return {@code true} if cancelled tasks are immediately removed |
666 |
> |
* from the queue |
667 |
|
* @see #setRemoveOnCancelPolicy |
668 |
|
* @since 1.7 |
669 |
|
*/ |
754 |
|
* identified by heapIndex. |
755 |
|
*/ |
756 |
|
|
757 |
< |
private static final int INITIAL_CAPACITY = 64; |
758 |
< |
private transient RunnableScheduledFuture[] queue = |
757 |
> |
private static final int INITIAL_CAPACITY = 16; |
758 |
> |
private RunnableScheduledFuture[] queue = |
759 |
|
new RunnableScheduledFuture[INITIAL_CAPACITY]; |
760 |
< |
private transient final ReentrantLock lock = new ReentrantLock(); |
760 |
< |
private transient final Condition available = lock.newCondition(); |
760 |
> |
private final ReentrantLock lock = new ReentrantLock(); |
761 |
|
private int size = 0; |
762 |
|
|
763 |
+ |
/** |
764 |
+ |
* Thread designated to wait for the task at the head of the |
765 |
+ |
* queue. This variant of the Leader-Follower pattern |
766 |
+ |
* (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to |
767 |
+ |
* minimize unnecessary timed waiting. When a thread becomes |
768 |
+ |
* the leader, it waits only for the next delay to elapse, but |
769 |
+ |
* other threads await indefinitely. The leader thread must |
770 |
+ |
* signal some other thread before returning from take() or |
771 |
+ |
* poll(...), unless some other thread becomes leader in the |
772 |
+ |
* interim. Whenever the head of the queue is replaced with a |
773 |
+ |
* task with an earlier expiration time, the leader field is |
774 |
+ |
* invalidated by being reset to null, and some waiting |
775 |
+ |
* thread, but not necessarily the current leader, is |
776 |
+ |
* signalled. So waiting threads must be prepared to acquire |
777 |
+ |
* and lose leadership while waiting. |
778 |
+ |
*/ |
779 |
+ |
private Thread leader = null; |
780 |
+ |
|
781 |
+ |
/** |
782 |
+ |
* Condition signalled when a newer task becomes available at the |
783 |
+ |
* head of the queue or a new thread may need to become leader. |
784 |
+ |
*/ |
785 |
+ |
private final Condition available = lock.newCondition(); |
786 |
|
|
787 |
|
/** |
788 |
|
* Set f's heapIndex if it is a ScheduledFutureTask. |
833 |
|
} |
834 |
|
|
835 |
|
/** |
813 |
– |
* Performs common bookkeeping for poll and take: Replaces |
814 |
– |
* first element with last; sifts it down, and signals any |
815 |
– |
* waiting consumers. Call only when holding lock. |
816 |
– |
* @param f the task to remove and return |
817 |
– |
*/ |
818 |
– |
private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) { |
819 |
– |
int s = --size; |
820 |
– |
RunnableScheduledFuture x = queue[s]; |
821 |
– |
queue[s] = null; |
822 |
– |
if (s != 0) { |
823 |
– |
siftDown(0, x); |
824 |
– |
available.signalAll(); |
825 |
– |
} |
826 |
– |
setIndex(f, -1); |
827 |
– |
return f; |
828 |
– |
} |
829 |
– |
|
830 |
– |
/** |
836 |
|
* Resize the heap array. Call only when holding lock. |
837 |
|
*/ |
838 |
|
private void grow() { |
941 |
|
} else { |
942 |
|
siftUp(i, e); |
943 |
|
} |
944 |
< |
if (queue[0] == e) |
945 |
< |
available.signalAll(); |
944 |
> |
if (queue[0] == e) { |
945 |
> |
leader = null; |
946 |
> |
available.signal(); |
947 |
> |
} |
948 |
|
} finally { |
949 |
|
lock.unlock(); |
950 |
|
} |
963 |
|
return offer(e); |
964 |
|
} |
965 |
|
|
966 |
+ |
/** |
967 |
+ |
* Performs common bookkeeping for poll and take: Replaces |
968 |
+ |
* first element with last; sifts it down, and signals another |
969 |
+ |
* waiting consumer. Call only when holding lock. |
970 |
+ |
* @param f the task to remove and return |
971 |
+ |
*/ |
972 |
+ |
private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) { |
973 |
+ |
int s = --size; |
974 |
+ |
RunnableScheduledFuture x = queue[s]; |
975 |
+ |
queue[s] = null; |
976 |
+ |
if (s != 0) |
977 |
+ |
siftDown(0, x); |
978 |
+ |
setIndex(f, -1); |
979 |
+ |
return f; |
980 |
+ |
} |
981 |
+ |
|
982 |
|
public RunnableScheduledFuture poll() { |
983 |
|
final ReentrantLock lock = this.lock; |
984 |
|
lock.lock(); |
1002 |
|
if (first == null) |
1003 |
|
available.await(); |
1004 |
|
else { |
1005 |
< |
long delay = first.getDelay(TimeUnit.NANOSECONDS); |
1006 |
< |
if (delay > 0) |
1007 |
< |
available.awaitNanos(delay); |
1008 |
< |
else |
1009 |
< |
return finishPoll(first); |
1005 |
> |
long delay = first.getDelay(TimeUnit.NANOSECONDS); |
1006 |
> |
if (delay <= 0) |
1007 |
> |
return finishPoll(first); |
1008 |
> |
else if (leader != null) |
1009 |
> |
available.await(); |
1010 |
> |
else { |
1011 |
> |
Thread thisThread = Thread.currentThread(); |
1012 |
> |
leader = thisThread; |
1013 |
> |
try { |
1014 |
> |
available.awaitNanos(delay); |
1015 |
> |
} finally { |
1016 |
> |
if (leader == thisThread) |
1017 |
> |
leader = null; |
1018 |
> |
} |
1019 |
> |
} |
1020 |
|
} |
1021 |
|
} |
1022 |
|
} finally { |
1023 |
+ |
if (leader == null && queue[0] != null) |
1024 |
+ |
available.signal(); |
1025 |
|
lock.unlock(); |
1026 |
|
} |
1027 |
|
} |
1041 |
|
nanos = available.awaitNanos(nanos); |
1042 |
|
} else { |
1043 |
|
long delay = first.getDelay(TimeUnit.NANOSECONDS); |
1044 |
< |
if (delay > 0) { |
1010 |
< |
if (nanos <= 0) |
1011 |
< |
return null; |
1012 |
< |
if (delay > nanos) |
1013 |
< |
delay = nanos; |
1014 |
< |
long timeLeft = available.awaitNanos(delay); |
1015 |
< |
nanos -= delay - timeLeft; |
1016 |
< |
} else |
1044 |
> |
if (delay <= 0) |
1045 |
|
return finishPoll(first); |
1046 |
< |
} |
1047 |
< |
} |
1046 |
> |
if (nanos <= 0) |
1047 |
> |
return null; |
1048 |
> |
if (nanos < delay || leader != null) |
1049 |
> |
nanos = available.awaitNanos(nanos); |
1050 |
> |
else { |
1051 |
> |
Thread thisThread = Thread.currentThread(); |
1052 |
> |
leader = thisThread; |
1053 |
> |
try { |
1054 |
> |
long timeLeft = available.awaitNanos(delay); |
1055 |
> |
nanos -= delay - timeLeft; |
1056 |
> |
} finally { |
1057 |
> |
if (leader == thisThread) |
1058 |
> |
leader = null; |
1059 |
> |
} |
1060 |
> |
} |
1061 |
> |
} |
1062 |
> |
} |
1063 |
|
} finally { |
1064 |
+ |
if (leader == null && queue[0] != null) |
1065 |
+ |
available.signal(); |
1066 |
|
lock.unlock(); |
1067 |
|
} |
1068 |
|
} |
1092 |
|
RunnableScheduledFuture first = queue[0]; |
1093 |
|
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) |
1094 |
|
return null; |
1095 |
< |
setIndex(first, -1); |
1051 |
< |
int s = --size; |
1052 |
< |
RunnableScheduledFuture x = queue[s]; |
1053 |
< |
queue[s] = null; |
1054 |
< |
if (s != 0) |
1055 |
< |
siftDown(0, x); |
1056 |
< |
return first; |
1095 |
> |
return finishPoll(first); |
1096 |
|
} |
1097 |
|
|
1098 |
|
public int drainTo(Collection<? super Runnable> c) { |