ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.69
Committed: Wed Dec 31 07:54:13 2014 UTC (9 years, 5 months ago) by jsr166
Branch: MAIN
Changes since 1.68: +3 -2 lines
Log Message:
standardize import statement order

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.23 * Expert Group and released to the public domain, as explained at
4 jsr166 1.52 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 jsr166 1.68
9 jsr166 1.56 import static java.util.concurrent.TimeUnit.NANOSECONDS;
10 jsr166 1.69
11 jsr166 1.68 import java.util.AbstractQueue;
12     import java.util.Collection;
13     import java.util.Iterator;
14     import java.util.NoSuchElementException;
15     import java.util.PriorityQueue;
16 jsr166 1.69 import java.util.concurrent.locks.Condition;
17     import java.util.concurrent.locks.ReentrantLock;
18 tim 1.1
/**
 * An unbounded {@linkplain BlockingQueue blocking queue} of
 * {@code Delayed} elements, in which an element can only be taken
 * when its delay has expired.  The <em>head</em> of the queue is that
 * {@code Delayed} element whose delay expired furthest in the
 * past.  If no delay has expired there is no head and {@code poll}
 * will return {@code null}.  Expiration occurs when an element's
 * {@code getDelay(TimeUnit.NANOSECONDS)} method returns a value less
 * than or equal to zero.  Even though unexpired elements cannot be
 * removed using {@code take} or {@code poll}, they are otherwise
 * treated as normal elements.  For example, the {@code size} method
 * returns the count of both expired and unexpired elements.
 * This queue does not permit null elements.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.  The Iterator provided in method {@link
 * #iterator()} is <em>not</em> guaranteed to traverse the elements of
 * the DelayQueue in any particular order.
 *
 * <p>Every operation that touches the backing priority queue runs
 * while holding a single internal {@link ReentrantLock}.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this queue
 */
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    /** Lock guarding all access to {@link #q} and {@link #leader}. */
    private final transient ReentrantLock lock = new ReentrantLock();

    /** Backing heap, ordered by the elements' {@code compareTo}. */
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * Thread designated to wait for the element at the head of
     * the queue.  This variant of the Leader-Follower pattern
     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
     * minimize unnecessary timed waiting.  When a thread becomes
     * the leader, it waits only for the next delay to elapse, but
     * other threads await indefinitely.  The leader thread must
     * signal some other thread before returning from take() or
     * poll(...), unless some other thread becomes leader in the
     * interim.  Whenever the head of the queue is replaced with
     * an element with an earlier expiration time, the leader
     * field is invalidated by being reset to null, and some
     * waiting thread, but not necessarily the current leader, is
     * signalled.  So waiting threads must be prepared to acquire
     * and lose leadership while waiting.
     */
    private Thread leader;

    /**
     * Condition signalled when a newer element becomes available
     * at the head of the queue or a new thread may need to
     * become leader.
     */
    private final Condition available = lock.newCondition();

    /**
     * Creates a new {@code DelayQueue} that is initially empty.
     */
    public DelayQueue() {}

    /**
     * Creates a {@code DelayQueue} initially containing the elements of the
     * given collection of {@link Delayed} instances.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean add(E e) {
        return offer(e);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return {@code true}
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            // The new element became the head: any leader is timing its
            // wait against the previous head, so invalidate it and wake
            // one waiter to re-evaluate.
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element into this delay queue.  As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @throws NullPointerException {@inheritDoc}
     */
    @Override
    public void put(E e) {
        offer(e);
    }

    /**
     * Inserts the specified element into this delay queue.  As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return {@code true}
     * @throws NullPointerException {@inheritDoc}
     */
    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e);
    }

    /**
     * Retrieves and removes the head of this queue, or returns {@code null}
     * if this queue has no elements with an expired delay.
     *
     * @return the head of this queue, or {@code null} if this
     *         queue has no elements with an expired delay
     */
    @Override
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(NANOSECONDS) > 0)
                return null;
            else
                return q.poll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    @Override
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        // Someone else is already timing the head; wait
                        // untimed until signalled.
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            // Give up leadership only if it was not
                            // already reassigned or reset by offer().
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // On the way out, hand over to another waiter if nobody is
            // leading and elements remain.
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue,
     * or the specified wait time expires.
     *
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     *        {@code timeout} parameter
     * @return the head of this queue, or {@code null} if the
     *         specified waiting time elapses before an element with
     *         an expired delay becomes available
     * @throws InterruptedException {@inheritDoc}
     */
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0)
                        return null;
                    else
                        nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    if (nanos <= 0)
                        return null;
                    first = null; // don't retain ref while waiting
                    if (nanos < delay || leader != null)
                        // Either the caller's budget ends before the head
                        // expires, or another thread is already leading.
                        nanos = available.awaitNanos(nanos);
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            long timeLeft = available.awaitNanos(delay);
                            // Charge only the time actually spent waiting
                            // against the caller's remaining budget.
                            nanos -= delay - timeLeft;
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue, or
     * returns {@code null} if this queue is empty.  Unlike
     * {@code poll}, if no expired elements are available in the queue,
     * this method returns the element that will expire next,
     * if one exists.
     *
     * @return the head of this queue, or {@code null} if this
     *         queue is empty
     */
    @Override
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue, counting both
     * expired and unexpired elements.
     *
     * @return the number of elements in this queue
     */
    @Override
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns first element only if it is expired.
     * Used only by drainTo.  Call only when holding lock.
     */
    private E peekExpired() {
        // assert lock.isHeldByCurrentThread();
        E first = q.peek();
        return (first == null || first.getDelay(NANOSECONDS) > 0) ?
            null : first;
    }

    /**
     * Removes all expired elements from this queue and adds them to the
     * given collection.  Equivalent to
     * {@code drainTo(c, Integer.MAX_VALUE)}.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    @Override
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most {@code maxElements} expired elements from this
     * queue and adds them to the given collection.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            for (E e; n < maxElements && (e = peekExpired()) != null;) {
                c.add(e);       // In this order, in case add() throws.
                q.poll();
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this delay queue.
     * The queue will be empty after this call returns.
     * Elements with an unexpired delay are not waited for; they are
     * simply discarded from the queue.
     */
    @Override
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns {@code Integer.MAX_VALUE} because
     * a {@code DelayQueue} is not capacity constrained.
     *
     * @return {@code Integer.MAX_VALUE}
     */
    @Override
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Returns an array containing all of the elements in this queue.
     * The returned array elements are in no particular order.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    @Override
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue; the
     * runtime type of the returned array is that of the specified array.
     * The returned array elements are in no particular order.
     * If the queue fits in the specified array, it is returned therein.
     * Otherwise, a new array is allocated with the runtime type of the
     * specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * {@code null}.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>The following code can be used to dump a delay queue into a newly
     * allocated array of {@code Delayed}:
     *
     * <pre> {@code Delayed[] a = q.toArray(new Delayed[0]);}</pre>
     *
     * Note that {@code toArray(new Object[0])} is identical in function to
     * {@code toArray()}.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    @Override
    public <T> T[] toArray(T[] a) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present, whether or not it has expired.
     *
     * @param o the element to be removed from this queue, if present
     * @return {@code true} if an element was removed as a result of
     *         this call
     */
    @Override
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Identity-based version for use in Itr.remove.  Removes the first
     * element that is the very same object ({@code ==}) as {@code o},
     * if one is still present.
     *
     * @param o the exact instance to remove
     */
    void removeEQ(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
                if (o == it.next()) {
                    it.remove();
                    break;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over all the elements (both expired and
     * unexpired) in this queue.  The iterator does not return the
     * elements in any particular order.
     *
     * <p>The returned iterator is
     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
     *
     * @return an iterator over the elements in this queue
     */
    @Override
    public Iterator<E> iterator() {
        return new Itr(toArray());
    }

    /**
     * Snapshot iterator that works off copy of underlying q array.
     */
    private class Itr implements Iterator<E> {
        final Object[] array; // Array of all elements
        int cursor;           // index of next element to return
        int lastRet;          // index of last element, or -1 if no such

        /**
         * @param array snapshot of the queue contents to iterate over
         */
        Itr(Object[] array) {
            lastRet = -1;
            this.array = array;
        }

        @Override
        public boolean hasNext() {
            return cursor < array.length;
        }

        @Override
        @SuppressWarnings("unchecked")
        public E next() {
            if (cursor >= array.length)
                throw new NoSuchElementException();
            lastRet = cursor;
            return (E)array[cursor++];
        }

        /**
         * Removes from the live queue the instance most recently returned
         * by {@link #next}; the snapshot array itself is left unchanged.
         *
         * @throws IllegalStateException if {@code next} has not been
         *         called, or {@code remove} was already called for the
         *         last returned element
         */
        @Override
        public void remove() {
            if (lastRet < 0)
                throw new IllegalStateException();
            removeEQ(array[lastRet]);
            lastRet = -1;
        }
    }

}