ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.59
Committed: Tue Feb 21 01:54:03 2012 UTC (12 years, 3 months ago) by jsr166
Branch: MAIN
Changes since 1.58: +1 -1 lines
Log Message:
use third person in javadoc first sentence

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.23 * Expert Group and released to the public domain, as explained at
4 jsr166 1.52 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 jsr166 1.56 import static java.util.concurrent.TimeUnit.NANOSECONDS;
9     import java.util.concurrent.locks.Condition;
10     import java.util.concurrent.locks.ReentrantLock;
11 tim 1.1 import java.util.*;
12    
/**
 * An unbounded {@linkplain BlockingQueue blocking queue} of
 * {@code Delayed} elements, in which an element can only be taken
 * when its delay has expired.  The <em>head</em> of the queue is that
 * {@code Delayed} element whose delay expired furthest in the
 * past.  If no delay has expired there is no head and {@code poll}
 * will return {@code null}.  Expiration occurs when an element's
 * {@code getDelay(TimeUnit.NANOSECONDS)} method returns a value less
 * than or equal to zero.  Even though unexpired elements cannot be
 * removed using {@code take} or {@code poll}, they are otherwise
 * treated as normal elements.  For example, the {@code size} method
 * returns the count of both expired and unexpired elements.
 * This queue does not permit null elements.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.  The Iterator provided in method {@link
 * #iterator()} is <em>not</em> guaranteed to traverse the elements of
 * the DelayQueue in any particular order.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    /** Lock held around every access to {@code q} and {@code leader}. */
    private final transient ReentrantLock lock = new ReentrantLock();

    /**
     * Backing store.  It is created without a comparator, so elements
     * are ordered by their natural ordering, i.e. by
     * {@code Delayed.compareTo}.
     */
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * Thread designated to wait for the element at the head of
     * the queue.  This variant of the Leader-Follower pattern
     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
     * minimize unnecessary timed waiting.  When a thread becomes
     * the leader, it waits only for the next delay to elapse, but
     * other threads await indefinitely.  The leader thread must
     * signal some other thread before returning from take() or
     * poll(...), unless some other thread becomes leader in the
     * interim.  Whenever the head of the queue is replaced with
     * an element with an earlier expiration time, the leader
     * field is invalidated by being reset to null, and some
     * waiting thread, but not necessarily the current leader, is
     * signalled.  So waiting threads must be prepared to acquire
     * and lose leadership while waiting.
     */
    private Thread leader;

    /**
     * Condition signalled when a newer element becomes available
     * at the head of the queue or a new thread may need to
     * become leader.
     */
    private final Condition available = lock.newCondition();

    /**
     * Creates a new {@code DelayQueue} that is initially empty.
     */
    public DelayQueue() {}

    /**
     * Creates a {@code DelayQueue} initially containing the elements of the
     * given collection of {@link Delayed} instances.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean add(E e) {
        return offer(e);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return {@code true}
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            // The new element became the head: any current leader is
            // timing the wrong delay, so invalidate it and wake a waiter
            // to re-evaluate.
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element into this delay queue.  As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @throws NullPointerException {@inheritDoc}
     */
    @Override
    public void put(E e) {
        offer(e);
    }

    /**
     * Inserts the specified element into this delay queue.  As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return {@code true}
     * @throws NullPointerException {@inheritDoc}
     */
    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e);
    }

    /**
     * Retrieves and removes the head of this queue, or returns {@code null}
     * if this queue has no elements with an expired delay.
     *
     * @return the head of this queue, or {@code null} if this
     *         queue has no elements with an expired delay
     */
    @Override
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(NANOSECONDS) > 0)
                return null;
            else
                return q.poll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    @Override
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    // Drop the reference before blocking: the element may be
                    // removed by another thread while this one is parked, and
                    // the stale local would otherwise keep it reachable.
                    first = null;
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            // Resign only if leadership was not taken away
                            // (offer() resets it when the head changes).
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // Hand over to another waiter if nobody is timing the head.
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue,
     * or the specified wait time expires.
     *
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     *        {@code timeout} parameter
     * @return the head of this queue, or {@code null} if the
     *         specified waiting time elapses before an element with
     *         an expired delay becomes available
     * @throws InterruptedException {@inheritDoc}
     */
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0)
                        return null;
                    else
                        nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    if (nanos <= 0)
                        return null;
                    // Drop the reference before blocking so that a removed
                    // head is not kept reachable by this waiting thread.
                    first = null;
                    if (nanos < delay || leader != null)
                        nanos = available.awaitNanos(nanos);
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            long timeLeft = available.awaitNanos(delay);
                            // Charge only the time actually waited against
                            // the caller's budget.
                            nanos -= delay - timeLeft;
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // Hand over to another waiter if nobody is timing the head.
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue, or
     * returns {@code null} if this queue is empty.  Unlike
     * {@code poll}, if no expired elements are available in the queue,
     * this method returns the element that will expire next,
     * if one exists.
     *
     * @return the head of this queue, or {@code null} if this
     *         queue is empty
     */
    @Override
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue, counting both
     * expired and unexpired elements.
     *
     * @return the number of elements in this queue
     */
    @Override
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns first element only if it is expired.
     * Used only by drainTo.  Call only when holding lock.
     */
    private E peekExpired() {
        // assert lock.isHeldByCurrentThread();
        E first = q.peek();
        return (first == null || first.getDelay(NANOSECONDS) > 0) ?
            null : first;
    }

    /**
     * Removes all expired elements from this queue and adds them to the
     * given collection.  Unexpired elements are left in the queue.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    @Override
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            for (E e; (e = peekExpired()) != null;) {
                c.add(e);       // In this order, in case add() throws.
                q.poll();
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes at most {@code maxElements} expired elements from this
     * queue and adds them to the given collection.  Unexpired elements
     * are left in the queue.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer;
     *        nothing is transferred if this is zero or negative
     * @return the number of elements transferred
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            for (E e; n < maxElements && (e = peekExpired()) != null;) {
                c.add(e);       // In this order, in case add() throws.
                q.poll();
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this delay queue.
     * The queue will be empty after this call returns.
     * Elements with an unexpired delay are not waited for; they are
     * simply discarded from the queue.
     */
    @Override
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns {@code Integer.MAX_VALUE} because
     * a {@code DelayQueue} is not capacity constrained.
     *
     * @return {@code Integer.MAX_VALUE}
     */
    @Override
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Returns an array containing all of the elements in this queue.
     * The returned array elements are in no particular order.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    @Override
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue; the
     * runtime type of the returned array is that of the specified array.
     * The returned array elements are in no particular order.
     * If the queue fits in the specified array, it is returned therein.
     * Otherwise, a new array is allocated with the runtime type of the
     * specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * {@code null}.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>The following code can be used to dump a delay queue into a newly
     * allocated array of {@code Delayed}:
     *
     * <pre> {@code Delayed[] a = q.toArray(new Delayed[0]);}</pre>
     *
     * Note that {@code toArray(new Object[0])} is identical in function to
     * {@code toArray()}.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    @Override
    public <T> T[] toArray(T[] a) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present, whether or not it has expired.
     *
     * @param o the element to be removed from this queue, if present
     * @return {@code true} if an element was removed
     */
    @Override
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Identity-based version for use in Itr.remove: removes the first
     * element found that is the very same object ({@code ==}) as
     * {@code o}, if any.
     */
    void removeEQ(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
                if (o == it.next()) {
                    it.remove();
                    break;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over all the elements (both expired and
     * unexpired) in this queue.  The iterator does not return the
     * elements in any particular order.
     *
     * <p>The returned iterator is a "weakly consistent" iterator that
     * will never throw {@link java.util.ConcurrentModificationException
     * ConcurrentModificationException}, and guarantees to traverse
     * elements as they existed upon construction of the iterator, and
     * may (but is not guaranteed to) reflect any modifications
     * subsequent to construction.
     *
     * @return an iterator over the elements in this queue
     */
    @Override
    public Iterator<E> iterator() {
        return new Itr(toArray());
    }

    /**
     * Snapshot iterator that works off copy of underlying q array.
     */
    private class Itr implements Iterator<E> {
        final Object[] array; // Array of all elements
        int cursor;           // index of next element to return
        int lastRet;          // index of last element, or -1 if no such

        Itr(Object[] array) {
            lastRet = -1;
            this.array = array;
        }

        @Override
        public boolean hasNext() {
            return cursor < array.length;
        }

        @Override
        @SuppressWarnings("unchecked")
        public E next() {
            if (cursor >= array.length)
                throw new NoSuchElementException();
            lastRet = cursor;
            return (E)array[cursor++];
        }

        @Override
        public void remove() {
            if (lastRet < 0)
                throw new IllegalStateException();
            // Remove from the live queue by identity; the snapshot array
            // itself is left untouched.
            removeEQ(array[lastRet]);
            lastRet = -1;
        }
    }

}