ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.20
Committed: Sun Oct 19 13:38:34 2003 UTC (20 years, 7 months ago) by dl
Branch: MAIN
CVS Tags: JSR166_NOV3_FREEZE, JSR166_DEC9_PRE_ES_SUBMIT, JSR166_DEC9_POST_ES_SUBMIT
Changes since 1.19: +1 -1 lines
Log Message:
Changed doc strings for generic params

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7    
8 tim 1.1 package java.util.concurrent;
9 dl 1.5 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
/**
 * An unbounded {@linkplain BlockingQueue blocking queue} of <tt>Delayed</tt>
 * elements, in which an element can only be taken when its delay has expired.
 * The <em>head</em> of the queue is that <tt>Delayed</tt> element whose delay
 * expired furthest in the past - if no delay has expired there is no head and
 * <tt>poll</tt> will return <tt>null</tt>.
 * This queue does not permit <tt>null</tt> elements.
 *
 * <p>Elements are kept in a {@link PriorityQueue}; every access to it,
 * including access through iterators, is performed while holding a single
 * internal lock.
 *
 * <p>This class implements all of the <em>optional</em> methods
 * of the {@link Collection} and {@link Iterator} interfaces.
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    /** Lock guarding every access to {@link #q}. */
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * Condition signalled when an element that sorts before the previous
     * head is inserted, and when elements remain after a removal.
     */
    private final Condition available = lock.newCondition();

    /** Backing store; its ordering is the elements' own <tt>compareTo</tt>. */
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * Creates a new <tt>DelayQueue</tt> that is initially empty.
     */
    public DelayQueue() {}

    /**
     * Creates a <tt>DelayQueue</tt> initially containing the elements of the
     * given collection of {@link Delayed} instances.
     *
     * @param c the collection whose elements are added to the new queue
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param o the element to add
     * @return <tt>true</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o) {
        if (o == null) {
            throw new NullPointerException();
        }
        lock.lock();
        try {
            E first = q.peek();
            q.offer(o);
            // Waiters are timing against the old head (or waiting for any
            // element); wake them only if the new element becomes the head.
            if (first == null || o.compareTo(first) < 0) {
                available.signalAll();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds the specified element to this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param o the element to add
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public void put(E o) {
        offer(o);
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param o the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return <tt>true</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o, long timeout, TimeUnit unit) {
        return offer(o);
    }

    /**
     * Adds the specified element to this queue.
     *
     * @param o the element to add
     * @return <tt>true</tt> (as per the general contract of
     * <tt>Collection.add</tt>).
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean add(E o) {
        return offer(o);
    }

    /**
     * Retrieves and removes the head of this queue, waiting until an element
     * is present and its delay has expired.
     *
     * @return the expired head of this queue
     * @throws InterruptedException if interrupted while acquiring the lock
     * or while waiting
     */
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    available.await();
                } else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0) {
                        return dequeue();
                    }
                    // Re-examine the head after the wait: it may have been
                    // replaced by an element inserted in the meantime.
                    available.awaitNanos(delay);
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the given
     * time for an element with an expired delay to become available.
     *
     * @param time how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit the unit in which <tt>time</tt> is expressed
     * @return the expired head of this queue, or <tt>null</tt> if the wait
     * time elapsed before an expired element was available
     * @throws InterruptedException if interrupted while acquiring the lock
     * or while waiting
     * @throws NullPointerException if <tt>unit</tt> is <tt>null</tt>
     */
    public E poll(long time, TimeUnit unit) throws InterruptedException {
        // Convert before locking so that a null unit cannot leave the lock held.
        long nanos = unit.toNanos(time);
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0) {
                        return null;
                    }
                    nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0) {
                        return dequeue();
                    }
                    // The head is still delayed: give up once the caller's
                    // budget is spent instead of waiting for the head.
                    if (nanos <= 0) {
                        return null;
                    }
                    if (delay > nanos) {
                        delay = nanos;
                    }
                    long timeLeft = available.awaitNanos(delay);
                    // Charge only the time actually spent waiting.
                    nanos -= delay - timeLeft;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue without waiting.
     *
     * @return the head of this queue, or <tt>null</tt> if the queue is empty
     * or the delay of its first element has not yet expired
     */
    public E poll() {
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) {
                return null;
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the first element of the underlying
     * priority queue, whether or not its delay has expired.
     *
     * @return the first element, or <tt>null</tt> if this queue is empty
     */
    public E peek() {
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue, expired or not.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes all elements whose delay has expired and adds them to the
     * given collection.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        return drainExpired(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most <tt>maxElements</tt> elements whose delay has expired
     * and adds them to the given collection.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        if (maxElements <= 0) {
            return 0;
        }
        return drainExpired(c, maxElements);
    }

    /**
     * Atomically removes all of the elements from this delay queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * a <tt>DelayQueue</tt> is not capacity constrained.
     *
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Returns an array containing all of the elements in this queue, as
     * produced by the underlying priority queue while the lock is held.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, using
     * the supplied array as described by {@link Collection#toArray(Object[])}.
     *
     * @param array the array to fill, or whose runtime type to use
     * @return an array containing all of the elements in this queue
     */
    public <T> T[] toArray(T[] array) {
        lock.lock();
        try {
            return q.toArray(array);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present, whether or not its delay has expired.
     *
     * @param o the element to remove
     * @return <tt>true</tt> if an element was removed
     */
    public boolean remove(Object o) {
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue. The iterator
     * does not return the elements in any particular order. The
     * returned iterator wraps the underlying priority queue's iterator,
     * performing each operation under this queue's lock, and will
     * throw {@link java.util.ConcurrentModificationException}
     * upon detected interference.
     *
     * @return an iterator over the elements in this queue.
     */
    public Iterator<E> iterator() {
        lock.lock();
        try {
            return new Itr(q.iterator());
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the head of the priority queue, waking other
     * waiters if elements remain. The caller must hold {@link #lock} and
     * must already have established that the head exists and has expired.
     */
    private E dequeue() {
        E x = q.poll();
        assert x != null;
        if (q.size() != 0) {
            available.signalAll(); // wake up other takers
        }
        return x;
    }

    /**
     * Transfers up to <tt>maxElements</tt> expired elements into <tt>c</tt>
     * under the lock. Each element is added to <tt>c</tt> before it is
     * removed from this queue, so an exception thrown by <tt>c.add</tt>
     * leaves that element in the queue.
     */
    private int drainExpired(Collection<? super E> c, int maxElements) {
        lock.lock();
        try {
            int n = 0;
            while (n < maxElements) {
                E first = q.peek();
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) {
                    break;
                }
                c.add(first);
                q.poll();
                ++n;
            }
            if (n > 0) {
                available.signalAll();
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Iterator that delegates to an iterator of the underlying priority
     * queue, taking the enclosing queue's lock around every operation.
     */
    private class Itr implements Iterator<E> {
        private final Iterator<E> iter;

        Itr(Iterator<E> i) {
            iter = i;
        }

        public boolean hasNext() {
            lock.lock();
            try {
                return iter.hasNext();
            } finally {
                lock.unlock();
            }
        }

        public E next() {
            lock.lock();
            try {
                return iter.next();
            } finally {
                lock.unlock();
            }
        }

        public void remove() {
            lock.lock();
            try {
                iter.remove();
            } finally {
                lock.unlock();
            }
        }
    }

}