6 |
|
|
7 |
|
package java.util.concurrent; |
8 |
|
import java.util.concurrent.atomic.*; |
9 |
+ |
import java.util.concurrent.locks.*; |
10 |
|
import java.util.*; |
11 |
|
|
12 |
|
/** |
21 |
|
* without any real-time guarantees about when, after they are |
22 |
|
* enabled, they will commence. Tasks scheduled for exactly the same |
23 |
|
* execution time are enabled in first-in-first-out (FIFO) order of |
24 |
< |
* submission. |
24 |
> |
* submission. Cancelled tasks are automatically removed from the |
25 |
> |
* work queue. |
26 |
|
* |
27 |
|
* <p>While this class inherits from {@link ThreadPoolExecutor}, a few |
28 |
|
* of the inherited tuning methods are not useful for it. In |
151 |
|
private final long period; |
152 |
|
|
153 |
|
/** |
154 |
+ |
* Index into delay queue, to support faster cancellation. |
155 |
+ |
*/ |
156 |
+ |
int heapIndex; |
157 |
+ |
|
158 |
+ |
/** |
159 |
|
* Creates a one-shot action with given nanoTime-based trigger time. |
160 |
|
*/ |
161 |
|
ScheduledFutureTask(Runnable r, V result, long ns) { |
230 |
|
time = now() - p; |
231 |
|
} |
232 |
|
|
233 |
+ |
public boolean cancel(boolean mayInterruptIfRunning) { |
234 |
+ |
remove(this); // unconditionally remove |
235 |
+ |
return super.cancel(mayInterruptIfRunning); |
236 |
+ |
} |
237 |
+ |
|
238 |
|
/** |
239 |
|
* Overrides FutureTask version so as to reset/requeue if periodic. |
240 |
|
*/ |
322 |
|
if (e instanceof RunnableScheduledFuture) { |
323 |
|
RunnableScheduledFuture<?> t = |
324 |
|
(RunnableScheduledFuture<?>)e; |
325 |
< |
if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) || |
326 |
< |
t.isCancelled()) { // also remove if already cancelled |
315 |
< |
if (q.remove(t)) |
316 |
< |
t.cancel(false); |
317 |
< |
} |
325 |
> |
if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed)) |
326 |
> |
t.cancel(false); |
327 |
|
} |
328 |
|
} |
329 |
|
} |
680 |
|
} |
681 |
|
|
682 |
|
/** |
683 |
< |
* An annoying wrapper class to convince javac to use a |
684 |
< |
* DelayQueue<RunnableScheduledFuture> as a BlockingQueue<Runnable> |
683 |
> |
* Specialized delay queue. To mesh with TPE declarations, this |
684 |
> |
* class must be declared as a BlockingQueue<Runnable> even though |
685 |
> |
* it can only hold RunnableScheduledFutures |
686 |
|
*/ |
687 |
< |
private static class DelayedWorkQueue |
678 |
< |
extends AbstractCollection<Runnable> |
687 |
> |
static class DelayedWorkQueue extends AbstractQueue<Runnable> |
688 |
|
implements BlockingQueue<Runnable> { |
689 |
|
|
690 |
< |
private final DelayQueue<RunnableScheduledFuture> dq = new DelayQueue<RunnableScheduledFuture>(); |
691 |
< |
public Runnable poll() { return dq.poll(); } |
692 |
< |
public Runnable peek() { return dq.peek(); } |
693 |
< |
public Runnable take() throws InterruptedException { return dq.take(); } |
694 |
< |
public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { |
695 |
< |
return dq.poll(timeout, unit); |
690 |
> |
/* |
691 |
> |
* A DelayedWorkQueue is based on a heap-based data structure |
692 |
> |
* like those in DelayQueue and PriorityQueue, except that |
693 |
> |
* every ScheduledFutureTask also records its index into the |
694 |
> |
* heap array. This eliminates the need to find a task upon |
695 |
> |
* cancellation, greatly speeding up removal (down from O(n) |
696 |
> |
* to O(log n)), and reducing garbage retention that would |
697 |
> |
* otherwise occur by waiting for the element to rise to top |
698 |
> |
* before clearing. But because the queue may also hold |
699 |
> |
* RunnableScheduledFutures that are not ScheduledFutureTasks, |
700 |
> |
* we are not guaranteed to have such indices available, in |
701 |
> |
* which case we fall back to linear search. (We expect that |
702 |
> |
* most tasks will not be decorated, and that the faster cases |
703 |
> |
* will be much more common.) |
704 |
> |
* |
705 |
> |
* All heap operations must record index changes -- mainly |
706 |
> |
* within siftUp and siftDown. Upon removal, a task's |
707 |
> |
* heapIndex is set to -1. Note that ScheduledFutureTasks can |
708 |
> |
* appear at most once in the queue (this need not be true for |
709 |
> |
* other kinds of tasks or work queues), so are uniquely |
710 |
> |
* identified by heapIndex. |
711 |
> |
*/ |
712 |
> |
|
713 |
> |
private static final int INITIAL_CAPACITY = 64; |
714 |
> |
private transient RunnableScheduledFuture[] queue = |
715 |
> |
new RunnableScheduledFuture[INITIAL_CAPACITY]; |
716 |
> |
private transient final ReentrantLock lock = new ReentrantLock(); |
717 |
> |
private transient final Condition available = lock.newCondition(); |
718 |
> |
private int size = 0; |
719 |
> |
|
720 |
> |
|
721 |
> |
/** |
722 |
> |
* Set f's heapIndex if it is a ScheduledFutureTask |
723 |
> |
*/ |
724 |
> |
private void setIndex(Object f, int idx) { |
725 |
> |
if (f instanceof ScheduledFutureTask) |
726 |
> |
((ScheduledFutureTask)f).heapIndex = idx; |
727 |
> |
} |
728 |
> |
|
729 |
> |
/** |
730 |
> |
* Sift element added at bottom up to its heap-ordered spot |
731 |
> |
* Call only when holding lock. |
732 |
> |
*/ |
733 |
> |
private void siftUp(int k, RunnableScheduledFuture key) { |
734 |
> |
while (k > 0) { |
735 |
> |
int parent = (k - 1) >>> 1; |
736 |
> |
RunnableScheduledFuture e = queue[parent]; |
737 |
> |
if (key.compareTo(e) >= 0) |
738 |
> |
break; |
739 |
> |
queue[k] = e; |
740 |
> |
setIndex(e, k); |
741 |
> |
k = parent; |
742 |
> |
} |
743 |
> |
queue[k] = key; |
744 |
> |
setIndex(key, k); |
745 |
> |
} |
746 |
> |
|
747 |
> |
/** |
748 |
> |
* Sift element added at top down to its heap-ordered spot |
749 |
> |
* Call only when holding lock. |
750 |
> |
*/ |
751 |
> |
private void siftDown(int k, RunnableScheduledFuture key) { |
752 |
> |
int half = size >>> 1; |
753 |
> |
while (k < half) { |
754 |
> |
int child = (k << 1) + 1; |
755 |
> |
RunnableScheduledFuture c = queue[child]; |
756 |
> |
int right = child + 1; |
757 |
> |
if (right < size && c.compareTo(queue[right]) > 0) |
758 |
> |
c = queue[child = right]; |
759 |
> |
if (key.compareTo(c) <= 0) |
760 |
> |
break; |
761 |
> |
queue[k] = c; |
762 |
> |
setIndex(c, k); |
763 |
> |
k = child; |
764 |
> |
} |
765 |
> |
queue[k] = key; |
766 |
> |
setIndex(key, k); |
767 |
> |
} |
768 |
> |
|
769 |
> |
/** |
770 |
> |
* Performs common bookkeeping for poll and take: Replaces |
771 |
> |
* first element with last; sifts it down, and signals any |
772 |
> |
* waiting consumers. Call only when holding lock. |
773 |
> |
* @param f the task to remove and return |
774 |
> |
*/ |
775 |
> |
private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) { |
776 |
> |
int s = --size; |
777 |
> |
RunnableScheduledFuture x = queue[s]; |
778 |
> |
queue[s] = null; |
779 |
> |
if (s != 0) { |
780 |
> |
siftDown(0, x); |
781 |
> |
available.signalAll(); |
782 |
> |
} |
783 |
> |
setIndex(f, -1); |
784 |
> |
return f; |
785 |
> |
} |
786 |
> |
|
787 |
> |
/** |
788 |
> |
* Resize the heap array. Call only when holding lock. |
789 |
> |
*/ |
790 |
> |
private void grow() { |
791 |
> |
int oldCapacity = queue.length; |
792 |
> |
int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50% |
793 |
> |
if (newCapacity < 0) // overflow |
794 |
> |
newCapacity = Integer.MAX_VALUE; |
795 |
> |
queue = Arrays.copyOf(queue, newCapacity); |
796 |
> |
} |
797 |
> |
|
798 |
> |
/** |
799 |
> |
* Find index of given object, or -1 if absent |
800 |
> |
*/ |
801 |
> |
private int indexOf(Object x) { |
802 |
> |
if (x != null) { |
803 |
> |
for (int i = 0; i < size; i++) |
804 |
> |
if (x.equals(queue[i])) |
805 |
> |
return i; |
806 |
> |
} |
807 |
> |
return -1; |
808 |
> |
} |
809 |
> |
|
810 |
> |
public boolean remove(Object x) { |
811 |
> |
boolean removed; |
812 |
> |
final ReentrantLock lock = this.lock; |
813 |
> |
lock.lock(); |
814 |
> |
try { |
815 |
> |
int i; |
816 |
> |
if (x instanceof ScheduledFutureTask) |
817 |
> |
i = ((ScheduledFutureTask)x).heapIndex; |
818 |
> |
else |
819 |
> |
i = indexOf(x); |
820 |
> |
if (removed = (i >= 0 && i < size && queue[i] == x)) { |
821 |
> |
setIndex(x, -1); |
822 |
> |
int s = --size; |
823 |
> |
RunnableScheduledFuture replacement = queue[s]; |
824 |
> |
queue[s] = null; |
825 |
> |
if (s != i) { |
826 |
> |
siftDown(i, replacement); |
827 |
> |
if (queue[i] == replacement) |
828 |
> |
siftUp(i, replacement); |
829 |
> |
} |
830 |
> |
} |
831 |
> |
} finally { |
832 |
> |
lock.unlock(); |
833 |
> |
} |
834 |
> |
return removed; |
835 |
> |
} |
836 |
> |
|
837 |
> |
public int size() { |
838 |
> |
int s; |
839 |
> |
final ReentrantLock lock = this.lock; |
840 |
> |
lock.lock(); |
841 |
> |
try { |
842 |
> |
s = size; |
843 |
> |
} finally { |
844 |
> |
lock.unlock(); |
845 |
> |
} |
846 |
> |
return s; |
847 |
> |
} |
848 |
> |
|
849 |
> |
public boolean isEmpty() { |
850 |
> |
return size() == 0; |
851 |
> |
} |
852 |
> |
|
853 |
> |
public int remainingCapacity() { |
854 |
> |
return Integer.MAX_VALUE; |
855 |
> |
} |
856 |
> |
|
857 |
> |
public RunnableScheduledFuture peek() { |
858 |
> |
final ReentrantLock lock = this.lock; |
859 |
> |
lock.lock(); |
860 |
> |
try { |
861 |
> |
return queue[0]; |
862 |
> |
} finally { |
863 |
> |
lock.unlock(); |
864 |
> |
} |
865 |
|
} |
866 |
|
|
689 |
– |
public boolean add(Runnable x) { |
690 |
– |
return dq.add((RunnableScheduledFuture)x); |
691 |
– |
} |
867 |
|
public boolean offer(Runnable x) { |
868 |
< |
return dq.offer((RunnableScheduledFuture)x); |
868 |
> |
if (x == null) |
869 |
> |
throw new NullPointerException(); |
870 |
> |
RunnableScheduledFuture e = (RunnableScheduledFuture)x; |
871 |
> |
final ReentrantLock lock = this.lock; |
872 |
> |
lock.lock(); |
873 |
> |
try { |
874 |
> |
int i = size; |
875 |
> |
if (i >= queue.length) |
876 |
> |
grow(); |
877 |
> |
size = i + 1; |
878 |
> |
boolean notify; |
879 |
> |
if (i == 0) { |
880 |
> |
notify = true; |
881 |
> |
queue[0] = e; |
882 |
> |
setIndex(e, 0); |
883 |
> |
} |
884 |
> |
else { |
885 |
> |
notify = e.compareTo(queue[0]) < 0; |
886 |
> |
siftUp(i, e); |
887 |
> |
} |
888 |
> |
if (notify) |
889 |
> |
available.signalAll(); |
890 |
> |
} finally { |
891 |
> |
lock.unlock(); |
892 |
> |
} |
893 |
> |
return true; |
894 |
|
} |
895 |
< |
public void put(Runnable x) { |
896 |
< |
dq.put((RunnableScheduledFuture)x); |
895 |
> |
|
896 |
> |
public void put(Runnable e) { |
897 |
> |
offer(e); |
898 |
> |
} |
899 |
> |
|
900 |
> |
public boolean add(Runnable e) { |
901 |
> |
return offer(e); |
902 |
> |
} |
903 |
> |
|
904 |
> |
public boolean offer(Runnable e, long timeout, TimeUnit unit) { |
905 |
> |
return offer(e); |
906 |
> |
} |
907 |
> |
|
908 |
> |
public RunnableScheduledFuture poll() { |
909 |
> |
final ReentrantLock lock = this.lock; |
910 |
> |
lock.lock(); |
911 |
> |
try { |
912 |
> |
RunnableScheduledFuture first = queue[0]; |
913 |
> |
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) |
914 |
> |
return null; |
915 |
> |
else |
916 |
> |
return finishPoll(first); |
917 |
> |
} finally { |
918 |
> |
lock.unlock(); |
919 |
> |
} |
920 |
> |
} |
921 |
> |
|
922 |
> |
public RunnableScheduledFuture take() throws InterruptedException { |
923 |
> |
final ReentrantLock lock = this.lock; |
924 |
> |
lock.lockInterruptibly(); |
925 |
> |
try { |
926 |
> |
for (;;) { |
927 |
> |
RunnableScheduledFuture first = queue[0]; |
928 |
> |
if (first == null) |
929 |
> |
available.await(); |
930 |
> |
else { |
931 |
> |
long delay = first.getDelay(TimeUnit.NANOSECONDS); |
932 |
> |
if (delay > 0) |
933 |
> |
available.awaitNanos(delay); |
934 |
> |
else |
935 |
> |
return finishPoll(first); |
936 |
> |
} |
937 |
> |
} |
938 |
> |
} finally { |
939 |
> |
lock.unlock(); |
940 |
> |
} |
941 |
> |
} |
942 |
> |
|
943 |
> |
public RunnableScheduledFuture poll(long timeout, TimeUnit unit) |
944 |
> |
throws InterruptedException { |
945 |
> |
long nanos = unit.toNanos(timeout); |
946 |
> |
final ReentrantLock lock = this.lock; |
947 |
> |
lock.lockInterruptibly(); |
948 |
> |
try { |
949 |
> |
for (;;) { |
950 |
> |
RunnableScheduledFuture first = queue[0]; |
951 |
> |
if (first == null) { |
952 |
> |
if (nanos <= 0) |
953 |
> |
return null; |
954 |
> |
else |
955 |
> |
nanos = available.awaitNanos(nanos); |
956 |
> |
} else { |
957 |
> |
long delay = first.getDelay(TimeUnit.NANOSECONDS); |
958 |
> |
if (delay > 0) { |
959 |
> |
if (nanos <= 0) |
960 |
> |
return null; |
961 |
> |
if (delay > nanos) |
962 |
> |
delay = nanos; |
963 |
> |
long timeLeft = available.awaitNanos(delay); |
964 |
> |
nanos -= delay - timeLeft; |
965 |
> |
} else |
966 |
> |
return finishPoll(first); |
967 |
> |
} |
968 |
> |
} |
969 |
> |
} finally { |
970 |
> |
lock.unlock(); |
971 |
> |
} |
972 |
> |
} |
973 |
> |
|
974 |
> |
public void clear() { |
975 |
> |
final ReentrantLock lock = this.lock; |
976 |
> |
lock.lock(); |
977 |
> |
try { |
978 |
> |
for (int i = 0; i < size; i++) { |
979 |
> |
RunnableScheduledFuture t = queue[i]; |
980 |
> |
if (t != null) { |
981 |
> |
queue[i] = null; |
982 |
> |
setIndex(t, -1); |
983 |
> |
} |
984 |
> |
} |
985 |
> |
size = 0; |
986 |
> |
} finally { |
987 |
> |
lock.unlock(); |
988 |
> |
} |
989 |
|
} |
990 |
< |
public boolean offer(Runnable x, long timeout, TimeUnit unit) { |
991 |
< |
return dq.offer((RunnableScheduledFuture)x, timeout, unit); |
990 |
> |
|
991 |
> |
/** |
992 |
> |
* Return and remove first element only if it is expired. |
993 |
> |
* Used only by drainTo. Call only when holding lock. |
994 |
> |
*/ |
995 |
> |
private RunnableScheduledFuture pollExpired() { |
996 |
> |
RunnableScheduledFuture first = queue[0]; |
997 |
> |
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) |
998 |
> |
return null; |
999 |
> |
setIndex(first, -1); |
1000 |
> |
int s = --size; |
1001 |
> |
RunnableScheduledFuture x = queue[s]; |
1002 |
> |
queue[s] = null; |
1003 |
> |
if (s != 0) |
1004 |
> |
siftDown(0, x); |
1005 |
> |
return first; |
1006 |
> |
} |
1007 |
> |
|
1008 |
> |
public int drainTo(Collection<? super Runnable> c) { |
1009 |
> |
if (c == null) |
1010 |
> |
throw new NullPointerException(); |
1011 |
> |
if (c == this) |
1012 |
> |
throw new IllegalArgumentException(); |
1013 |
> |
final ReentrantLock lock = this.lock; |
1014 |
> |
lock.lock(); |
1015 |
> |
try { |
1016 |
> |
int n = 0; |
1017 |
> |
for (;;) { |
1018 |
> |
RunnableScheduledFuture first = pollExpired(); |
1019 |
> |
if (first != null) { |
1020 |
> |
c.add(first); |
1021 |
> |
++n; |
1022 |
> |
} |
1023 |
> |
else |
1024 |
> |
break; |
1025 |
> |
} |
1026 |
> |
if (n > 0) |
1027 |
> |
available.signalAll(); |
1028 |
> |
return n; |
1029 |
> |
} finally { |
1030 |
> |
lock.unlock(); |
1031 |
> |
} |
1032 |
|
} |
1033 |
|
|
702 |
– |
public Runnable remove() { return dq.remove(); } |
703 |
– |
public Runnable element() { return dq.element(); } |
704 |
– |
public void clear() { dq.clear(); } |
705 |
– |
public int drainTo(Collection<? super Runnable> c) { return dq.drainTo(c); } |
1034 |
|
public int drainTo(Collection<? super Runnable> c, int maxElements) { |
1035 |
< |
return dq.drainTo(c, maxElements); |
1035 |
> |
if (c == null) |
1036 |
> |
throw new NullPointerException(); |
1037 |
> |
if (c == this) |
1038 |
> |
throw new IllegalArgumentException(); |
1039 |
> |
if (maxElements <= 0) |
1040 |
> |
return 0; |
1041 |
> |
final ReentrantLock lock = this.lock; |
1042 |
> |
lock.lock(); |
1043 |
> |
try { |
1044 |
> |
int n = 0; |
1045 |
> |
while (n < maxElements) { |
1046 |
> |
RunnableScheduledFuture first = pollExpired(); |
1047 |
> |
if (first != null) { |
1048 |
> |
c.add(first); |
1049 |
> |
++n; |
1050 |
> |
} |
1051 |
> |
else |
1052 |
> |
break; |
1053 |
> |
} |
1054 |
> |
if (n > 0) |
1055 |
> |
available.signalAll(); |
1056 |
> |
return n; |
1057 |
> |
} finally { |
1058 |
> |
lock.unlock(); |
1059 |
> |
} |
1060 |
> |
} |
1061 |
> |
|
1062 |
> |
public Object[] toArray() { |
1063 |
> |
final ReentrantLock lock = this.lock; |
1064 |
> |
lock.lock(); |
1065 |
> |
try { |
1066 |
> |
return Arrays.copyOf(queue, size); |
1067 |
> |
} finally { |
1068 |
> |
lock.unlock(); |
1069 |
> |
} |
1070 |
> |
} |
1071 |
> |
|
1072 |
> |
public <T> T[] toArray(T[] a) { |
1073 |
> |
final ReentrantLock lock = this.lock; |
1074 |
> |
lock.lock(); |
1075 |
> |
try { |
1076 |
> |
if (a.length < size) |
1077 |
> |
return (T[]) Arrays.copyOf(queue, size, a.getClass()); |
1078 |
> |
System.arraycopy(queue, 0, a, 0, size); |
1079 |
> |
if (a.length > size) |
1080 |
> |
a[size] = null; |
1081 |
> |
return a; |
1082 |
> |
} finally { |
1083 |
> |
lock.unlock(); |
1084 |
> |
} |
1085 |
|
} |
1086 |
|
|
710 |
– |
public int remainingCapacity() { return dq.remainingCapacity(); } |
711 |
– |
public boolean remove(Object x) { return dq.remove(x); } |
712 |
– |
public boolean contains(Object x) { return dq.contains(x); } |
713 |
– |
public int size() { return dq.size(); } |
714 |
– |
public boolean isEmpty() { return dq.isEmpty(); } |
715 |
– |
public Object[] toArray() { return dq.toArray(); } |
716 |
– |
public <T> T[] toArray(T[] array) { return dq.toArray(array); } |
1087 |
|
public Iterator<Runnable> iterator() { |
1088 |
< |
return new Iterator<Runnable>() { |
1089 |
< |
private Iterator<RunnableScheduledFuture> it = dq.iterator(); |
1090 |
< |
public boolean hasNext() { return it.hasNext(); } |
1091 |
< |
public Runnable next() { return it.next(); } |
1092 |
< |
public void remove() { it.remove(); } |
1093 |
< |
}; |
1088 |
> |
return new Itr(toArray()); |
1089 |
> |
} |
1090 |
> |
|
1091 |
> |
/** |
1092 |
> |
* Snapshot iterator that works off copy of underlying q array. |
1093 |
> |
*/ |
1094 |
> |
private class Itr implements Iterator<Runnable> { |
1095 |
> |
final Object[] array; // Array of all elements |
1096 |
> |
int cursor; // index of next element to return; |
1097 |
> |
int lastRet; // index of last element, or -1 if no such |
1098 |
> |
|
1099 |
> |
Itr(Object[] array) { |
1100 |
> |
lastRet = -1; |
1101 |
> |
this.array = array; |
1102 |
> |
} |
1103 |
> |
|
1104 |
> |
public boolean hasNext() { |
1105 |
> |
return cursor < array.length; |
1106 |
> |
} |
1107 |
> |
|
1108 |
> |
public Runnable next() { |
1109 |
> |
if (cursor >= array.length) |
1110 |
> |
throw new NoSuchElementException(); |
1111 |
> |
lastRet = cursor; |
1112 |
> |
return (Runnable)array[cursor++]; |
1113 |
> |
} |
1114 |
> |
|
1115 |
> |
public void remove() { |
1116 |
> |
if (lastRet < 0) |
1117 |
> |
throw new IllegalStateException(); |
1118 |
> |
DelayedWorkQueue.this.remove(array[lastRet]); |
1119 |
> |
lastRet = -1; |
1120 |
> |
} |
1121 |
|
} |
1122 |
|
} |
1123 |
|
} |