ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.31
Committed: Tue May 3 04:53:52 2005 UTC (19 years, 1 month ago) by jsr166
Branch: MAIN
Changes since 1.30: +7 -7 lines
Log Message:
unexpired -> expired

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7
8 package java.util.concurrent;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13 * An unbounded {@linkplain BlockingQueue blocking queue} of
14 * <tt>Delayed</tt> elements, in which an element can only be taken
15 * when its delay has expired. The <em>head</em> of the queue is that
16 * <tt>Delayed</tt> element whose delay expired furthest in the
17 * past. If no delay has expired there is no head and <tt>poll</tt>
18 * will return <tt>null</tt>. Expiration occurs when an element's
19 * <tt>getDelay(TimeUnit.NANOSECONDS)</tt> method returns a value less
20 * than or equal to zero. This queue does not permit <tt>null</tt>
21 * elements.
22 *
23 * <p>This class and its iterator implement all of the
24 * <em>optional</em> methods of the {@link Collection} and {@link
25 * Iterator} interfaces.
26 *
27 * <p>This class is a member of the
28 * <a href="{@docRoot}/../guide/collections/index.html">
29 * Java Collections Framework</a>.
30 *
31 * @since 1.5
32 * @author Doug Lea
33 * @param <E> the type of elements held in this collection
34 */
35
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    /** Guards every access to {@link #q}. */
    private transient final ReentrantLock lock = new ReentrantLock();

    /**
     * Signalled when the element at the front of {@link #q} may have
     * changed, so that waiting consumers re-examine it.
     */
    private transient final Condition available = lock.newCondition();

    /** Backing store; its front element is the one with the least delay. */
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * Creates a new <tt>DelayQueue</tt> that is initially empty.
     */
    public DelayQueue() {}

    /**
     * Creates a <tt>DelayQueue</tt> initially containing the elements of the
     * given collection of {@link Delayed} instances.
     *
     * @param c the collection
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>.
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return <tt>true</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            q.offer(e);
            // Waiters sleep for the delay of the element that was at the
            // front; wake them if the queue was empty or e now precedes it.
            if (first == null || e.compareTo(first) < 0)
                available.signalAll();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds the specified element to this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public void put(E e) {
        offer(e);
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return <tt>true</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e);
    }

    /**
     * Adds the specified element to this queue.
     *
     * @param e the element to add
     * @return <tt>true</tt> (as per the general contract of
     * <tt>Collection.add</tt>).
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean add(E e) {
        return offer(e);
    }

    /**
     * Retrieves and removes the head of this queue, waiting
     * if no elements with an expired delay are present on this queue.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting.
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    available.await();
                } else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay > 0) {
                        // The remaining time is irrelevant here: the loop
                        // re-reads the front element's delay after waking.
                        available.awaitNanos(delay);
                    } else {
                        E x = q.poll();
                        assert x != null;
                        if (q.size() != 0)
                            available.signalAll(); // wake up other takers
                        return x;
                    }
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting
     * if necessary up to the specified wait time if no elements with
     * an expired delay are present on this queue.
     *
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the
     * specified waiting time elapses before an element with
     * an expired delay is present.
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if <tt>unit</tt> is <tt>null</tt>.
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        // Convert before locking: a null unit must fail without the lock
        // being held, otherwise it would never be released.
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0)
                        return null;
                    nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay > 0) {
                        // Out of time while the front element is still
                        // unexpired: give up instead of spinning until
                        // that element's delay runs out.
                        if (nanos <= 0)
                            return null;
                        if (delay > nanos)
                            delay = nanos;
                        long timeLeft = available.awaitNanos(delay);
                        // Charge only the time actually spent waiting.
                        nanos -= delay - timeLeft;
                    } else {
                        E x = q.poll();
                        assert x != null;
                        if (q.size() != 0)
                            available.signalAll(); // wake up other takers
                        return x;
                    }
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, or <tt>null</tt>
     * if this queue has no elements with an expired delay.
     *
     * @return the head of this queue, or <tt>null</tt> if this
     * queue has no elements with an expired delay.
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                return null;
            E x = q.poll();
            assert x != null;
            if (q.size() != 0)
                available.signalAll(); // wake up other takers
            return x;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the element at the front of this
     * queue, or returns <tt>null</tt> if this queue is empty. Unlike
     * <tt>poll</tt>, this method does not check the delay: if no element
     * has expired yet, the element that will expire next is returned.
     *
     * @return the element at the front of this queue, or <tt>null</tt>
     * if this queue is empty.
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue, counting both
     * expired and unexpired elements.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes all elements whose delay has expired and adds them to the
     * given collection. Unexpired elements stay in this queue.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>.
     * @throws IllegalArgumentException if <tt>c</tt> is this queue.
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most <tt>maxElements</tt> elements whose delay has
     * expired and adds them to the given collection. Unexpired elements
     * stay in this queue.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>.
     * @throws IllegalArgumentException if <tt>c</tt> is this queue.
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            while (n < maxElements) {
                E first = q.peek();
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                    break;
                // Add before removing, so that an element is not lost
                // if c.add() throws.
                c.add(first);
                q.poll();
                ++n;
            }
            if (n > 0)
                available.signalAll();
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this delay queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * a <tt>DelayQueue</tt> is not capacity constrained.
     *
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * expired or not. The array is built while holding the queue lock.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * expired or not, using the runtime type of the given array. The
     * array is built while holding the queue lock.
     *
     * @param array the array into which the elements are to be stored,
     * if it is big enough
     * @return an array containing all of the elements in this queue
     */
    public <T> T[] toArray(T[] array) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(array);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present, whether or not it has expired.
     *
     * @param o the element to remove
     * @return <tt>true</tt> if an element was removed
     */
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over all the elements (both expired and
     * unexpired) in this queue. The iterator does not return the
     * elements in any particular order. Each iterator operation runs
     * while holding the queue lock, and the iterator is "fail-fast":
     * it will throw {@link ConcurrentModificationException}
     * upon detected interference.
     *
     * @return an iterator over the elements in this queue.
     */
    public Iterator<E> iterator() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return new Itr(q.iterator());
        } finally {
            lock.unlock();
        }
    }

    /**
     * Wraps an iterator of the backing queue so that every operation is
     * performed while holding the enclosing queue's lock.
     */
    private class Itr implements Iterator<E> {
        private final Iterator<E> iter;

        Itr(Iterator<E> i) {
            iter = i;
        }

        public boolean hasNext() {
            // Locked like next() and remove(): the wrapped iterator reads
            // state of the backing queue, which is not thread-safe.
            final ReentrantLock lock = DelayQueue.this.lock;
            lock.lock();
            try {
                return iter.hasNext();
            } finally {
                lock.unlock();
            }
        }

        public E next() {
            final ReentrantLock lock = DelayQueue.this.lock;
            lock.lock();
            try {
                return iter.next();
            } finally {
                lock.unlock();
            }
        }

        public void remove() {
            final ReentrantLock lock = DelayQueue.this.lock;
            lock.lock();
            try {
                iter.remove();
            } finally {
                lock.unlock();
            }
        }
    }

}