ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.25
Committed: Tue Jan 27 11:36:31 2004 UTC (20 years, 4 months ago) by dl
Branch: MAIN
CVS Tags: JSR166_PFD
Changes since 1.24: +5 -0 lines
Log Message:
Add Collection framework membership doc

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.23 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.2 */
6    
7    
8 tim 1.1 package java.util.concurrent;
9 dl 1.5 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
/**
 * An unbounded {@linkplain BlockingQueue blocking queue} of <tt>Delayed</tt>
 * elements, in which an element can only be taken when its delay has expired.
 * The <em>head</em> of the queue is that <tt>Delayed</tt> element whose delay
 * expired furthest in the past - if no delay has expired there is no head and
 * <tt>poll</tt> will return <tt>null</tt>.
 * This queue does not permit <tt>null</tt> elements.
 * <p>This class implements all of the <em>optional</em> methods
 * of the {@link Collection} and {@link Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../guide/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    /** Lock held around every access to {@link #q}. */
    private transient final ReentrantLock lock = new ReentrantLock();

    /**
     * Condition that blocked takers wait on; signalled when a new
     * earliest element is inserted or when elements remain after a removal.
     */
    private transient final Condition available = lock.newCondition();

    /** Backing store; ordered by the elements' <tt>compareTo</tt>. */
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * Creates a new <tt>DelayQueue</tt> that is initially empty.
     */
    public DelayQueue() {}

    /**
     * Creates a <tt>DelayQueue</tt> initially containing the elements of the
     * given collection of {@link Delayed} instances.
     *
     * @param c the collection
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param o the element to add
     * @return <tt>true</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            q.offer(o);
            // Waiters time their sleep on the old head; wake them when the
            // new element becomes the head so they can re-evaluate.
            if (first == null || o.compareTo(first) < 0)
                available.signalAll();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds the specified element to this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param o the element to add
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public void put(E o) {
        offer(o);
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param o the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return <tt>true</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o, long timeout, TimeUnit unit) {
        return offer(o);
    }

    /**
     * Adds the specified element to this queue.
     *
     * @param o the element to add
     * @return <tt>true</tt> (as per the general contract of
     * <tt>Collection.add</tt>).
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean add(E o) {
        return offer(o);
    }

    /**
     * Removes and returns the head of the underlying priority queue and,
     * if elements remain, wakes up the other waiting takers.
     * Must be called with {@link #lock} held and a non-empty queue.
     *
     * @return the removed element
     */
    private E removeHead() {
        E x = q.poll();
        assert x != null;
        if (q.size() != 0)
            available.signalAll(); // wake up other takers
        return x;
    }

    /**
     * Retrieves and removes the element at the front of this queue,
     * waiting if necessary until the queue is non-empty and the delay
     * of its front element is no longer positive.
     *
     * @return the removed element
     * @throws InterruptedException if interrupted while acquiring the lock
     * or while waiting
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    available.await();
                } else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay > 0)
                        available.awaitNanos(delay);
                    else
                        return removeHead();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the element at the front of this queue,
     * waiting up to the given time for the queue to become non-empty and
     * for the delay of its front element to be no longer positive.
     *
     * @param time how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>time</tt> parameter
     * @return the removed element, or <tt>null</tt> if the waiting time
     * elapsed before an element could be removed
     * @throws InterruptedException if interrupted while acquiring the lock
     * or while waiting
     * @throws NullPointerException if <tt>unit</tt> is <tt>null</tt>
     */
    public E poll(long time, TimeUnit unit) throws InterruptedException {
        // Convert before locking: a null unit must not leave the lock held.
        long nanos = unit.toNanos(time);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0)
                        return null;
                    nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0)
                        return removeHead();
                    // Head exists but is still delayed: give up once the
                    // caller's budget is spent instead of spinning.
                    if (nanos <= 0)
                        return null;
                    if (delay > nanos)
                        delay = nanos;
                    long timeLeft = available.awaitNanos(delay);
                    nanos -= delay - timeLeft;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the element at the front of this queue without
     * waiting.
     *
     * @return the removed element, or <tt>null</tt> if this queue is empty
     * or the delay of its front element is still positive
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                return null;
            return removeHead();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the element at the front of the
     * underlying priority queue. The element is returned whether or not
     * its delay has expired.
     *
     * @return the front element, or <tt>null</tt> if this queue is empty
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue, counting both expired
     * and unexpired elements.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Moves up to <tt>maxElements</tt> elements whose delay is no longer
     * positive from this queue into <tt>c</tt>, in queue order.
     * Each element is added to <tt>c</tt> before it is removed from this
     * queue, so an exception thrown by <tt>c.add</tt> loses no element.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    private int drainExpired(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            while (n < maxElements) {
                E first = q.peek();
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                    break;
                c.add(first);   // in this order, in case add() throws
                q.poll();
                ++n;
            }
            if (n > 0)
                available.signalAll();
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes all elements whose delay is no longer positive from this
     * queue and adds them to the given collection.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainExpired(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most the given number of elements whose delay is no
     * longer positive from this queue and adds them to the given collection.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        return drainExpired(c, maxElements);
    }

    /**
     * Atomically removes all of the elements from this delay queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * a <tt>DelayQueue</tt> is not capacity constrained.
     *
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * as produced by the underlying priority queue while the lock is held.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * delegating to the underlying priority queue's
     * <tt>toArray(T[])</tt> while the lock is held.
     *
     * @param array the array handed to the underlying queue's
     * <tt>toArray(T[])</tt>
     * @return an array containing all of the elements in this queue
     */
    public <T> T[] toArray(T[] array) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(array);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present, whether or not it has expired.
     *
     * @param o the element to remove
     * @return <tt>true</tt> if an element was removed
     */
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue. The iterator
     * does not return the elements in any particular order. The
     * returned iterator is a thread-safe "fail-fast" iterator that will
     * throw {@link java.util.ConcurrentModificationException}
     * upon detected interference.
     *
     * @return an iterator over the elements in this queue.
     */
    public Iterator<E> iterator() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return new Itr(q.iterator());
        } finally {
            lock.unlock();
        }
    }

    /**
     * Wraps the underlying priority queue's iterator so that every
     * operation on it runs with the queue's lock held.
     */
    private class Itr implements Iterator<E> {
        private final Iterator<E> iter;

        Itr(Iterator<E> i) {
            iter = i;
        }

        public boolean hasNext() {
            final ReentrantLock lock = DelayQueue.this.lock;
            lock.lock();
            try {
                return iter.hasNext();
            } finally {
                lock.unlock();
            }
        }

        public E next() {
            final ReentrantLock lock = DelayQueue.this.lock;
            lock.lock();
            try {
                return iter.next();
            } finally {
                lock.unlock();
            }
        }

        public void remove() {
            final ReentrantLock lock = DelayQueue.this.lock;
            lock.lock();
            try {
                iter.remove();
            } finally {
                lock.unlock();
            }
        }
    }

}