ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.29
Committed: Tue Apr 26 01:17:18 2005 UTC (19 years, 1 month ago) by jsr166
Branch: MAIN
Changes since 1.28: +3 -3 lines
Log Message:
doc fixes

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.23 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.2 */
6    
7    
8 tim 1.1 package java.util.concurrent;
9 dl 1.5 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
12     /**
13 dl 1.27 * An unbounded {@linkplain BlockingQueue blocking queue} of
14     * <tt>Delayed</tt> elements, in which an element can only be taken
15     * when its delay has expired. The <em>head</em> of the queue is that
16     * <tt>Delayed</tt> element whose delay expired furthest in the
17     * past. If no delay has expired there is no head and <tt>poll</tt>
18     * will return <tt>null</tt>. Expiration occurs when an element's
19     * <tt>getDelay(TimeUnit.NANOSECONDS)</tt> method returns a value less
20     * than or equal to zero. This queue does not permit <tt>null</tt>
21     * elements.
22 dl 1.25 *
23 dl 1.26 * <p>This class and its iterator implement all of the
24     * <em>optional</em> methods of the {@link Collection} and {@link
25 jsr166 1.29 * Iterator} interfaces.
26 dl 1.26 *
27 dl 1.25 * <p>This class is a member of the
28     * <a href="{@docRoot}/../guide/collections/index.html">
29     * Java Collections Framework</a>.
30     *
31 dl 1.4 * @since 1.5
32     * @author Doug Lea
33 dl 1.20 * @param <E> the type of elements held in this collection
34 dl 1.19 */
35 tim 1.1
36 dl 1.3 public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
37     implements BlockingQueue<E> {
38 tim 1.6
39 dl 1.2 private transient final ReentrantLock lock = new ReentrantLock();
40 dl 1.22 private transient final Condition available = lock.newCondition();
41 dl 1.3 private final PriorityQueue<E> q = new PriorityQueue<E>();
42 tim 1.1
43 dl 1.4 /**
44 dholmes 1.7 * Creates a new <tt>DelayQueue</tt> that is initially empty.
45 dl 1.4 */
46 tim 1.1 public DelayQueue() {}
47    
48 tim 1.6 /**
49 tim 1.9 * Creates a <tt>DelayQueue</tt> initially containing the elements of the
50 dholmes 1.7 * given collection of {@link Delayed} instances.
51     *
52 dl 1.24 * @param c the collection
53 dholmes 1.7 * @throws NullPointerException if <tt>c</tt> or any element within it
54 jsr166 1.29 * is <tt>null</tt>.
55 tim 1.6 *
56     */
57     public DelayQueue(Collection<? extends E> c) {
58     this.addAll(c);
59     }
60    
61 dholmes 1.7 /**
62 dl 1.16 * Inserts the specified element into this delay queue.
63 dholmes 1.7 *
64 dl 1.14 * @param o the element to add
65 dholmes 1.7 * @return <tt>true</tt>
66 dl 1.14 * @throws NullPointerException if the specified element is <tt>null</tt>.
67 dholmes 1.7 */
68     public boolean offer(E o) {
69 dl 1.21 final ReentrantLock lock = this.lock;
70 dl 1.2 lock.lock();
71     try {
72 dl 1.3 E first = q.peek();
73 dholmes 1.7 q.offer(o);
74     if (first == null || o.compareTo(first) < 0)
75 dl 1.4 available.signalAll();
76 dl 1.2 return true;
77 tim 1.13 } finally {
78 dl 1.2 lock.unlock();
79     }
80     }
81    
82 dholmes 1.7
83     /**
84     * Adds the specified element to this delay queue. As the queue is
85     * unbounded this method will never block.
86 dl 1.14 * @param o the element to add
87     * @throws NullPointerException if the specified element is <tt>null</tt>.
88 dholmes 1.7 */
89     public void put(E o) {
90     offer(o);
91 dl 1.2 }
92    
93 dholmes 1.7 /**
94 dl 1.16 * Inserts the specified element into this delay queue. As the queue is
95 dholmes 1.7 * unbounded this method will never block.
96 dl 1.14 * @param o the element to add
97     * @param timeout This parameter is ignored as the method never blocks
98 dholmes 1.7 * @param unit This parameter is ignored as the method never blocks
99     * @return <tt>true</tt>
100 dl 1.14 * @throws NullPointerException if the specified element is <tt>null</tt>.
101 dholmes 1.7 */
102 dl 1.14 public boolean offer(E o, long timeout, TimeUnit unit) {
103 dholmes 1.7 return offer(o);
104 dl 1.2 }
105    
106 dholmes 1.7 /**
107     * Adds the specified element to this queue.
108 dl 1.14 * @param o the element to add
109 dholmes 1.7 * @return <tt>true</tt> (as per the general contract of
110     * <tt>Collection.add</tt>).
111     *
112 dl 1.14 * @throws NullPointerException if the specified element is <tt>null</tt>.
113 dholmes 1.7 */
114     public boolean add(E o) {
115     return offer(o);
116 dl 1.2 }
117    
118 dl 1.27 /**
119     * Retrieves and removes the head of this queue, waiting
120     * if no elements with an unexpired delay are present on this queue.
121     * @return the head of this queue
122     * @throws InterruptedException if interrupted while waiting.
123     */
124 dl 1.3 public E take() throws InterruptedException {
125 dl 1.21 final ReentrantLock lock = this.lock;
126 dl 1.2 lock.lockInterruptibly();
127     try {
128     for (;;) {
129 dl 1.3 E first = q.peek();
130 dl 1.12 if (first == null) {
131 dl 1.4 available.await();
132 tim 1.13 } else {
133 dl 1.2 long delay = first.getDelay(TimeUnit.NANOSECONDS);
134 dl 1.12 if (delay > 0) {
135     long tl = available.awaitNanos(delay);
136 tim 1.13 } else {
137 dl 1.3 E x = q.poll();
138 dl 1.2 assert x != null;
139     if (q.size() != 0)
140 dl 1.4 available.signalAll(); // wake up other takers
141 dl 1.2 return x;
142 tim 1.6
143 dl 1.2 }
144     }
145     }
146 tim 1.13 } finally {
147 dl 1.2 lock.unlock();
148     }
149     }
150    
151 dl 1.27 /**
152     * Retrieves and removes the head of this queue, waiting
153     * if necessary up to the specified wait time if no elements with
154     * an unexpired delay are
155     * present on this queue.
156     * @param timeout how long to wait before giving up, in units of
157     * <tt>unit</tt>
158     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
159     * <tt>timeout</tt> parameter
160     * @return the head of this queue, or <tt>null</tt> if the
161     * specified waiting time elapses before an element with
162 dl 1.28 * an unexpired delay is present.
163 dl 1.27 * @throws InterruptedException if interrupted while waiting.
164     */
165     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
166 dl 1.21 final ReentrantLock lock = this.lock;
167 dl 1.2 lock.lockInterruptibly();
168 dl 1.27 long nanos = unit.toNanos(timeout);
169 dl 1.2 try {
170     for (;;) {
171 dl 1.3 E first = q.peek();
172 dl 1.2 if (first == null) {
173     if (nanos <= 0)
174     return null;
175     else
176 dl 1.4 nanos = available.awaitNanos(nanos);
177 tim 1.13 } else {
178 dl 1.2 long delay = first.getDelay(TimeUnit.NANOSECONDS);
179     if (delay > 0) {
180     if (delay > nanos)
181     delay = nanos;
182 dl 1.4 long timeLeft = available.awaitNanos(delay);
183 dl 1.2 nanos -= delay - timeLeft;
184 tim 1.13 } else {
185 dl 1.3 E x = q.poll();
186 dl 1.2 assert x != null;
187     if (q.size() != 0)
188 tim 1.6 available.signalAll();
189 dl 1.2 return x;
190     }
191     }
192     }
193 tim 1.13 } finally {
194 dl 1.2 lock.unlock();
195     }
196     }
197    
198    
199 dl 1.27 /**
200     * Retrieves and removes the head of this queue, or <tt>null</tt>
201     * if this queue has no elements with an unexpired delay.
202     *
203     * @return the head of this queue, or <tt>null</tt> if this
204     * queue has no elements with an unexpired delay.
205     */
206 dl 1.3 public E poll() {
207 dl 1.21 final ReentrantLock lock = this.lock;
208 dl 1.2 lock.lock();
209     try {
210 dl 1.3 E first = q.peek();
211 dl 1.2 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
212     return null;
213     else {
214 dl 1.3 E x = q.poll();
215 dl 1.2 assert x != null;
216     if (q.size() != 0)
217 tim 1.6 available.signalAll();
218 dl 1.2 return x;
219     }
220 tim 1.13 } finally {
221 dl 1.2 lock.unlock();
222     }
223     }
224    
225 dl 1.27 /**
226     * Retrieves, but does not remove, the head of this queue,
227     * returning <tt>null</tt> if this queue has no elements with an
228     * unexpired delay.
229     *
230     * @return the head of this queue, or <tt>null</tt> if this queue
231     * has no elements with an unexpired delay.
232     */
233 dl 1.3 public E peek() {
234 dl 1.21 final ReentrantLock lock = this.lock;
235 dl 1.2 lock.lock();
236     try {
237     return q.peek();
238 tim 1.13 } finally {
239 dl 1.2 lock.unlock();
240     }
241 tim 1.1 }
242    
243 dl 1.2 public int size() {
244 dl 1.21 final ReentrantLock lock = this.lock;
245 dl 1.2 lock.lock();
246     try {
247     return q.size();
248 tim 1.13 } finally {
249 dl 1.2 lock.unlock();
250     }
251     }
252    
253 dl 1.17 public int drainTo(Collection<? super E> c) {
254     if (c == null)
255     throw new NullPointerException();
256     if (c == this)
257     throw new IllegalArgumentException();
258 dl 1.21 final ReentrantLock lock = this.lock;
259 dl 1.17 lock.lock();
260     try {
261     int n = 0;
262     for (;;) {
263     E first = q.peek();
264     if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
265     break;
266     c.add(q.poll());
267     ++n;
268     }
269     if (n > 0)
270     available.signalAll();
271     return n;
272     } finally {
273     lock.unlock();
274     }
275     }
276    
277     public int drainTo(Collection<? super E> c, int maxElements) {
278     if (c == null)
279     throw new NullPointerException();
280     if (c == this)
281     throw new IllegalArgumentException();
282     if (maxElements <= 0)
283     return 0;
284 dl 1.21 final ReentrantLock lock = this.lock;
285 dl 1.17 lock.lock();
286     try {
287     int n = 0;
288     while (n < maxElements) {
289     E first = q.peek();
290     if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
291     break;
292     c.add(q.poll());
293     ++n;
294     }
295     if (n > 0)
296     available.signalAll();
297     return n;
298     } finally {
299     lock.unlock();
300     }
301     }
302    
303 dholmes 1.7 /**
304     * Atomically removes all of the elements from this delay queue.
305     * The queue will be empty after this call returns.
306     */
307 dl 1.2 public void clear() {
308 dl 1.21 final ReentrantLock lock = this.lock;
309 dl 1.2 lock.lock();
310     try {
311     q.clear();
312 tim 1.13 } finally {
313 dl 1.2 lock.unlock();
314     }
315     }
316 tim 1.1
317 dholmes 1.7 /**
318     * Always returns <tt>Integer.MAX_VALUE</tt> because
319     * a <tt>DelayQueue</tt> is not capacity constrained.
320     * @return <tt>Integer.MAX_VALUE</tt>
321     */
322 dl 1.2 public int remainingCapacity() {
323     return Integer.MAX_VALUE;
324 tim 1.1 }
325 dl 1.2
326     public Object[] toArray() {
327 dl 1.21 final ReentrantLock lock = this.lock;
328 dl 1.2 lock.lock();
329     try {
330     return q.toArray();
331 tim 1.13 } finally {
332 dl 1.2 lock.unlock();
333     }
334 tim 1.1 }
335 dl 1.2
336     public <T> T[] toArray(T[] array) {
337 dl 1.21 final ReentrantLock lock = this.lock;
338 dl 1.2 lock.lock();
339     try {
340     return q.toArray(array);
341 tim 1.13 } finally {
342 dl 1.2 lock.unlock();
343     }
344 tim 1.1 }
345    
346 dl 1.26 /**
347     * Removes a single instance of the specified element from this
348 dl 1.28 * queue, if it is present.
349 dl 1.26 */
350 dholmes 1.7 public boolean remove(Object o) {
351 dl 1.21 final ReentrantLock lock = this.lock;
352 dl 1.2 lock.lock();
353     try {
354 dholmes 1.7 return q.remove(o);
355 tim 1.13 } finally {
356 dl 1.2 lock.unlock();
357     }
358     }
359    
360 dholmes 1.7 /**
361     * Returns an iterator over the elements in this queue. The iterator
362 dl 1.8 * does not return the elements in any particular order. The
363 dl 1.15 * returned iterator is a thread-safe "fast-fail" iterator that will
364 jsr166 1.29 * throw {@link ConcurrentModificationException}
365 dl 1.8 * upon detected interference.
366 dholmes 1.7 *
367     * @return an iterator over the elements in this queue.
368     */
369 dl 1.3 public Iterator<E> iterator() {
370 dl 1.21 final ReentrantLock lock = this.lock;
371 dl 1.2 lock.lock();
372     try {
373     return new Itr(q.iterator());
374 tim 1.13 } finally {
375 dl 1.2 lock.unlock();
376     }
377     }
378    
379 dl 1.3 private class Itr<E> implements Iterator<E> {
380     private final Iterator<E> iter;
381 tim 1.6 Itr(Iterator<E> i) {
382     iter = i;
383 dl 1.2 }
384    
385 tim 1.6 public boolean hasNext() {
386 dl 1.2 return iter.hasNext();
387 tim 1.6 }
388    
389     public E next() {
390 dl 1.21 final ReentrantLock lock = DelayQueue.this.lock;
391 dl 1.2 lock.lock();
392     try {
393     return iter.next();
394 tim 1.13 } finally {
395 dl 1.2 lock.unlock();
396     }
397 tim 1.6 }
398    
399     public void remove() {
400 dl 1.21 final ReentrantLock lock = DelayQueue.this.lock;
401 dl 1.2 lock.lock();
402     try {
403     iter.remove();
404 tim 1.13 } finally {
405 dl 1.2 lock.unlock();
406     }
407 tim 1.6 }
408 tim 1.1 }
409    
410     }