ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.24
Committed: Tue Jan 20 04:35:02 2004 UTC (20 years, 4 months ago) by dl
Branch: MAIN
Changes since 1.23: +1 -0 lines
Log Message:
javadoc lint; Thread.interrupt shouldn't throw exception if thread dead

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.23 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.2 */
6    
7    
8 tim 1.1 package java.util.concurrent;
9 dl 1.5 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
/**
 * An unbounded {@linkplain BlockingQueue blocking queue} of <tt>Delayed</tt>
 * elements, in which an element can only be taken when its delay has expired.
 * The <em>head</em> of the queue is that <tt>Delayed</tt> element whose delay
 * expired furthest in the past. If no delay has expired there is no head and
 * <tt>poll</tt> will return <tt>null</tt>.
 * This queue does not permit <tt>null</tt> elements.
 *
 * <p>This class implements all of the <em>optional</em> methods
 * of the {@link Collection} and {@link Iterator} interfaces.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    /** Guards every access to <tt>q</tt>. */
    private transient final ReentrantLock lock = new ReentrantLock();
    /** Signalled when the head changes or elements remain after a removal. */
    private transient final Condition available = lock.newCondition();
    /** Backing store, ordered by the elements' own <tt>compareTo</tt>. */
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * Creates a new <tt>DelayQueue</tt> that is initially empty.
     */
    public DelayQueue() {}

    /**
     * Creates a <tt>DelayQueue</tt> initially containing the elements of the
     * given collection of {@link Delayed} instances.
     *
     * @param c the collection
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param o the element to add
     * @return <tt>true</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            q.offer(o);
            // Waiters time their sleep from the old head; wake them when the
            // new element becomes the head so they can recompute the delay.
            if (first == null || o.compareTo(first) < 0)
                available.signalAll();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds the specified element to this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param o the element to add
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public void put(E o) {
        offer(o);
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param o the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return <tt>true</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o, long timeout, TimeUnit unit) {
        return offer(o);
    }

    /**
     * Adds the specified element to this queue.
     *
     * @param o the element to add
     * @return <tt>true</tt> (as per the general contract of
     * <tt>Collection.add</tt>).
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean add(E o) {
        return offer(o);
    }

    /**
     * Removes and returns the head of the backing queue and, if elements
     * remain, wakes the other waiters so they can re-examine the new head.
     * Must be called with <tt>lock</tt> held and a non-empty queue.
     */
    private E dequeueHead() {
        E x = q.poll();
        assert x != null;
        if (q.size() != 0)
            available.signalAll(); // wake up other takers
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    available.await();
                } else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay > 0)
                        available.awaitNanos(delay);
                    else
                        return dequeueHead();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * up to the specified time for an element with an expired delay to
     * become available.
     *
     * @param time how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>time</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the specified
     * waiting time elapses before an element with an expired delay becomes
     * available
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if <tt>unit</tt> is <tt>null</tt>
     */
    public E poll(long time, TimeUnit unit) throws InterruptedException {
        // Convert before locking: a null unit must not throw while the
        // lock is held outside the try/finally.
        long nanos = unit.toNanos(time);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0)
                        return null;
                    nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0)
                        return dequeueHead();
                    // Head exists but is not ready: give up once the
                    // caller's timeout is exhausted instead of spinning.
                    if (nanos <= 0)
                        return null;
                    if (delay > nanos)
                        delay = nanos;
                    long timeLeft = available.awaitNanos(delay);
                    nanos -= delay - timeLeft;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, or returns
     * <tt>null</tt> if this queue has no element with an expired delay.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue is
     * empty or the delay of its earliest element has not yet expired
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                return null;
            return dequeueHead();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the first element of the backing
     * priority queue, or returns <tt>null</tt> if this queue is empty.
     * Unlike <tt>poll</tt>, this method does not check whether the
     * element's delay has expired.
     *
     * @return the first element of this queue, or <tt>null</tt> if empty
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue, expired or not.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes all elements whose delay has expired and adds them to the
     * given collection.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most the given number of elements whose delay has expired
     * and adds them to the given collection.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            while (n < maxElements) {
                E first = q.peek();
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                    break;
                // Add before removing, so the element is not lost if
                // c.add throws.
                c.add(first);
                q.poll();
                ++n;
            }
            if (n > 0)
                available.signalAll();
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this delay queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * a <tt>DelayQueue</tt> is not capacity constrained.
     *
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * as produced by the backing priority queue while the lock is held.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * using the supplied array's runtime type, as produced by the backing
     * priority queue while the lock is held.
     *
     * @param array the array into which the elements are to be stored if
     * it is big enough; otherwise a new array of the same runtime type
     * @return an array containing all of the elements in this queue
     */
    public <T> T[] toArray(T[] array) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(array);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present, whether or not its delay has expired.
     *
     * @param o the element to remove
     * @return <tt>true</tt> if an element was removed
     */
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue. The iterator
     * does not return the elements in any particular order. The
     * returned iterator is a thread-safe "fast-fail" iterator that will
     * throw {@link java.util.ConcurrentModificationException}
     * upon detected interference.
     *
     * @return an iterator over the elements in this queue.
     */
    public Iterator<E> iterator() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return new Itr(q.iterator());
        } finally {
            lock.unlock();
        }
    }

    /**
     * Wraps the backing queue's iterator so that each operation runs
     * while holding this queue's lock. Uses the enclosing class's
     * <tt>E</tt> rather than declaring a shadowing type parameter.
     */
    private class Itr implements Iterator<E> {
        private final Iterator<E> iter;

        Itr(Iterator<E> i) {
            iter = i;
        }

        public boolean hasNext() {
            // Locked like next() and remove(): the wrapped iterator reads
            // state of the shared backing queue.
            final ReentrantLock lock = DelayQueue.this.lock;
            lock.lock();
            try {
                return iter.hasNext();
            } finally {
                lock.unlock();
            }
        }

        public E next() {
            final ReentrantLock lock = DelayQueue.this.lock;
            lock.lock();
            try {
                return iter.next();
            } finally {
                lock.unlock();
            }
        }

        public void remove() {
            final ReentrantLock lock = DelayQueue.this.lock;
            lock.lock();
            try {
                iter.remove();
            } finally {
                lock.unlock();
            }
        }
    }

}