ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jdk7/java/util/concurrent/DelayQueue.java
Revision: 1.7
Committed: Sun Jan 18 20:17:32 2015 UTC (9 years, 4 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.6: +1 -0 lines
Log Message:
exactly one blank line before and after package statements

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8 jsr166 1.7
9 dl 1.1 import static java.util.concurrent.TimeUnit.NANOSECONDS;
10     import java.util.concurrent.locks.Condition;
11     import java.util.concurrent.locks.ReentrantLock;
12     import java.util.*;
13    
14     /**
15     * An unbounded {@linkplain BlockingQueue blocking queue} of
16 jsr166 1.2 * {@code Delayed} elements, in which an element can only be taken
17 dl 1.1 * when its delay has expired. The <em>head</em> of the queue is that
18 jsr166 1.2 * {@code Delayed} element whose delay expired furthest in the
19     * past. If no delay has expired there is no head and {@code poll}
20     * will return {@code null}. Expiration occurs when an element's
21     * {@code getDelay(TimeUnit.NANOSECONDS)} method returns a value less
22 dl 1.1 * than or equal to zero. Even though unexpired elements cannot be
23 jsr166 1.2 * removed using {@code take} or {@code poll}, they are otherwise
24     * treated as normal elements. For example, the {@code size} method
25 dl 1.1 * returns the count of both expired and unexpired elements.
26     * This queue does not permit null elements.
27     *
28     * <p>This class and its iterator implement all of the
29     * <em>optional</em> methods of the {@link Collection} and {@link
30     * Iterator} interfaces. The Iterator provided in method {@link
31     * #iterator()} is <em>not</em> guaranteed to traverse the elements of
32     * the DelayQueue in any particular order.
33     *
34     * <p>This class is a member of the
35     * <a href="{@docRoot}/../technotes/guides/collections/index.html">
36     * Java Collections Framework</a>.
37     *
38     * @since 1.5
39     * @author Doug Lea
40     * @param <E> the type of elements held in this collection
41     */
42     public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
43     implements BlockingQueue<E> {
44    
45 jsr166 1.3 private final transient ReentrantLock lock = new ReentrantLock();
46 dl 1.1 private final PriorityQueue<E> q = new PriorityQueue<E>();
47    
48     /**
49     * Thread designated to wait for the element at the head of
50     * the queue. This variant of the Leader-Follower pattern
51     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
52     * minimize unnecessary timed waiting. When a thread becomes
53     * the leader, it waits only for the next delay to elapse, but
54     * other threads await indefinitely. The leader thread must
55     * signal some other thread before returning from take() or
56     * poll(...), unless some other thread becomes leader in the
57     * interim. Whenever the head of the queue is replaced with
58     * an element with an earlier expiration time, the leader
59     * field is invalidated by being reset to null, and some
60     * waiting thread, but not necessarily the current leader, is
61     * signalled. So waiting threads must be prepared to acquire
62     * and lose leadership while waiting.
63     */
64 jsr166 1.6 private Thread leader;
65 dl 1.1
66     /**
67     * Condition signalled when a newer element becomes available
68     * at the head of the queue or a new thread may need to
69     * become leader.
70     */
71     private final Condition available = lock.newCondition();
72    
73     /**
74 jsr166 1.2 * Creates a new {@code DelayQueue} that is initially empty.
75 dl 1.1 */
76     public DelayQueue() {}
77    
78     /**
79 jsr166 1.2 * Creates a {@code DelayQueue} initially containing the elements of the
80 dl 1.1 * given collection of {@link Delayed} instances.
81     *
82     * @param c the collection of elements to initially contain
83     * @throws NullPointerException if the specified collection or any
84     * of its elements are null
85     */
86     public DelayQueue(Collection<? extends E> c) {
87     this.addAll(c);
88     }
89    
90     /**
91     * Inserts the specified element into this delay queue.
92     *
93     * @param e the element to add
94 jsr166 1.2 * @return {@code true} (as specified by {@link Collection#add})
95 dl 1.1 * @throws NullPointerException if the specified element is null
96     */
97     public boolean add(E e) {
98     return offer(e);
99     }
100    
101     /**
102     * Inserts the specified element into this delay queue.
103     *
104     * @param e the element to add
105 jsr166 1.2 * @return {@code true}
106 dl 1.1 * @throws NullPointerException if the specified element is null
107     */
108     public boolean offer(E e) {
109     final ReentrantLock lock = this.lock;
110     lock.lock();
111     try {
112     q.offer(e);
113     if (q.peek() == e) {
114     leader = null;
115     available.signal();
116     }
117     return true;
118     } finally {
119     lock.unlock();
120     }
121     }
122    
123     /**
124     * Inserts the specified element into this delay queue. As the queue is
125     * unbounded this method will never block.
126     *
127     * @param e the element to add
128     * @throws NullPointerException {@inheritDoc}
129     */
130     public void put(E e) {
131     offer(e);
132     }
133    
134     /**
135     * Inserts the specified element into this delay queue. As the queue is
136     * unbounded this method will never block.
137     *
138     * @param e the element to add
139     * @param timeout This parameter is ignored as the method never blocks
140     * @param unit This parameter is ignored as the method never blocks
141 jsr166 1.2 * @return {@code true}
142 dl 1.1 * @throws NullPointerException {@inheritDoc}
143     */
144     public boolean offer(E e, long timeout, TimeUnit unit) {
145     return offer(e);
146     }
147    
148     /**
149 jsr166 1.2 * Retrieves and removes the head of this queue, or returns {@code null}
150 dl 1.1 * if this queue has no elements with an expired delay.
151     *
152 jsr166 1.2 * @return the head of this queue, or {@code null} if this
153 dl 1.1 * queue has no elements with an expired delay
154     */
155     public E poll() {
156     final ReentrantLock lock = this.lock;
157     lock.lock();
158     try {
159     E first = q.peek();
160     if (first == null || first.getDelay(NANOSECONDS) > 0)
161     return null;
162     else
163     return q.poll();
164     } finally {
165     lock.unlock();
166     }
167     }
168    
169     /**
170     * Retrieves and removes the head of this queue, waiting if necessary
171     * until an element with an expired delay is available on this queue.
172     *
173     * @return the head of this queue
174     * @throws InterruptedException {@inheritDoc}
175     */
176     public E take() throws InterruptedException {
177     final ReentrantLock lock = this.lock;
178     lock.lockInterruptibly();
179     try {
180     for (;;) {
181     E first = q.peek();
182     if (first == null)
183     available.await();
184     else {
185     long delay = first.getDelay(NANOSECONDS);
186     if (delay <= 0)
187     return q.poll();
188 jsr166 1.5 first = null; // don't retain ref while waiting
189     if (leader != null)
190 dl 1.1 available.await();
191     else {
192     Thread thisThread = Thread.currentThread();
193     leader = thisThread;
194     try {
195     available.awaitNanos(delay);
196     } finally {
197     if (leader == thisThread)
198     leader = null;
199     }
200     }
201     }
202     }
203     } finally {
204     if (leader == null && q.peek() != null)
205     available.signal();
206     lock.unlock();
207     }
208     }
209    
210     /**
211     * Retrieves and removes the head of this queue, waiting if necessary
212     * until an element with an expired delay is available on this queue,
213     * or the specified wait time expires.
214     *
215 jsr166 1.2 * @return the head of this queue, or {@code null} if the
216 dl 1.1 * specified waiting time elapses before an element with
217     * an expired delay becomes available
218     * @throws InterruptedException {@inheritDoc}
219     */
220     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
221     long nanos = unit.toNanos(timeout);
222     final ReentrantLock lock = this.lock;
223     lock.lockInterruptibly();
224     try {
225     for (;;) {
226     E first = q.peek();
227     if (first == null) {
228     if (nanos <= 0)
229     return null;
230     else
231     nanos = available.awaitNanos(nanos);
232     } else {
233     long delay = first.getDelay(NANOSECONDS);
234     if (delay <= 0)
235     return q.poll();
236     if (nanos <= 0)
237     return null;
238 jsr166 1.5 first = null; // don't retain ref while waiting
239 dl 1.1 if (nanos < delay || leader != null)
240     nanos = available.awaitNanos(nanos);
241     else {
242     Thread thisThread = Thread.currentThread();
243     leader = thisThread;
244     try {
245     long timeLeft = available.awaitNanos(delay);
246     nanos -= delay - timeLeft;
247     } finally {
248     if (leader == thisThread)
249     leader = null;
250     }
251     }
252     }
253     }
254     } finally {
255     if (leader == null && q.peek() != null)
256     available.signal();
257     lock.unlock();
258     }
259     }
260    
261     /**
262     * Retrieves, but does not remove, the head of this queue, or
263 jsr166 1.2 * returns {@code null} if this queue is empty. Unlike
264     * {@code poll}, if no expired elements are available in the queue,
265 dl 1.1 * this method returns the element that will expire next,
266     * if one exists.
267     *
268 jsr166 1.2 * @return the head of this queue, or {@code null} if this
269 jsr166 1.4 * queue is empty
270 dl 1.1 */
271     public E peek() {
272     final ReentrantLock lock = this.lock;
273     lock.lock();
274     try {
275     return q.peek();
276     } finally {
277     lock.unlock();
278     }
279     }
280    
281     public int size() {
282     final ReentrantLock lock = this.lock;
283     lock.lock();
284     try {
285     return q.size();
286     } finally {
287     lock.unlock();
288     }
289     }
290    
291     /**
292     * Returns first element only if it is expired.
293     * Used only by drainTo. Call only when holding lock.
294     */
295     private E peekExpired() {
296     // assert lock.isHeldByCurrentThread();
297     E first = q.peek();
298     return (first == null || first.getDelay(NANOSECONDS) > 0) ?
299     null : first;
300     }
301    
302     /**
303     * @throws UnsupportedOperationException {@inheritDoc}
304     * @throws ClassCastException {@inheritDoc}
305     * @throws NullPointerException {@inheritDoc}
306     * @throws IllegalArgumentException {@inheritDoc}
307     */
308     public int drainTo(Collection<? super E> c) {
309     if (c == null)
310     throw new NullPointerException();
311     if (c == this)
312     throw new IllegalArgumentException();
313     final ReentrantLock lock = this.lock;
314     lock.lock();
315     try {
316     int n = 0;
317     for (E e; (e = peekExpired()) != null;) {
318     c.add(e); // In this order, in case add() throws.
319     q.poll();
320     ++n;
321     }
322     return n;
323     } finally {
324     lock.unlock();
325     }
326     }
327    
328     /**
329     * @throws UnsupportedOperationException {@inheritDoc}
330     * @throws ClassCastException {@inheritDoc}
331     * @throws NullPointerException {@inheritDoc}
332     * @throws IllegalArgumentException {@inheritDoc}
333     */
334     public int drainTo(Collection<? super E> c, int maxElements) {
335     if (c == null)
336     throw new NullPointerException();
337     if (c == this)
338     throw new IllegalArgumentException();
339     if (maxElements <= 0)
340     return 0;
341     final ReentrantLock lock = this.lock;
342     lock.lock();
343     try {
344     int n = 0;
345     for (E e; n < maxElements && (e = peekExpired()) != null;) {
346     c.add(e); // In this order, in case add() throws.
347     q.poll();
348     ++n;
349     }
350     return n;
351     } finally {
352     lock.unlock();
353     }
354     }
355    
356     /**
357     * Atomically removes all of the elements from this delay queue.
358     * The queue will be empty after this call returns.
359     * Elements with an unexpired delay are not waited for; they are
360     * simply discarded from the queue.
361     */
362     public void clear() {
363     final ReentrantLock lock = this.lock;
364     lock.lock();
365     try {
366     q.clear();
367     } finally {
368     lock.unlock();
369     }
370     }
371    
372     /**
373 jsr166 1.2 * Always returns {@code Integer.MAX_VALUE} because
374     * a {@code DelayQueue} is not capacity constrained.
375 dl 1.1 *
376 jsr166 1.2 * @return {@code Integer.MAX_VALUE}
377 dl 1.1 */
378     public int remainingCapacity() {
379     return Integer.MAX_VALUE;
380     }
381    
382     /**
383     * Returns an array containing all of the elements in this queue.
384     * The returned array elements are in no particular order.
385     *
386     * <p>The returned array will be "safe" in that no references to it are
387     * maintained by this queue. (In other words, this method must allocate
388     * a new array). The caller is thus free to modify the returned array.
389     *
390     * <p>This method acts as bridge between array-based and collection-based
391     * APIs.
392     *
393     * @return an array containing all of the elements in this queue
394     */
395     public Object[] toArray() {
396     final ReentrantLock lock = this.lock;
397     lock.lock();
398     try {
399     return q.toArray();
400     } finally {
401     lock.unlock();
402     }
403     }
404    
405     /**
406     * Returns an array containing all of the elements in this queue; the
407     * runtime type of the returned array is that of the specified array.
408     * The returned array elements are in no particular order.
409     * If the queue fits in the specified array, it is returned therein.
410     * Otherwise, a new array is allocated with the runtime type of the
411     * specified array and the size of this queue.
412     *
413     * <p>If this queue fits in the specified array with room to spare
414     * (i.e., the array has more elements than this queue), the element in
415     * the array immediately following the end of the queue is set to
416 jsr166 1.2 * {@code null}.
417 dl 1.1 *
418     * <p>Like the {@link #toArray()} method, this method acts as bridge between
419     * array-based and collection-based APIs. Further, this method allows
420     * precise control over the runtime type of the output array, and may,
421     * under certain circumstances, be used to save allocation costs.
422     *
423     * <p>The following code can be used to dump a delay queue into a newly
424 jsr166 1.2 * allocated array of {@code Delayed}:
425 dl 1.1 *
426     * <pre> {@code Delayed[] a = q.toArray(new Delayed[0]);}</pre>
427     *
428 jsr166 1.2 * Note that {@code toArray(new Object[0])} is identical in function to
429     * {@code toArray()}.
430 dl 1.1 *
431     * @param a the array into which the elements of the queue are to
432     * be stored, if it is big enough; otherwise, a new array of the
433     * same runtime type is allocated for this purpose
434     * @return an array containing all of the elements in this queue
435     * @throws ArrayStoreException if the runtime type of the specified array
436     * is not a supertype of the runtime type of every element in
437     * this queue
438     * @throws NullPointerException if the specified array is null
439     */
440     public <T> T[] toArray(T[] a) {
441     final ReentrantLock lock = this.lock;
442     lock.lock();
443     try {
444     return q.toArray(a);
445     } finally {
446     lock.unlock();
447     }
448     }
449    
450     /**
451     * Removes a single instance of the specified element from this
452     * queue, if it is present, whether or not it has expired.
453     */
454     public boolean remove(Object o) {
455     final ReentrantLock lock = this.lock;
456     lock.lock();
457     try {
458     return q.remove(o);
459     } finally {
460     lock.unlock();
461     }
462     }
463    
464     /**
465     * Identity-based version for use in Itr.remove
466     */
467     void removeEQ(Object o) {
468     final ReentrantLock lock = this.lock;
469     lock.lock();
470     try {
471     for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
472     if (o == it.next()) {
473     it.remove();
474     break;
475     }
476     }
477     } finally {
478     lock.unlock();
479     }
480     }
481    
482     /**
483     * Returns an iterator over all the elements (both expired and
484     * unexpired) in this queue. The iterator does not return the
485     * elements in any particular order.
486     *
487     * <p>The returned iterator is a "weakly consistent" iterator that
488     * will never throw {@link java.util.ConcurrentModificationException
489     * ConcurrentModificationException}, and guarantees to traverse
490     * elements as they existed upon construction of the iterator, and
491     * may (but is not guaranteed to) reflect any modifications
492     * subsequent to construction.
493     *
494     * @return an iterator over the elements in this queue
495     */
496     public Iterator<E> iterator() {
497     return new Itr(toArray());
498     }
499    
500     /**
501     * Snapshot iterator that works off copy of underlying q array.
502     */
503     private class Itr implements Iterator<E> {
504     final Object[] array; // Array of all elements
505     int cursor; // index of next element to return
506     int lastRet; // index of last element, or -1 if no such
507    
508     Itr(Object[] array) {
509     lastRet = -1;
510     this.array = array;
511     }
512    
513     public boolean hasNext() {
514     return cursor < array.length;
515     }
516    
517     @SuppressWarnings("unchecked")
518     public E next() {
519     if (cursor >= array.length)
520     throw new NoSuchElementException();
521     lastRet = cursor;
522     return (E)array[cursor++];
523     }
524    
525     public void remove() {
526     if (lastRet < 0)
527     throw new IllegalStateException();
528     removeEQ(array[lastRet]);
529     lastRet = -1;
530     }
531     }
532    
533     }