ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.23
Committed: Sat Dec 27 19:26:25 2003 UTC (20 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.22: +2 -2 lines
Log Message:
Headers reference Creative Commons

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.23 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.2 */
6    
7    
8 tim 1.1 package java.util.concurrent;
9 dl 1.5 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
/**
 * An unbounded {@linkplain BlockingQueue blocking queue} of <tt>Delayed</tt>
 * elements, in which an element can only be taken when its delay has expired.
 * The <em>head</em> of the queue is that <tt>Delayed</tt> element whose delay
 * expired furthest in the past - if no delay has expired there is no head and
 * <tt>poll</tt> will return <tt>null</tt>.
 * This queue does not permit <tt>null</tt> elements.
 * <p>This class implements all of the <em>optional</em> methods
 * of the {@link Collection} and {@link Iterator} interfaces.
 *
 * <p>All access to the backing {@link PriorityQueue} is guarded by a single
 * {@link ReentrantLock}; threads waiting for an element block on a
 * {@link Condition} created from that lock.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    /** Lock guarding every access to {@link #q}. */
    private transient final ReentrantLock lock = new ReentrantLock();
    /** Signalled when the head may have changed or more elements may be takeable. */
    private transient final Condition available = lock.newCondition();
    /** Backing store, ordered by the elements' <tt>compareTo</tt>. */
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * Creates a new <tt>DelayQueue</tt> that is initially empty.
     */
    public DelayQueue() {}

    /**
     * Creates a <tt>DelayQueue</tt> initially containing the elements of the
     * given collection of {@link Delayed} instances.
     *
     * @param c the collection whose elements are added to this queue
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param o the element to add
     * @return <tt>true</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            q.offer(o);
            // Waiters only need waking when the new element becomes the head
            // (queue was empty, or it orders before the previous head).
            if (first == null || o.compareTo(first) < 0)
                available.signalAll();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds the specified element to this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param o the element to add
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public void put(E o) {
        offer(o);
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param o the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return <tt>true</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o, long timeout, TimeUnit unit) {
        return offer(o);
    }

    /**
     * Adds the specified element to this queue.
     *
     * @param o the element to add
     * @return <tt>true</tt> (as per the general contract of
     * <tt>Collection.add</tt>).
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean add(E o) {
        return offer(o);
    }

    /**
     * Retrieves and removes the head of this queue, waiting until an
     * element whose delay has expired is available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while acquiring the lock
     * or while waiting
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    available.await();
                } else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay > 0) {
                        // Remaining time is irrelevant here: the loop
                        // re-reads the head and its delay on every pass.
                        available.awaitNanos(delay);
                    } else {
                        E x = q.poll();
                        assert x != null;
                        if (q.size() != 0)
                            available.signalAll(); // wake up other takers
                        return x;
                    }
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * given time for an element whose delay has expired.
     *
     * @param time how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit the time unit of the <tt>time</tt> argument
     * @return the head of this queue, or <tt>null</tt> if the wait time
     * elapsed before an element with an expired delay became available
     * @throws InterruptedException if interrupted while acquiring the lock
     * or while waiting
     * @throws NullPointerException if <tt>unit</tt> is <tt>null</tt>
     */
    public E poll(long time, TimeUnit unit) throws InterruptedException {
        // Convert before locking so a null unit cannot throw while the lock
        // is held outside the try/finally.
        long nanos = unit.toNanos(time);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0)
                        return null;
                    else
                        nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay > 0) {
                        // Out of time while the head is still pending: give
                        // up rather than spinning on zero-length waits.
                        if (nanos <= 0)
                            return null;
                        if (delay > nanos)
                            delay = nanos;
                        long timeLeft = available.awaitNanos(delay);
                        // Charge only the time actually spent waiting.
                        nanos -= delay - timeLeft;
                    } else {
                        E x = q.poll();
                        assert x != null;
                        if (q.size() != 0)
                            available.signalAll(); // wake up other takers
                        return x;
                    }
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, or returns
     * <tt>null</tt> if the queue is empty or the head's delay has not yet
     * expired. Never blocks beyond acquiring the lock.
     *
     * @return the head of this queue, or <tt>null</tt>
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                return null;
            else {
                E x = q.poll();
                assert x != null;
                if (q.size() != 0)
                    available.signalAll(); // wake up other takers
                return x;
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the first element of the underlying
     * priority queue, or returns <tt>null</tt> if the queue is empty.
     * Unlike <tt>poll</tt>, the element is returned whether or not its
     * delay has expired.
     *
     * @return the first element, or <tt>null</tt> if the queue is empty
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue, counting both expired
     * and unexpired elements.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes all elements whose delay has expired and adds them to the
     * given collection.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most <tt>maxElements</tt> elements whose delay has expired
     * and adds them to the given collection.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            while (n < maxElements) {
                E first = q.peek();
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                    break;
                // Add before removing so the element stays queued if
                // c.add() throws.
                c.add(first);
                q.poll();
                ++n;
            }
            if (n > 0)
                available.signalAll();
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this delay queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * a <tt>DelayQueue</tt> is not capacity constrained.
     *
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * obtained from the underlying priority queue while holding the lock.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * delegating to the underlying priority queue's
     * <tt>toArray(T[])</tt> while holding the lock.
     *
     * @param array the array passed through to the underlying queue
     * @return an array containing all of the elements in this queue
     */
    public <T> T[] toArray(T[] array) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(array);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present, whether or not its delay has expired.
     *
     * @param o the element to remove
     * @return the result of removing <tt>o</tt> from the underlying queue
     */
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue. The iterator
     * does not return the elements in any particular order. The
     * returned iterator is a thread-safe "fail-fast" iterator that will
     * throw {@link java.util.ConcurrentModificationException}
     * upon detected interference.
     *
     * @return an iterator over the elements in this queue.
     */
    public Iterator<E> iterator() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return new Itr(q.iterator());
        } finally {
            lock.unlock();
        }
    }

    /**
     * Wraps the backing queue's iterator so that every operation runs
     * while holding the enclosing queue's lock.
     */
    private class Itr implements Iterator<E> {
        private final Iterator<E> iter;

        Itr(Iterator<E> i) {
            iter = i;
        }

        public boolean hasNext() {
            final ReentrantLock lock = DelayQueue.this.lock;
            lock.lock();
            try {
                return iter.hasNext();
            } finally {
                lock.unlock();
            }
        }

        public E next() {
            final ReentrantLock lock = DelayQueue.this.lock;
            lock.lock();
            try {
                return iter.next();
            } finally {
                lock.unlock();
            }
        }

        public void remove() {
            final ReentrantLock lock = DelayQueue.this.lock;
            lock.lock();
            try {
                iter.remove();
            } finally {
                lock.unlock();
            }
        }
    }

}