ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.35
Committed: Fri Jun 10 18:12:59 2005 UTC (19 years ago) by dl
Branch: MAIN
Changes since 1.34: +4 -3 lines
Log Message:
Fix peek spec

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7
8 package java.util.concurrent;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
/**
 * An unbounded {@linkplain BlockingQueue blocking queue} of
 * <tt>Delayed</tt> elements, in which an element can only be taken
 * when its delay has expired.  The <em>head</em> of the queue is that
 * <tt>Delayed</tt> element whose delay expired furthest in the
 * past.  If no delay has expired there is no head and <tt>poll</tt>
 * will return <tt>null</tt>.  Expiration occurs when an element's
 * <tt>getDelay(TimeUnit.NANOSECONDS)</tt> method returns a value less
 * than or equal to zero.  This queue does not permit <tt>null</tt>
 * elements.
 *
 * <p>Unexpired elements cannot be removed with <tt>take</tt> or
 * <tt>poll</tt>, but they are otherwise ordinary members of the
 * collection: <tt>peek</tt>, <tt>size</tt>, <tt>toArray</tt>,
 * <tt>remove(Object)</tt> and the iterator all see both expired and
 * unexpired elements.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../guide/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    /** Guards every access to {@link #q}. */
    private final transient ReentrantLock lock = new ReentrantLock();

    /**
     * Signalled when a new head is inserted and when a removal leaves
     * further elements behind, so that waiting takers re-examine the head.
     */
    private final transient Condition available = lock.newCondition();

    /** Backing heap, ordered by the elements' <tt>compareTo</tt>. */
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * Creates a new <tt>DelayQueue</tt> that is initially empty.
     */
    public DelayQueue() {}

    /**
     * Creates a <tt>DelayQueue</tt> initially containing the elements of the
     * given collection of {@link Delayed} instances.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return <tt>true</tt> (as per the spec for {@link Collection#add})
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
        return offer(e);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return <tt>true</tt>
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            q.offer(e);
            // Waiters only need waking when the element they are timing
            // against (the head) has changed.
            if (first == null || e.compareTo(first) < 0)
                available.signalAll();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) {
        offer(e);
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return <tt>true</tt>
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e);
    }

    /**
     * Retrieves and removes the head of this queue, or returns <tt>null</tt>
     * if this queue has no elements with an expired delay.
     *
     * @return the head of this queue, or <tt>null</tt> if this
     *         queue has no elements with an expired delay
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                return null;
            return dequeueHead();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    available.await();
                } else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0)
                        return dequeueHead();
                    // Sleep for at most the head's remaining delay; an
                    // earlier signal means the head may have changed.
                    available.awaitNanos(delay);
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue,
     * or the specified wait time expires.
     *
     * @param timeout how long to wait before giving up, in units of
     *        <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     *        <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the
     *         specified waiting time elapses before an element with
     *         an expired delay becomes available
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException if <tt>unit</tt> is null
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        // Convert before locking so that a null unit cannot leave the
        // lock held when the NullPointerException propagates.
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0)
                        return null;
                    nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0)
                        return dequeueHead();
                    // The head is not ready: give up once the caller's
                    // budget is spent instead of spinning until it expires.
                    if (nanos <= 0)
                        return null;
                    if (delay > nanos)
                        delay = nanos;
                    long timeLeft = available.awaitNanos(delay);
                    // Charge only the time actually spent waiting.
                    nanos -= delay - timeLeft;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the head of the backing queue.  The caller must
     * hold {@link #lock} and must already have established that the head
     * exists and has expired.
     *
     * @return the removed head
     */
    private E dequeueHead() {
        E x = q.poll();
        assert x != null;
        if (q.size() != 0)
            available.signalAll(); // wake up other takers
        return x;
    }

    /**
     * Retrieves, but does not remove, the head of this queue,
     * or returns <tt>null</tt> if this queue is empty.
     * Unlike <tt>poll</tt>, this method can be used to inspect
     * elements that have not yet expired.
     *
     * @return the head of this queue, or <tt>null</tt> if this
     *         queue is empty.
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue, counting both
     * expired and unexpired elements.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes all elements whose delay has expired from this queue and
     * adds them to the given collection.  Unexpired elements are left in
     * place.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        return drainExpired(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most <tt>maxElements</tt> elements whose delay has
     * expired from this queue and adds them to the given collection.
     * Unexpired elements are left in place.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        return drainExpired(c, maxElements);
    }

    /**
     * Shared implementation of both <tt>drainTo</tt> overloads.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is null
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    private int drainExpired(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            while (n < maxElements) {
                E first = q.peek();
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                    break;
                // Add before removing: if add() throws, the element is
                // still in this queue rather than silently lost.
                c.add(first);
                q.poll();
                ++n;
            }
            if (n > 0)
                available.signalAll();
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this delay queue.
     * The queue will be empty after this call returns.
     * Elements with an unexpired delay are not waited for; they are
     * simply discarded from the queue.
     */
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * a <tt>DelayQueue</tt> is not capacity constrained.
     *
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Returns an array containing all of the elements in this queue.
     * The returned array elements are in no particular order.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue; the
     * runtime type of the returned array is that of the specified array.
     * The returned array elements are in no particular order.
     * If the queue fits in the specified array, it is returned therein.
     * Otherwise, a new array is allocated with the runtime type of the
     * specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * <tt>null</tt>.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
     * The following code can be used to dump the queue into a newly
     * allocated array of <tt>String</tt>:
     *
     * <pre>
     *     String[] y = x.toArray(new String[0]);</pre>
     *
     * Note that <tt>toArray(new Object[0])</tt> is identical in function to
     * <tt>toArray()</tt>.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    public <T> T[] toArray(T[] a) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present, whether or not it has expired.
     *
     * @param o the element to remove
     * @return <tt>true</tt> if an element was removed
     */
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue. The iterator
     * does not return the elements in any particular order. The
     * returned iterator is a thread-safe "fast-fail" iterator that will
     * throw {@link ConcurrentModificationException}
     * upon detected interference.
     *
     * @return an iterator over the elements in this queue
     */
    public Iterator<E> iterator() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return new Itr(q.iterator());
        } finally {
            lock.unlock();
        }
    }

    /**
     * Wraps the backing queue's iterator so that <tt>next</tt> and
     * <tt>remove</tt> run while holding the queue's lock.
     */
    private class Itr implements Iterator<E> {
        private final Iterator<E> iter;

        Itr(Iterator<E> i) {
            iter = i;
        }

        public boolean hasNext() {
            return iter.hasNext();
        }

        public E next() {
            final ReentrantLock lock = DelayQueue.this.lock;
            lock.lock();
            try {
                return iter.next();
            } finally {
                lock.unlock();
            }
        }

        public void remove() {
            final ReentrantLock lock = DelayQueue.this.lock;
            lock.lock();
            try {
                iter.remove();
            } finally {
                lock.unlock();
            }
        }
    }

}