ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.49
Committed: Sun May 18 23:47:56 2008 UTC (16 years ago) by jsr166
Branch: MAIN
Changes since 1.48: +41 -41 lines
Log Message:
Sync with OpenJDK; untabify

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
/**
 * An unbounded {@linkplain BlockingQueue blocking queue} of
 * <tt>Delayed</tt> elements, in which an element can only be taken
 * when its delay has expired.  The <em>head</em> of the queue is that
 * <tt>Delayed</tt> element whose delay expired furthest in the
 * past.  If no delay has expired there is no head and <tt>poll</tt>
 * will return <tt>null</tt>. Expiration occurs when an element's
 * <tt>getDelay(TimeUnit.NANOSECONDS)</tt> method returns a value less
 * than or equal to zero.  Even though unexpired elements cannot be
 * removed using <tt>take</tt> or <tt>poll</tt>, they are otherwise
 * treated as normal elements. For example, the <tt>size</tt> method
 * returns the count of both expired and unexpired elements.
 * This queue does not permit null elements.
 *
 * <p>All access to the backing priority queue is guarded by a single
 * internal lock.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    /** Lock guarding every access to {@link #q} and {@link #leader}. */
    private final transient ReentrantLock lock = new ReentrantLock();

    /** Backing heap, ordered by the elements' natural (delay) ordering. */
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * Thread designated to wait for the element at the head of
     * the queue.  This variant of the Leader-Follower pattern
     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
     * minimize unnecessary timed waiting.  When a thread becomes
     * the leader, it waits only for the next delay to elapse, but
     * other threads await indefinitely.  The leader thread must
     * signal some other thread before returning from take() or
     * poll(...), unless some other thread becomes leader in the
     * interim.  Whenever the head of the queue is replaced with
     * an element with an earlier expiration time, the leader
     * field is invalidated by being reset to null, and some
     * waiting thread, but not necessarily the current leader, is
     * signalled.  So waiting threads must be prepared to acquire
     * and lose leadership while waiting.
     */
    private Thread leader;

    /**
     * Condition signalled when a newer element becomes available
     * at the head of the queue or a new thread may need to
     * become leader.
     */
    private final Condition available = lock.newCondition();

    /**
     * Creates a new <tt>DelayQueue</tt> that is initially empty.
     */
    public DelayQueue() {}

    /**
     * Creates a <tt>DelayQueue</tt> initially containing the elements of the
     * given collection of {@link Delayed} instances.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return <tt>true</tt> (as specified by {@link Collection#add})
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
        return offer(e);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return <tt>true</tt>
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                // The new element became the head: any current leader is
                // timing the wrong delay, so invalidate it and wake a waiter.
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) {
        offer(e);
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return <tt>true</tt>
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e);
    }

    /**
     * Retrieves and removes the head of this queue, or returns <tt>null</tt>
     * if this queue has no elements with an expired delay.
     *
     * @return the head of this queue, or <tt>null</tt> if this
     *         queue has no elements with an expired delay
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) {
                return null;
            }
            return q.poll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    available.await();
                } else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0) {
                        return q.poll();
                    }
                    // Don't retain a reference while waiting: another thread
                    // may remove this element, and it should then be
                    // collectable even though this thread is still blocked.
                    first = null;
                    if (leader != null) {
                        available.await();
                    } else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            // Give up leadership unless it was already
                            // taken away (and possibly reassigned).
                            if (leader == thisThread) {
                                leader = null;
                            }
                        }
                    }
                }
            }
        } finally {
            // Hand over to another waiter if nobody is timing the head.
            if (leader == null && q.peek() != null) {
                available.signal();
            }
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue,
     * or the specified wait time expires.
     *
     * @param timeout how long to wait before giving up, in units of
     *        <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     *        <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the
     *         specified waiting time elapses before an element with
     *         an expired delay becomes available
     * @throws InterruptedException {@inheritDoc}
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0) {
                        return null;
                    }
                    nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0) {
                        return q.poll();
                    }
                    if (nanos <= 0) {
                        return null;
                    }
                    // Don't retain a reference while waiting (see take()).
                    first = null;
                    if (nanos < delay || leader != null) {
                        // Either we would time out before the head expires,
                        // or someone else is already timing the head.
                        nanos = available.awaitNanos(nanos);
                    } else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            long timeLeft = available.awaitNanos(delay);
                            // Charge only the time actually spent waiting.
                            nanos -= delay - timeLeft;
                        } finally {
                            if (leader == thisThread) {
                                leader = null;
                            }
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null) {
                available.signal();
            }
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue, or
     * returns <tt>null</tt> if this queue is empty.  Unlike
     * <tt>poll</tt>, if no expired elements are available in the queue,
     * this method returns the element that will expire next,
     * if one exists.
     *
     * @return the head of this queue, or <tt>null</tt> if this
     *         queue is empty.
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue, counting both
     * expired and unexpired elements.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Moves up to <tt>maxElements</tt> expired elements from the head of
     * this queue into <tt>c</tt>, under the lock.
     *
     * <p>Each element is added to <tt>c</tt> <em>before</em> it is removed
     * from this queue, so that if <tt>c.add</tt> throws, the element is
     * still held by this queue rather than being lost.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     */
    private int drainExpiredTo(Collection<? super E> c, int maxElements) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            while (n < maxElements) {
                E first = q.peek();
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) {
                    break;
                }
                c.add(first);   // In this order, in case add() throws.
                q.poll();
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes all expired elements from this queue and adds them to the
     * given collection.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        return drainExpiredTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most <tt>maxElements</tt> expired elements from this
     * queue and adds them to the given collection.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        if (maxElements <= 0) {
            return 0;
        }
        return drainExpiredTo(c, maxElements);
    }

    /**
     * Atomically removes all of the elements from this delay queue.
     * The queue will be empty after this call returns.
     * Elements with an unexpired delay are not waited for; they are
     * simply discarded from the queue.
     */
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * a <tt>DelayQueue</tt> is not capacity constrained.
     *
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Returns an array containing all of the elements in this queue.
     * The returned array elements are in no particular order.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue; the
     * runtime type of the returned array is that of the specified array.
     * The returned array elements are in no particular order.
     * If the queue fits in the specified array, it is returned therein.
     * Otherwise, a new array is allocated with the runtime type of the
     * specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * <tt>null</tt>.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>The following code can be used to dump a delay queue into a newly
     * allocated array of <tt>Delayed</tt>:
     *
     * <pre>
     *     Delayed[] a = q.toArray(new Delayed[0]);</pre>
     *
     * Note that <tt>toArray(new Object[0])</tt> is identical in function to
     * <tt>toArray()</tt>.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    public <T> T[] toArray(T[] a) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present, whether or not it has expired.
     *
     * @param o the element to remove
     * @return <tt>true</tt> if an element was removed as a result of
     *         this call
     */
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Identity-based removal used by the iterator: removes the first
     * queue entry that is <tt>==</tt> to <tt>o</tt> (not merely
     * <tt>equals</tt>), if any such entry is still present.
     *
     * @param o the exact element instance to remove
     */
    private void removeEQ(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
                if (it.next() == o) {
                    it.remove();
                    break;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over all the elements (both expired and
     * unexpired) in this queue. The iterator does not return the
     * elements in any particular order.  The returned
     * <tt>Iterator</tt> works off a snapshot of the queue taken when
     * the iterator is constructed: it will never throw {@link
     * ConcurrentModificationException}, and it traverses the elements
     * as they existed upon construction of the iterator.  Its
     * <tt>remove</tt> method removes the element from this queue if it
     * is still present.
     *
     * @return an iterator over the elements in this queue
     */
    public Iterator<E> iterator() {
        return new Itr(toArray());
    }

    /**
     * Snapshot iterator that works off copy of underlying q array.
     */
    private class Itr implements Iterator<E> {
        final Object[] array; // Array of all elements
        int cursor;           // index of next element to return
        int lastRet;          // index of last element returned, or -1 if no such

        Itr(Object[] array) {
            lastRet = -1;
            this.array = array;
        }

        public boolean hasNext() {
            return cursor < array.length;
        }

        @SuppressWarnings("unchecked")
        public E next() {
            if (cursor >= array.length) {
                throw new NoSuchElementException();
            }
            lastRet = cursor;
            // Safe: the snapshot was produced from a PriorityQueue<E>.
            return (E) array[cursor++];
        }

        public void remove() {
            if (lastRet < 0) {
                throw new IllegalStateException();
            }
            Object x = array[lastRet];
            lastRet = -1;
            // Traverse underlying queue to find == element,
            // not just a .equals element.
            removeEQ(x);
        }
    }

}