ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.34
Committed: Wed May 25 14:05:27 2005 UTC (19 years ago) by dl
Branch: MAIN
Changes since 1.33: +1 -1 lines
Log Message:
Avoid generics warnings; clarify javadocs

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.23 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.2 */
6    
7    
8 tim 1.1 package java.util.concurrent;
9 dl 1.5 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
/**
 * An unbounded {@linkplain BlockingQueue blocking queue} of
 * <tt>Delayed</tt> elements, in which an element can only be taken
 * when its delay has expired.  The <em>head</em> of the queue is that
 * <tt>Delayed</tt> element whose delay expired furthest in the
 * past.  If no delay has expired there is no head and <tt>poll</tt>
 * will return <tt>null</tt>.  Expiration occurs when an element's
 * <tt>getDelay(TimeUnit.NANOSECONDS)</tt> method returns a value less
 * than or equal to zero.  Even though unexpired elements cannot be
 * removed using <tt>take</tt> or <tt>poll</tt>, they are otherwise
 * treated as normal elements: for example, <tt>size</tt> counts both
 * expired and unexpired elements and <tt>peek</tt> may return an
 * unexpired element.  This queue does not permit <tt>null</tt>
 * elements.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../guide/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    /** Held around every access to the backing priority queue. */
    private final transient ReentrantLock lock = new ReentrantLock();

    /**
     * Signalled when a new earliest element is inserted, and when an
     * element is removed while others remain, so that waiting threads
     * re-examine the head of the queue.
     */
    private final transient Condition available = lock.newCondition();

    /** The elements, ordered by their <tt>compareTo</tt> method. */
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * Creates a new <tt>DelayQueue</tt> that is initially empty.
     */
    public DelayQueue() {}

    /**
     * Creates a <tt>DelayQueue</tt> initially containing the elements of the
     * given collection of {@link Delayed} instances.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return <tt>true</tt> (as per the spec for {@link Collection#add})
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
        return offer(e);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return <tt>true</tt>
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            q.offer(e);
            // Waiters are timing their wait on the previous head; wake
            // them only if the new element now precedes it (or the
            // queue was empty).
            if (first == null || e.compareTo(first) < 0)
                available.signalAll();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) {
        offer(e);
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return <tt>true</tt>
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e);
    }

    /**
     * Retrieves and removes the head of this queue, or returns <tt>null</tt>
     * if this queue has no elements with an expired delay.
     *
     * @return the head of this queue, or <tt>null</tt> if this
     *         queue has no elements with an expired delay
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                return null;
            return dequeueExpiredHead();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    available.await();
                } else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0)
                        return dequeueExpiredHead();
                    // Sleep until the head is due, or until signalled
                    // because the head changed; then re-check.
                    available.awaitNanos(delay);
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue,
     * or the specified wait time expires.
     *
     * @param timeout how long to wait before giving up, in units of
     *        <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     *        <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the
     *         specified waiting time elapses before an element with
     *         an expired delay becomes available
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException if <tt>unit</tt> is null
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        // Convert before locking: a null unit must not throw while the
        // lock is held outside the try/finally.
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0)
                        return null;
                    nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0)
                        return dequeueExpiredHead();
                    // Head exists but is not yet due: give up once the
                    // caller's budget is spent rather than spinning.
                    if (nanos <= 0)
                        return null;
                    if (delay > nanos)
                        delay = nanos;
                    long timeLeft = available.awaitNanos(delay);
                    nanos -= delay - timeLeft;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the head of the backing queue, signalling
     * other waiters if elements remain.  The caller must hold the lock
     * and must already have established that the head is expired.
     */
    private E dequeueExpiredHead() {
        E x = q.poll();
        assert x != null;
        if (q.size() != 0)
            available.signalAll(); // wake up other takers
        return x;
    }

    /**
     * Retrieves, but does not remove, the head of this queue, or
     * returns <tt>null</tt> if this queue is empty.  Unlike
     * <tt>poll</tt>, if no expired elements are available in the queue,
     * this method returns the element that will expire next,
     * if one exists.
     *
     * @return the head of this queue, or <tt>null</tt> if this
     *         queue is empty
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue, counting both
     * expired and unexpired elements.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Only elements whose delay has expired are transferred.
     *
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        return drainExpired(c, Integer.MAX_VALUE);
    }

    /**
     * Only elements whose delay has expired are transferred.
     *
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        return drainExpired(c, maxElements);
    }

    /**
     * Moves up to <tt>maxElements</tt> expired elements, in head order,
     * into <tt>c</tt> and returns how many were moved.  Arguments are
     * assumed to have been validated by the public callers.
     */
    private int drainExpired(Collection<? super E> c, int maxElements) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            while (n < maxElements) {
                E first = q.peek();
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                    break;
                // Add before removing, so that an element is not lost
                // if add() throws.
                c.add(first);
                q.poll();
                ++n;
            }
            if (n > 0)
                available.signalAll();
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this delay queue.
     * The queue will be empty after this call returns.
     * Elements with an unexpired delay are not waited for; they are
     * simply discarded from the queue.
     */
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * a <tt>DelayQueue</tt> is not capacity constrained.
     *
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Returns an array containing all of the elements in this queue.
     * The returned array elements are in no particular order.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue; the
     * runtime type of the returned array is that of the specified array.
     * The returned array elements are in no particular order.
     * If the queue fits in the specified array, it is returned therein.
     * Otherwise, a new array is allocated with the runtime type of the
     * specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * <tt>null</tt>.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
     * The following code can be used to dump the queue into a newly
     * allocated array of <tt>String</tt>:
     *
     * <pre>
     *     String[] y = x.toArray(new String[0]);</pre>
     *
     * Note that <tt>toArray(new Object[0])</tt> is identical in function to
     * <tt>toArray()</tt>.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    public <T> T[] toArray(T[] a) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present, whether or not it has expired.
     *
     * @param o the element to be removed from this queue, if present
     * @return <tt>true</tt> if an element was removed
     */
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over all the elements (both expired and
     * unexpired) in this queue.  The iterator does not return the
     * elements in any particular order.  The returned iterator is a
     * thread-safe "fail-fast" iterator that will throw {@link
     * ConcurrentModificationException} upon detected interference.
     *
     * @return an iterator over the elements in this queue
     */
    public Iterator<E> iterator() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return new Itr(q.iterator());
        } finally {
            lock.unlock();
        }
    }

    /**
     * Wraps the backing queue's iterator so that every operation on it
     * runs under this queue's lock.
     */
    private class Itr implements Iterator<E> {
        private final Iterator<E> iter;

        Itr(Iterator<E> i) {
            iter = i;
        }

        public boolean hasNext() {
            // Locked like next() and remove(): the underlying iterator
            // reads state of the shared backing queue.
            final ReentrantLock lock = DelayQueue.this.lock;
            lock.lock();
            try {
                return iter.hasNext();
            } finally {
                lock.unlock();
            }
        }

        public E next() {
            final ReentrantLock lock = DelayQueue.this.lock;
            lock.lock();
            try {
                return iter.next();
            } finally {
                lock.unlock();
            }
        }

        public void remove() {
            final ReentrantLock lock = DelayQueue.this.lock;
            lock.lock();
            try {
                iter.remove();
            } finally {
                lock.unlock();
            }
        }
    }

}