ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.2
Committed: Tue May 27 18:14:40 2003 UTC (21 years ago) by dl
Branch: MAIN
CVS Tags: JSR166_PRERELEASE_0_1
Changes since 1.1: +218 -90 lines
Log Message:
re-check-in initial implementations

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7
8 package java.util.concurrent;
9 import java.util.*;
10
11 /**
12 * An unbounded queue of <tt>DelayEntry</tt> elements, in which
13 * elements can only be taken when their delay has expired.
14 **/
15
16 public class DelayQueue<E> extends AbstractQueue<DelayEntry<E>>
17 implements BlockingQueue<DelayEntry<E>> {
18
19 private transient final ReentrantLock lock = new ReentrantLock();
20 private transient final Condition canTake = lock.newCondition();
21 private final PriorityQueue<DelayEntry<E>> q = new PriorityQueue<DelayEntry<E>>();
22
23 public DelayQueue() {}
24
25 public boolean offer(DelayEntry<E> x) {
26 lock.lock();
27 try {
28 DelayEntry<E> first = q.peek();
29 q.offer(x);
30 if (first == null || x.compareTo(first) < 0)
31 canTake.signalAll();
32 return true;
33 }
34 finally {
35 lock.unlock();
36 }
37 }
38
39 public void put(DelayEntry<E> x) {
40 offer(x);
41 }
42
43 public boolean offer(DelayEntry<E> x, long time, TimeUnit unit) {
44 return offer(x);
45 }
46
47 public boolean add(DelayEntry<E> x) {
48 return offer(x);
49 }
50
51 public DelayEntry<E> take() throws InterruptedException {
52 lock.lockInterruptibly();
53 try {
54 for (;;) {
55 DelayEntry first = q.peek();
56 if (first == null)
57 canTake.await();
58 else {
59 long delay = first.getDelay(TimeUnit.NANOSECONDS);
60 if (delay > 0)
61 canTake.awaitNanos(delay);
62 else {
63 DelayEntry<E> x = q.poll();
64 assert x != null;
65 if (q.size() != 0)
66 canTake.signalAll(); // wake up other takers
67 return x;
68
69 }
70 }
71 }
72 }
73 finally {
74 lock.unlock();
75 }
76 }
77
78 public DelayEntry<E> poll(long time, TimeUnit unit) throws InterruptedException {
79 lock.lockInterruptibly();
80 long nanos = unit.toNanos(time);
81 try {
82 for (;;) {
83 DelayEntry first = q.peek();
84 if (first == null) {
85 if (nanos <= 0)
86 return null;
87 else
88 nanos = canTake.awaitNanos(nanos);
89 }
90 else {
91 long delay = first.getDelay(TimeUnit.NANOSECONDS);
92 if (delay > 0) {
93 if (delay > nanos)
94 delay = nanos;
95 long timeLeft = canTake.awaitNanos(delay);
96 nanos -= delay - timeLeft;
97 }
98 else {
99 DelayEntry<E> x = q.poll();
100 assert x != null;
101 if (q.size() != 0)
102 canTake.signalAll();
103 return x;
104 }
105 }
106 }
107 }
108 finally {
109 lock.unlock();
110 }
111 }
112
113
114 public DelayEntry<E> poll() {
115 lock.lock();
116 try {
117 DelayEntry first = q.peek();
118 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
119 return null;
120 else {
121 DelayEntry<E> x = q.poll();
122 assert x != null;
123 if (q.size() != 0)
124 canTake.signalAll();
125 return x;
126 }
127 }
128 finally {
129 lock.unlock();
130 }
131 }
132
133 public DelayEntry<E> peek() {
134 lock.lock();
135 try {
136 return q.peek();
137 }
138 finally {
139 lock.unlock();
140 }
141 }
142
143 public int size() {
144 lock.lock();
145 try {
146 return q.size();
147 }
148 finally {
149 lock.unlock();
150 }
151 }
152
153 public void clear() {
154 lock.lock();
155 try {
156 q.clear();
157 }
158 finally {
159 lock.unlock();
160 }
161 }
162
163 public int remainingCapacity() {
164 return Integer.MAX_VALUE;
165 }
166
167 public Object[] toArray() {
168 lock.lock();
169 try {
170 return q.toArray();
171 }
172 finally {
173 lock.unlock();
174 }
175 }
176
177 public <T> T[] toArray(T[] array) {
178 lock.lock();
179 try {
180 return q.toArray(array);
181 }
182 finally {
183 lock.unlock();
184 }
185 }
186
187 public boolean remove(Object x) {
188 lock.lock();
189 try {
190 return q.remove(x);
191 }
192 finally {
193 lock.unlock();
194 }
195 }
196
197 public Iterator<DelayEntry<E>> iterator() {
198 lock.lock();
199 try {
200 return new Itr(q.iterator());
201 }
202 finally {
203 lock.unlock();
204 }
205 }
206
207 private class Itr<E> implements Iterator<DelayEntry<E>> {
208 private final Iterator<DelayEntry<E>> iter;
209 Itr(Iterator<DelayEntry<E>> i) {
210 iter = i;
211 }
212
213 public boolean hasNext() {
214 return iter.hasNext();
215 }
216
217 public DelayEntry<E> next() {
218 lock.lock();
219 try {
220 return iter.next();
221 }
222 finally {
223 lock.unlock();
224 }
225 }
226
227 public void remove() {
228 lock.lock();
229 try {
230 iter.remove();
231 }
232 finally {
233 lock.unlock();
234 }
235 }
236 }
237
238 }