ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.21
Committed: Tue Dec 23 19:38:09 2003 UTC (20 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.20: +15 -0 lines
Log Message:
cache finals across volatiles; avoid readResolve; doc improvements; timed invokeAll interleaves

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7
8 package java.util.concurrent;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
/**
 * An unbounded {@linkplain BlockingQueue blocking queue} of <tt>Delayed</tt>
 * elements, in which an element can only be taken when its delay has expired.
 * The <em>head</em> of the queue is that <tt>Delayed</tt> element whose delay
 * expired furthest in the past - if no delay has expired there is no head and
 * <tt>poll</tt> will return <tt>null</tt>.
 * This queue does not permit <tt>null</tt> elements.
 *
 * <p>All operations are performed while holding a single internal
 * {@link ReentrantLock}; blocked takers wait on a condition of that lock
 * and are signalled when the head of the queue may have changed.
 *
 * <p>This class implements all of the <em>optional</em> methods
 * of the {@link Collection} and {@link Iterator} interfaces.
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    /** Guards every access to {@link #q}. */
    private transient final ReentrantLock lock = new ReentrantLock();
    /** Signalled whenever the head of the queue may have changed. */
    private transient final Condition available = lock.newCondition();
    /** Backing store, ordered by the elements' <tt>compareTo</tt>. */
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * Creates a new <tt>DelayQueue</tt> that is initially empty.
     */
    public DelayQueue() {}

    /**
     * Creates a <tt>DelayQueue</tt> initially containing the elements of the
     * given collection of {@link Delayed} instances.
     *
     * @param c the collection whose elements are initially added
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param o the element to add
     * @return <tt>true</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            q.offer(o);
            // Wake waiters only when the new element became the head:
            // they may be timing their wait against the old head's delay.
            if (first == null || o.compareTo(first) < 0)
                available.signalAll();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds the specified element to this delay queue. As the queue is
     * unbounded this method will never block.
     * @param o the element to add
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public void put(E o) {
        offer(o);
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     * @param o the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return <tt>true</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o, long timeout, TimeUnit unit) {
        return offer(o);
    }

    /**
     * Adds the specified element to this queue.
     * @param o the element to add
     * @return <tt>true</tt> (as per the general contract of
     * <tt>Collection.add</tt>).
     *
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean add(E o) {
        return offer(o);
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    available.await();
                } else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay > 0) {
                        // Sleep at most until the current head expires, then
                        // re-examine the queue: the head may have changed.
                        available.awaitNanos(delay);
                    } else {
                        E x = q.poll();
                        assert x != null;
                        if (q.size() != 0)
                            available.signalAll(); // wake up other takers
                        return x;
                    }
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * up to the specified time for an element with an expired delay to
     * become available.
     *
     * @param time how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>time</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the specified
     * waiting time elapses before an element with an expired delay becomes
     * available
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if <tt>unit</tt> is <tt>null</tt>
     */
    public E poll(long time, TimeUnit unit) throws InterruptedException {
        // Convert before locking so that a null unit cannot throw while
        // the lock is held outside the try/finally.
        long nanos = unit.toNanos(time);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0)
                        return null;
                    nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0) {
                        E x = q.poll();
                        assert x != null;
                        if (q.size() != 0)
                            available.signalAll(); // wake up other takers
                        return x;
                    }
                    // Head has not expired: give up once the caller's
                    // timeout is exhausted instead of spinning on
                    // non-positive waits until the head expires.
                    if (nanos <= 0)
                        return null;
                    if (delay > nanos)
                        delay = nanos;
                    long timeLeft = available.awaitNanos(delay);
                    nanos -= delay - timeLeft;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, or returns
     * <tt>null</tt> if this queue has no element with an expired delay.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue is
     * empty or its earliest element has not yet expired
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                return null;
            E x = q.poll();
            assert x != null;
            if (q.size() != 0)
                available.signalAll(); // wake up other takers
            return x;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the first element of the underlying
     * priority queue. Unlike <tt>poll</tt>, this method does not check the
     * element's delay, so the returned element may not have expired yet.
     *
     * @return the first element, or <tt>null</tt> if this queue is empty
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue, counting both expired
     * and unexpired elements.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes all elements whose delay has expired and adds them to the
     * given collection. Elements whose delay has not yet expired are left
     * in the queue.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            for (;;) {
                E first = q.peek();
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                    break;
                // Add before removing so the element is not lost if
                // c.add throws.
                c.add(first);
                q.poll();
                ++n;
            }
            if (n > 0)
                available.signalAll();
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes at most the given number of elements whose delay has expired
     * and adds them to the given collection. Elements whose delay has not
     * yet expired are left in the queue.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            while (n < maxElements) {
                E first = q.peek();
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                    break;
                // Add before removing so the element is not lost if
                // c.add throws.
                c.add(first);
                q.poll();
                ++n;
            }
            if (n > 0)
                available.signalAll();
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this delay queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * a <tt>DelayQueue</tt> is not capacity constrained.
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * expired or not, as produced by the underlying priority queue.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * expired or not, delegating to the underlying priority queue's
     * <tt>toArray(T[])</tt>.
     *
     * @param array the array passed through to the underlying queue
     * @return an array containing all of the elements in this queue
     */
    public <T> T[] toArray(T[] array) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(array);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present, whether or not it has expired.
     *
     * @param o the element to remove
     * @return <tt>true</tt> if an element was removed
     */
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue. The iterator
     * does not return the elements in any particular order. The
     * returned iterator is a thread-safe "fail-fast" iterator that will
     * throw {@link java.util.ConcurrentModificationException}
     * upon detected interference.
     *
     * @return an iterator over the elements in this queue.
     */
    public Iterator<E> iterator() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return new Itr(q.iterator());
        } finally {
            lock.unlock();
        }
    }

    /**
     * Wraps the underlying priority queue's iterator so that every
     * operation on it runs while holding the queue's lock.
     */
    private class Itr implements Iterator<E> {
        private final Iterator<E> iter;

        Itr(Iterator<E> i) {
            iter = i;
        }

        public boolean hasNext() {
            final ReentrantLock lock = DelayQueue.this.lock;
            lock.lock();
            try {
                return iter.hasNext();
            } finally {
                lock.unlock();
            }
        }

        public E next() {
            final ReentrantLock lock = DelayQueue.this.lock;
            lock.lock();
            try {
                return iter.next();
            } finally {
                lock.unlock();
            }
        }

        public void remove() {
            final ReentrantLock lock = DelayQueue.this.lock;
            lock.lock();
            try {
                iter.remove();
            } finally {
                lock.unlock();
            }
        }
    }

}