ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.35
Committed: Fri Jun 10 18:12:59 2005 UTC (18 years, 11 months ago) by dl
Branch: MAIN
Changes since 1.34: +4 -3 lines
Log Message:
Fix peek spec

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.23 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.2 */
6    
7    
8 tim 1.1 package java.util.concurrent;
9 dl 1.5 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
12     /**
13 dl 1.27 * An unbounded {@linkplain BlockingQueue blocking queue} of
14     * <tt>Delayed</tt> elements, in which an element can only be taken
15     * when its delay has expired. The <em>head</em> of the queue is that
16     * <tt>Delayed</tt> element whose delay expired furthest in the
17 jsr166 1.32 * past. If no delay has expired there is no head and <tt>poll</tt>
18 dl 1.27 * will return <tt>null</tt>. Expiration occurs when an element's
19     * <tt>getDelay(TimeUnit.NANOSECONDS)</tt> method returns a value less
20     * than or equal to zero. This queue does not permit <tt>null</tt>
21     * elements.
22 dl 1.25 *
23 dl 1.26 * <p>This class and its iterator implement all of the
24     * <em>optional</em> methods of the {@link Collection} and {@link
25 jsr166 1.29 * Iterator} interfaces.
26 dl 1.26 *
27 dl 1.25 * <p>This class is a member of the
28     * <a href="{@docRoot}/../guide/collections/index.html">
29     * Java Collections Framework</a>.
30     *
31 dl 1.4 * @since 1.5
32     * @author Doug Lea
33 dl 1.20 * @param <E> the type of elements held in this collection
34 dl 1.19 */
35 tim 1.1
36 dl 1.3 public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
37     implements BlockingQueue<E> {
38 tim 1.6
39 dl 1.2 private transient final ReentrantLock lock = new ReentrantLock();
40 dl 1.22 private transient final Condition available = lock.newCondition();
41 dl 1.3 private final PriorityQueue<E> q = new PriorityQueue<E>();
42 tim 1.1
43 dl 1.4 /**
44 dholmes 1.7 * Creates a new <tt>DelayQueue</tt> that is initially empty.
45 dl 1.4 */
46 tim 1.1 public DelayQueue() {}
47    
48 tim 1.6 /**
49 tim 1.9 * Creates a <tt>DelayQueue</tt> initially containing the elements of the
50 dholmes 1.7 * given collection of {@link Delayed} instances.
51     *
52 jsr166 1.32 * @param c the collection of elements to initially contain
53     * @throws NullPointerException if the specified collection or any
54     * of its elements are null
55 tim 1.6 */
56     public DelayQueue(Collection<? extends E> c) {
57     this.addAll(c);
58     }
59    
60 dholmes 1.7 /**
61 dl 1.16 * Inserts the specified element into this delay queue.
62 dholmes 1.7 *
63 jsr166 1.30 * @param e the element to add
64 jsr166 1.32 * @return <tt>true</tt> (as per the spec for {@link Collection#add})
65     * @throws NullPointerException if the specified element is null
66     */
67     public boolean add(E e) {
68     return offer(e);
69     }
70    
71     /**
72     * Inserts the specified element into this delay queue.
73     *
74     * @param e the element to add
75 dholmes 1.7 * @return <tt>true</tt>
76 jsr166 1.32 * @throws NullPointerException if the specified element is null
77 dholmes 1.7 */
78 jsr166 1.30 public boolean offer(E e) {
79 dl 1.21 final ReentrantLock lock = this.lock;
80 dl 1.2 lock.lock();
81     try {
82 dl 1.3 E first = q.peek();
83 jsr166 1.30 q.offer(e);
84     if (first == null || e.compareTo(first) < 0)
85 dl 1.4 available.signalAll();
86 dl 1.2 return true;
87 tim 1.13 } finally {
88 dl 1.2 lock.unlock();
89     }
90     }
91    
92 dholmes 1.7 /**
93 jsr166 1.32 * Inserts the specified element into this delay queue. As the queue is
94 dholmes 1.7 * unbounded this method will never block.
95 jsr166 1.32 *
96 jsr166 1.30 * @param e the element to add
97 jsr166 1.32 * @throws NullPointerException {@inheritDoc}
98 dholmes 1.7 */
99 jsr166 1.30 public void put(E e) {
100     offer(e);
101 dl 1.2 }
102    
103 dholmes 1.7 /**
104 dl 1.16 * Inserts the specified element into this delay queue. As the queue is
105 dholmes 1.7 * unbounded this method will never block.
106 jsr166 1.32 *
107 jsr166 1.30 * @param e the element to add
108 dl 1.14 * @param timeout This parameter is ignored as the method never blocks
109 dholmes 1.7 * @param unit This parameter is ignored as the method never blocks
110     * @return <tt>true</tt>
111 jsr166 1.32 * @throws NullPointerException {@inheritDoc}
112 dholmes 1.7 */
113 jsr166 1.30 public boolean offer(E e, long timeout, TimeUnit unit) {
114     return offer(e);
115 dl 1.2 }
116    
117 dholmes 1.7 /**
118 jsr166 1.32 * Retrieves and removes the head of this queue, or returns <tt>null</tt>
119     * if this queue has no elements with an expired delay.
120 dholmes 1.7 *
121 jsr166 1.32 * @return the head of this queue, or <tt>null</tt> if this
122     * queue has no elements with an expired delay
123 dholmes 1.7 */
124 jsr166 1.32 public E poll() {
125     final ReentrantLock lock = this.lock;
126     lock.lock();
127     try {
128     E first = q.peek();
129     if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
130     return null;
131     else {
132     E x = q.poll();
133     assert x != null;
134     if (q.size() != 0)
135     available.signalAll();
136     return x;
137     }
138     } finally {
139     lock.unlock();
140     }
141 dl 1.2 }
142    
143 dl 1.27 /**
144 jsr166 1.32 * Retrieves and removes the head of this queue, waiting if necessary
145     * until an element with an expired delay is available on this queue.
146     *
147 dl 1.27 * @return the head of this queue
148 jsr166 1.32 * @throws InterruptedException {@inheritDoc}
149 dl 1.27 */
150 dl 1.3 public E take() throws InterruptedException {
151 dl 1.21 final ReentrantLock lock = this.lock;
152 dl 1.2 lock.lockInterruptibly();
153     try {
154     for (;;) {
155 dl 1.3 E first = q.peek();
156 dl 1.12 if (first == null) {
157 dl 1.4 available.await();
158 tim 1.13 } else {
159 dl 1.2 long delay = first.getDelay(TimeUnit.NANOSECONDS);
160 dl 1.12 if (delay > 0) {
161     long tl = available.awaitNanos(delay);
162 tim 1.13 } else {
163 dl 1.3 E x = q.poll();
164 dl 1.2 assert x != null;
165     if (q.size() != 0)
166 dl 1.4 available.signalAll(); // wake up other takers
167 dl 1.2 return x;
168 tim 1.6
169 dl 1.2 }
170     }
171     }
172 tim 1.13 } finally {
173 dl 1.2 lock.unlock();
174     }
175     }
176    
177 dl 1.27 /**
178 jsr166 1.32 * Retrieves and removes the head of this queue, waiting if necessary
179     * until an element with an expired delay is available on this queue,
180     * or the specified wait time expires.
181     *
182 dl 1.27 * @return the head of this queue, or <tt>null</tt> if the
183 jsr166 1.32 * specified waiting time elapses before an element with
184     * an expired delay becomes available
185     * @throws InterruptedException {@inheritDoc}
186 dl 1.27 */
187     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
188 dl 1.21 final ReentrantLock lock = this.lock;
189 dl 1.2 lock.lockInterruptibly();
190 dl 1.27 long nanos = unit.toNanos(timeout);
191 dl 1.2 try {
192     for (;;) {
193 dl 1.3 E first = q.peek();
194 dl 1.2 if (first == null) {
195     if (nanos <= 0)
196     return null;
197     else
198 dl 1.4 nanos = available.awaitNanos(nanos);
199 tim 1.13 } else {
200 dl 1.2 long delay = first.getDelay(TimeUnit.NANOSECONDS);
201     if (delay > 0) {
202     if (delay > nanos)
203     delay = nanos;
204 dl 1.4 long timeLeft = available.awaitNanos(delay);
205 dl 1.2 nanos -= delay - timeLeft;
206 tim 1.13 } else {
207 dl 1.3 E x = q.poll();
208 dl 1.2 assert x != null;
209     if (q.size() != 0)
210 tim 1.6 available.signalAll();
211 dl 1.2 return x;
212     }
213     }
214     }
215 tim 1.13 } finally {
216 dl 1.2 lock.unlock();
217     }
218     }
219    
220 dl 1.27 /**
221     * Retrieves, but does not remove, the head of this queue,
222 dl 1.35 * or returns <tt>null</tt> if this queue is empty.
223     * Unlike <tt>poll</tt>, this method can be used to inspect
224     * elements that have not yet expired.
225 dl 1.27 *
226 jsr166 1.32 * @return the head of this queue, or <tt>null</tt> if this
227 dl 1.35 * queue is empty.
228 dl 1.27 */
229 dl 1.3 public E peek() {
230 dl 1.21 final ReentrantLock lock = this.lock;
231 dl 1.2 lock.lock();
232     try {
233     return q.peek();
234 tim 1.13 } finally {
235 dl 1.2 lock.unlock();
236     }
237 tim 1.1 }
238    
239 dl 1.2 public int size() {
240 dl 1.21 final ReentrantLock lock = this.lock;
241 dl 1.2 lock.lock();
242     try {
243     return q.size();
244 tim 1.13 } finally {
245 dl 1.2 lock.unlock();
246     }
247     }
248    
249 jsr166 1.32 /**
250     * @throws UnsupportedOperationException {@inheritDoc}
251     * @throws ClassCastException {@inheritDoc}
252     * @throws NullPointerException {@inheritDoc}
253     * @throws IllegalArgumentException {@inheritDoc}
254     */
255 dl 1.17 public int drainTo(Collection<? super E> c) {
256     if (c == null)
257     throw new NullPointerException();
258     if (c == this)
259     throw new IllegalArgumentException();
260 dl 1.21 final ReentrantLock lock = this.lock;
261 dl 1.17 lock.lock();
262     try {
263     int n = 0;
264     for (;;) {
265     E first = q.peek();
266     if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
267     break;
268     c.add(q.poll());
269     ++n;
270     }
271     if (n > 0)
272     available.signalAll();
273     return n;
274     } finally {
275     lock.unlock();
276     }
277     }
278    
279 jsr166 1.32 /**
280     * @throws UnsupportedOperationException {@inheritDoc}
281     * @throws ClassCastException {@inheritDoc}
282     * @throws NullPointerException {@inheritDoc}
283     * @throws IllegalArgumentException {@inheritDoc}
284     */
285 dl 1.17 public int drainTo(Collection<? super E> c, int maxElements) {
286     if (c == null)
287     throw new NullPointerException();
288     if (c == this)
289     throw new IllegalArgumentException();
290     if (maxElements <= 0)
291     return 0;
292 dl 1.21 final ReentrantLock lock = this.lock;
293 dl 1.17 lock.lock();
294     try {
295     int n = 0;
296     while (n < maxElements) {
297     E first = q.peek();
298     if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
299     break;
300     c.add(q.poll());
301     ++n;
302     }
303     if (n > 0)
304     available.signalAll();
305     return n;
306     } finally {
307     lock.unlock();
308     }
309     }
310    
311 dholmes 1.7 /**
312     * Atomically removes all of the elements from this delay queue.
313     * The queue will be empty after this call returns.
314 jsr166 1.32 * Elements with an unexpired delay are not waited for; they are
315     * simply discarded from the queue.
316 dholmes 1.7 */
317 dl 1.2 public void clear() {
318 dl 1.21 final ReentrantLock lock = this.lock;
319 dl 1.2 lock.lock();
320     try {
321     q.clear();
322 tim 1.13 } finally {
323 dl 1.2 lock.unlock();
324     }
325     }
326 tim 1.1
327 dholmes 1.7 /**
328     * Always returns <tt>Integer.MAX_VALUE</tt> because
329     * a <tt>DelayQueue</tt> is not capacity constrained.
330 jsr166 1.32 *
331 dholmes 1.7 * @return <tt>Integer.MAX_VALUE</tt>
332     */
333 dl 1.2 public int remainingCapacity() {
334     return Integer.MAX_VALUE;
335 tim 1.1 }
336 dl 1.2
337 jsr166 1.32 /**
338     * Returns an array containing all of the elements in this queue.
339     * The returned array elements are in no particular order.
340     *
341     * <p>The returned array will be "safe" in that no references to it are
342     * maintained by this queue. (In other words, this method must allocate
343     * a new array). The caller is thus free to modify the returned array.
344 jsr166 1.33 *
345 jsr166 1.32 * <p>This method acts as bridge between array-based and collection-based
346     * APIs.
347     *
348     * @return an array containing all of the elements in this queue
349     */
350 dl 1.2 public Object[] toArray() {
351 dl 1.21 final ReentrantLock lock = this.lock;
352 dl 1.2 lock.lock();
353     try {
354     return q.toArray();
355 tim 1.13 } finally {
356 dl 1.2 lock.unlock();
357     }
358 tim 1.1 }
359 dl 1.2
360 jsr166 1.32 /**
361     * Returns an array containing all of the elements in this queue; the
362     * runtime type of the returned array is that of the specified array.
363     * The returned array elements are in no particular order.
364     * If the queue fits in the specified array, it is returned therein.
365     * Otherwise, a new array is allocated with the runtime type of the
366     * specified array and the size of this queue.
367     *
368     * <p>If this queue fits in the specified array with room to spare
369     * (i.e., the array has more elements than this queue), the element in
370     * the array immediately following the end of the queue is set to
371     * <tt>null</tt>.
372     *
373     * <p>Like the {@link #toArray()} method, this method acts as bridge between
374     * array-based and collection-based APIs. Further, this method allows
375     * precise control over the runtime type of the output array, and may,
376     * under certain circumstances, be used to save allocation costs.
377     *
378     * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
379     * The following code can be used to dump the queue into a newly
380     * allocated array of <tt>String</tt>:
381     *
382     * <pre>
383     * String[] y = x.toArray(new String[0]);</pre>
384     *
385     * Note that <tt>toArray(new Object[0])</tt> is identical in function to
386     * <tt>toArray()</tt>.
387     *
388     * @param a the array into which the elements of the queue are to
389     * be stored, if it is big enough; otherwise, a new array of the
390     * same runtime type is allocated for this purpose
391     * @return an array containing all of the elements in this queue
392     * @throws ArrayStoreException if the runtime type of the specified array
393     * is not a supertype of the runtime type of every element in
394     * this queue
395     * @throws NullPointerException if the specified array is null
396     */
397     public <T> T[] toArray(T[] a) {
398 dl 1.21 final ReentrantLock lock = this.lock;
399 dl 1.2 lock.lock();
400     try {
401 jsr166 1.32 return q.toArray(a);
402 tim 1.13 } finally {
403 dl 1.2 lock.unlock();
404     }
405 tim 1.1 }
406    
407 dl 1.26 /**
408     * Removes a single instance of the specified element from this
409 dl 1.28 * queue, if it is present.
410 dl 1.26 */
411 dholmes 1.7 public boolean remove(Object o) {
412 dl 1.21 final ReentrantLock lock = this.lock;
413 dl 1.2 lock.lock();
414     try {
415 dholmes 1.7 return q.remove(o);
416 tim 1.13 } finally {
417 dl 1.2 lock.unlock();
418     }
419     }
420    
421 dholmes 1.7 /**
422     * Returns an iterator over the elements in this queue. The iterator
423 dl 1.8 * does not return the elements in any particular order. The
424 dl 1.15 * returned iterator is a thread-safe "fast-fail" iterator that will
425 jsr166 1.29 * throw {@link ConcurrentModificationException}
426 dl 1.8 * upon detected interference.
427 dholmes 1.7 *
428 jsr166 1.32 * @return an iterator over the elements in this queue
429 dholmes 1.7 */
430 dl 1.3 public Iterator<E> iterator() {
431 dl 1.21 final ReentrantLock lock = this.lock;
432 dl 1.2 lock.lock();
433     try {
434 dl 1.34 return new Itr<E>(q.iterator());
435 tim 1.13 } finally {
436 dl 1.2 lock.unlock();
437     }
438     }
439    
440 dl 1.3 private class Itr<E> implements Iterator<E> {
441     private final Iterator<E> iter;
442 tim 1.6 Itr(Iterator<E> i) {
443     iter = i;
444 dl 1.2 }
445    
446 tim 1.6 public boolean hasNext() {
447 dl 1.2 return iter.hasNext();
448 tim 1.6 }
449    
450     public E next() {
451 dl 1.21 final ReentrantLock lock = DelayQueue.this.lock;
452 dl 1.2 lock.lock();
453     try {
454     return iter.next();
455 tim 1.13 } finally {
456 dl 1.2 lock.unlock();
457     }
458 tim 1.6 }
459    
460     public void remove() {
461 dl 1.21 final ReentrantLock lock = DelayQueue.this.lock;
462 dl 1.2 lock.lock();
463     try {
464     iter.remove();
465 tim 1.13 } finally {
466 dl 1.2 lock.unlock();
467     }
468 tim 1.6 }
469 tim 1.1 }
470    
471     }