ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.32
Committed: Tue May 17 06:44:59 2005 UTC (19 years ago) by jsr166
Branch: MAIN
Changes since 1.31: +123 -63 lines
Log Message:
doc fixes

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7
8 package java.util.concurrent;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13 * An unbounded {@linkplain BlockingQueue blocking queue} of
14 * <tt>Delayed</tt> elements, in which an element can only be taken
15 * when its delay has expired. The <em>head</em> of the queue is that
16 * <tt>Delayed</tt> element whose delay expired furthest in the
17 * past. If no delay has expired there is no head and <tt>poll</tt>
18 * will return <tt>null</tt>. Expiration occurs when an element's
19 * <tt>getDelay(TimeUnit.NANOSECONDS)</tt> method returns a value less
20 * than or equal to zero. This queue does not permit <tt>null</tt>
21 * elements.
22 *
23 * <p>This class and its iterator implement all of the
24 * <em>optional</em> methods of the {@link Collection} and {@link
25 * Iterator} interfaces.
26 *
27 * <p>This class is a member of the
28 * <a href="{@docRoot}/../guide/collections/index.html">
29 * Java Collections Framework</a>.
30 *
31 * @since 1.5
32 * @author Doug Lea
33 * @param <E> the type of elements held in this collection
34 */
35
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    /** Guards every access to {@link #q}. */
    private transient final ReentrantLock lock = new ReentrantLock();
    /**
     * Signalled when a new head is inserted, and when an element is
     * taken while others remain, so that waiting takers re-check the head.
     */
    private transient final Condition available = lock.newCondition();
    /** Backing heap; elements are ordered by their <tt>compareTo</tt>. */
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * Creates a new <tt>DelayQueue</tt> that is initially empty.
     */
    public DelayQueue() {}

    /**
     * Creates a <tt>DelayQueue</tt> initially containing the elements of the
     * given collection of {@link Delayed} instances.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return <tt>true</tt> (as per the spec for {@link Collection#add})
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
        return offer(e);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return <tt>true</tt>
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            q.offer(e);
            // Waiters are timing their sleep against the old head (or
            // waiting for any element at all); wake them only when the
            // new element became the head.
            if (first == null || e.compareTo(first) < 0)
                available.signalAll();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) {
        offer(e);
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return <tt>true</tt>
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e);
    }

    /**
     * Removes and returns the head of the backing queue, waking other
     * waiters if elements remain so they can re-examine the new head.
     * The caller must hold {@link #lock} and must already have
     * established that the head exists and has expired.
     *
     * @return the removed head
     */
    private E removeExpiredHead() {
        E x = q.poll();
        assert x != null;
        if (q.size() != 0)
            available.signalAll(); // wake up other takers
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, or returns <tt>null</tt>
     * if this queue has no elements with an expired delay.
     *
     * @return the head of this queue, or <tt>null</tt> if this
     *         queue has no elements with an expired delay
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                return null;
            return removeExpiredHead();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    available.await();
                } else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0)
                        return removeExpiredHead();
                    // Sleep at most until the current head expires; the
                    // loop re-reads the head after every wake-up.
                    available.awaitNanos(delay);
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue,
     * or the specified wait time expires.
     *
     * @param timeout how long to wait before giving up, in units of
     *        <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     *        <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the
     *         specified waiting time elapses before an element with
     *         an expired delay becomes available
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException if <tt>unit</tt> is null
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        // Convert before locking: a null unit must not throw while the
        // lock is held outside the try/finally.
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0)
                        return null;
                    nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0)
                        return removeExpiredHead();
                    // Head exists but has not expired: give up once the
                    // caller's budget is spent instead of re-entering
                    // awaitNanos with a non-positive timeout forever.
                    if (nanos <= 0)
                        return null;
                    if (delay > nanos)
                        delay = nanos;
                    long timeLeft = available.awaitNanos(delay);
                    // Charge only the time actually spent waiting.
                    nanos -= delay - timeLeft;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue, or
     * returns <tt>null</tt> if this queue is empty. Unlike
     * <tt>poll</tt>, this method does not examine the head's delay:
     * if no element has expired yet, the element that is ordered
     * first (the next to expire) is still returned.
     *
     * @return the head of this queue, or <tt>null</tt> if this
     *         queue is empty
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue, counting both
     * expired and unexpired elements.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            for (;;) {
                E first = q.peek();
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                    break;
                c.add(first);   // add before removing, in case add() throws
                q.poll();
                ++n;
            }
            if (n > 0)
                available.signalAll();
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            while (n < maxElements) {
                E first = q.peek();
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                    break;
                c.add(first);   // add before removing, in case add() throws
                q.poll();
                ++n;
            }
            if (n > 0)
                available.signalAll();
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this delay queue.
     * The queue will be empty after this call returns.
     * Elements with an unexpired delay are not waited for; they are
     * simply discarded from the queue.
     */
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * a <tt>DelayQueue</tt> is not capacity constrained.
     *
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Returns an array containing all of the elements in this queue.
     * The returned array elements are in no particular order.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue; the
     * runtime type of the returned array is that of the specified array.
     * The returned array elements are in no particular order.
     * If the queue fits in the specified array, it is returned therein.
     * Otherwise, a new array is allocated with the runtime type of the
     * specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * <tt>null</tt>.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
     * The following code can be used to dump the queue into a newly
     * allocated array of <tt>String</tt>:
     *
     * <pre>
     *     String[] y = x.toArray(new String[0]);</pre>
     *
     * Note that <tt>toArray(new Object[0])</tt> is identical in function to
     * <tt>toArray()</tt>.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    public <T> T[] toArray(T[] a) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present, whether or not it has expired.
     *
     * @param o the element to be removed from this queue, if present
     * @return <tt>true</tt> if an element was removed
     */
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue, both expired
     * and unexpired. The iterator does not return the elements in any
     * particular order. The returned iterator is a "fail-fast" iterator
     * that will throw {@link ConcurrentModificationException}
     * upon detected interference; its <tt>next</tt> and <tt>remove</tt>
     * operations run under this queue's lock.
     *
     * @return an iterator over the elements in this queue
     */
    public Iterator<E> iterator() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return new Itr(q.iterator());
        } finally {
            lock.unlock();
        }
    }

    /**
     * Wraps the backing queue's iterator so that the operations which
     * read or modify the heap are performed while holding the queue lock.
     */
    private class Itr implements Iterator<E> {
        private final Iterator<E> iter;

        Itr(Iterator<E> i) {
            iter = i;
        }

        public boolean hasNext() {
            return iter.hasNext();
        }

        public E next() {
            final ReentrantLock lock = DelayQueue.this.lock;
            lock.lock();
            try {
                return iter.next();
            } finally {
                lock.unlock();
            }
        }

        public void remove() {
            final ReentrantLock lock = DelayQueue.this.lock;
            lock.lock();
            try {
                iter.remove();
            } finally {
                lock.unlock();
            }
        }
    }

}