ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.74
Committed: Sat Dec 24 19:32:07 2016 UTC (7 years, 5 months ago) by jsr166
Branch: MAIN
Changes since 1.73: +3 -4 lines
Log Message:
convert to Objects.requireNonNull

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.23 * Expert Group and released to the public domain, as explained at
4 jsr166 1.52 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 jsr166 1.68
9 jsr166 1.56 import static java.util.concurrent.TimeUnit.NANOSECONDS;
10 jsr166 1.69
11 jsr166 1.68 import java.util.AbstractQueue;
12     import java.util.Collection;
13     import java.util.Iterator;
14     import java.util.NoSuchElementException;
15 jsr166 1.74 import java.util.Objects;
16 jsr166 1.68 import java.util.PriorityQueue;
17 jsr166 1.69 import java.util.concurrent.locks.Condition;
18     import java.util.concurrent.locks.ReentrantLock;
19 tim 1.1
20     /**
21 dl 1.27 * An unbounded {@linkplain BlockingQueue blocking queue} of
22 jsr166 1.61 * {@code Delayed} elements, in which an element can only be taken
23 dl 1.27 * when its delay has expired. The <em>head</em> of the queue is that
24 jsr166 1.61 * {@code Delayed} element whose delay expired furthest in the
25     * past. If no delay has expired there is no head and {@code poll}
26     * will return {@code null}. Expiration occurs when an element's
27     * {@code getDelay(TimeUnit.NANOSECONDS)} method returns a value less
28 dl 1.36 * than or equal to zero. Even though unexpired elements cannot be
29 jsr166 1.61 * removed using {@code take} or {@code poll}, they are otherwise
30     * treated as normal elements. For example, the {@code size} method
31 dl 1.36 * returns the count of both expired and unexpired elements.
32 jsr166 1.39 * This queue does not permit null elements.
33 dl 1.25 *
34 dl 1.26 * <p>This class and its iterator implement all of the
35     * <em>optional</em> methods of the {@link Collection} and {@link
36 dl 1.53 * Iterator} interfaces. The Iterator provided in method {@link
37     * #iterator()} is <em>not</em> guaranteed to traverse the elements of
38     * the DelayQueue in any particular order.
39 dl 1.26 *
40 dl 1.25 * <p>This class is a member of the
41 jsr166 1.46 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
42 dl 1.25 * Java Collections Framework</a>.
43     *
44 dl 1.4 * @since 1.5
45     * @author Doug Lea
46 jsr166 1.67 * @param <E> the type of elements held in this queue
47 dl 1.19 */
48 dl 1.3 public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
49     implements BlockingQueue<E> {
50 tim 1.6
51 jsr166 1.62 private final transient ReentrantLock lock = new ReentrantLock();
52 dl 1.3 private final PriorityQueue<E> q = new PriorityQueue<E>();
53 tim 1.1
54 dl 1.4 /**
55 jsr166 1.48 * Thread designated to wait for the element at the head of
56     * the queue. This variant of the Leader-Follower pattern
57     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
58     * minimize unnecessary timed waiting. When a thread becomes
59     * the leader, it waits only for the next delay to elapse, but
60     * other threads await indefinitely. The leader thread must
61     * signal some other thread before returning from take() or
62     * poll(...), unless some other thread becomes leader in the
63     * interim. Whenever the head of the queue is replaced with
64     * an element with an earlier expiration time, the leader
65     * field is invalidated by being reset to null, and some
66     * waiting thread, but not necessarily the current leader, is
67     * signalled. So waiting threads must be prepared to acquire
68     * and lose leadership while waiting.
69     */
70 jsr166 1.66 private Thread leader;
71 jsr166 1.48
72     /**
73     * Condition signalled when a newer element becomes available
74     * at the head of the queue or a new thread may need to
75     * become leader.
76     */
77     private final Condition available = lock.newCondition();
78    
79     /**
80 jsr166 1.61 * Creates a new {@code DelayQueue} that is initially empty.
81 dl 1.4 */
82 tim 1.1 public DelayQueue() {}
83    
84 tim 1.6 /**
85 jsr166 1.61 * Creates a {@code DelayQueue} initially containing the elements of the
86 dholmes 1.7 * given collection of {@link Delayed} instances.
87     *
88 jsr166 1.32 * @param c the collection of elements to initially contain
89     * @throws NullPointerException if the specified collection or any
90     * of its elements are null
91 tim 1.6 */
92     public DelayQueue(Collection<? extends E> c) {
93     this.addAll(c);
94     }
95    
96 dholmes 1.7 /**
97 dl 1.16 * Inserts the specified element into this delay queue.
98 dholmes 1.7 *
99 jsr166 1.30 * @param e the element to add
100 jsr166 1.61 * @return {@code true} (as specified by {@link Collection#add})
101 jsr166 1.32 * @throws NullPointerException if the specified element is null
102     */
103     public boolean add(E e) {
104     return offer(e);
105     }
106    
107     /**
108     * Inserts the specified element into this delay queue.
109     *
110     * @param e the element to add
111 jsr166 1.61 * @return {@code true}
112 jsr166 1.32 * @throws NullPointerException if the specified element is null
113 dholmes 1.7 */
114 jsr166 1.30 public boolean offer(E e) {
115 dl 1.21 final ReentrantLock lock = this.lock;
116 dl 1.2 lock.lock();
117     try {
118 jsr166 1.30 q.offer(e);
119 jsr166 1.48 if (q.peek() == e) {
120 jsr166 1.49 leader = null;
121 jsr166 1.48 available.signal();
122 jsr166 1.49 }
123 dl 1.2 return true;
124 tim 1.13 } finally {
125 dl 1.2 lock.unlock();
126     }
127     }
128    
129 dholmes 1.7 /**
130 jsr166 1.32 * Inserts the specified element into this delay queue. As the queue is
131 dholmes 1.7 * unbounded this method will never block.
132 jsr166 1.32 *
133 jsr166 1.30 * @param e the element to add
134 jsr166 1.32 * @throws NullPointerException {@inheritDoc}
135 dholmes 1.7 */
136 jsr166 1.30 public void put(E e) {
137     offer(e);
138 dl 1.2 }
139    
140 dholmes 1.7 /**
141 dl 1.16 * Inserts the specified element into this delay queue. As the queue is
142 dholmes 1.7 * unbounded this method will never block.
143 jsr166 1.32 *
144 jsr166 1.30 * @param e the element to add
145 dl 1.14 * @param timeout This parameter is ignored as the method never blocks
146 dholmes 1.7 * @param unit This parameter is ignored as the method never blocks
147 jsr166 1.61 * @return {@code true}
148 jsr166 1.32 * @throws NullPointerException {@inheritDoc}
149 dholmes 1.7 */
150 jsr166 1.30 public boolean offer(E e, long timeout, TimeUnit unit) {
151     return offer(e);
152 dl 1.2 }
153    
154 dholmes 1.7 /**
155 jsr166 1.61 * Retrieves and removes the head of this queue, or returns {@code null}
156 jsr166 1.32 * if this queue has no elements with an expired delay.
157 dholmes 1.7 *
158 jsr166 1.61 * @return the head of this queue, or {@code null} if this
159 jsr166 1.32 * queue has no elements with an expired delay
160 dholmes 1.7 */
161 jsr166 1.32 public E poll() {
162     final ReentrantLock lock = this.lock;
163     lock.lock();
164     try {
165     E first = q.peek();
166 jsr166 1.70 return (first == null || first.getDelay(NANOSECONDS) > 0)
167     ? null
168     : q.poll();
169 jsr166 1.32 } finally {
170     lock.unlock();
171     }
172 dl 1.2 }
173    
174 dl 1.27 /**
175 jsr166 1.32 * Retrieves and removes the head of this queue, waiting if necessary
176     * until an element with an expired delay is available on this queue.
177     *
178 dl 1.27 * @return the head of this queue
179 jsr166 1.32 * @throws InterruptedException {@inheritDoc}
180 dl 1.27 */
181 dl 1.3 public E take() throws InterruptedException {
182 dl 1.21 final ReentrantLock lock = this.lock;
183 dl 1.2 lock.lockInterruptibly();
184     try {
185     for (;;) {
186 dl 1.3 E first = q.peek();
187 jsr166 1.48 if (first == null)
188 dl 1.4 available.await();
189 jsr166 1.49 else {
190 jsr166 1.56 long delay = first.getDelay(NANOSECONDS);
191 jsr166 1.72 if (delay <= 0L)
192 jsr166 1.49 return q.poll();
193 jsr166 1.64 first = null; // don't retain ref while waiting
194     if (leader != null)
195 jsr166 1.49 available.await();
196     else {
197     Thread thisThread = Thread.currentThread();
198     leader = thisThread;
199     try {
200     available.awaitNanos(delay);
201     } finally {
202     if (leader == thisThread)
203     leader = null;
204     }
205 dl 1.2 }
206     }
207     }
208 tim 1.13 } finally {
209 jsr166 1.49 if (leader == null && q.peek() != null)
210     available.signal();
211 dl 1.2 lock.unlock();
212     }
213     }
214    
215 dl 1.27 /**
216 jsr166 1.32 * Retrieves and removes the head of this queue, waiting if necessary
217     * until an element with an expired delay is available on this queue,
218     * or the specified wait time expires.
219     *
220 jsr166 1.61 * @return the head of this queue, or {@code null} if the
221 jsr166 1.32 * specified waiting time elapses before an element with
222     * an expired delay becomes available
223     * @throws InterruptedException {@inheritDoc}
224 dl 1.27 */
225     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
226 jsr166 1.41 long nanos = unit.toNanos(timeout);
227 dl 1.21 final ReentrantLock lock = this.lock;
228 dl 1.2 lock.lockInterruptibly();
229     try {
230     for (;;) {
231 dl 1.3 E first = q.peek();
232 dl 1.2 if (first == null) {
233 jsr166 1.72 if (nanos <= 0L)
234 dl 1.2 return null;
235     else
236 dl 1.4 nanos = available.awaitNanos(nanos);
237 tim 1.13 } else {
238 jsr166 1.56 long delay = first.getDelay(NANOSECONDS);
239 jsr166 1.72 if (delay <= 0L)
240 jsr166 1.49 return q.poll();
241 jsr166 1.72 if (nanos <= 0L)
242 jsr166 1.49 return null;
243 jsr166 1.64 first = null; // don't retain ref while waiting
244 jsr166 1.49 if (nanos < delay || leader != null)
245     nanos = available.awaitNanos(nanos);
246     else {
247     Thread thisThread = Thread.currentThread();
248     leader = thisThread;
249     try {
250     long timeLeft = available.awaitNanos(delay);
251     nanos -= delay - timeLeft;
252     } finally {
253     if (leader == thisThread)
254     leader = null;
255     }
256     }
257 dl 1.2 }
258     }
259 tim 1.13 } finally {
260 jsr166 1.49 if (leader == null && q.peek() != null)
261     available.signal();
262 dl 1.2 lock.unlock();
263     }
264     }
265    
266 dl 1.27 /**
267 dl 1.36 * Retrieves, but does not remove, the head of this queue, or
268 jsr166 1.61 * returns {@code null} if this queue is empty. Unlike
269     * {@code poll}, if no expired elements are available in the queue,
270 dl 1.36 * this method returns the element that will expire next,
271     * if one exists.
272 dl 1.27 *
273 jsr166 1.61 * @return the head of this queue, or {@code null} if this
274 jsr166 1.63 * queue is empty
275 dl 1.27 */
276 dl 1.3 public E peek() {
277 dl 1.21 final ReentrantLock lock = this.lock;
278 dl 1.2 lock.lock();
279     try {
280     return q.peek();
281 tim 1.13 } finally {
282 dl 1.2 lock.unlock();
283     }
284 tim 1.1 }
285    
286 dl 1.2 public int size() {
287 dl 1.21 final ReentrantLock lock = this.lock;
288 dl 1.2 lock.lock();
289     try {
290     return q.size();
291 tim 1.13 } finally {
292 dl 1.2 lock.unlock();
293     }
294     }
295    
296 jsr166 1.32 /**
297 jsr166 1.59 * Returns first element only if it is expired.
298 jsr166 1.56 * Used only by drainTo. Call only when holding lock.
299     */
300     private E peekExpired() {
301     // assert lock.isHeldByCurrentThread();
302     E first = q.peek();
303     return (first == null || first.getDelay(NANOSECONDS) > 0) ?
304     null : first;
305     }
306    
307     /**
308 jsr166 1.32 * @throws UnsupportedOperationException {@inheritDoc}
309     * @throws ClassCastException {@inheritDoc}
310     * @throws NullPointerException {@inheritDoc}
311     * @throws IllegalArgumentException {@inheritDoc}
312     */
313 dl 1.17 public int drainTo(Collection<? super E> c) {
314 jsr166 1.74 Objects.requireNonNull(c);
315 dl 1.17 if (c == this)
316     throw new IllegalArgumentException();
317 dl 1.21 final ReentrantLock lock = this.lock;
318 dl 1.17 lock.lock();
319     try {
320     int n = 0;
321 jsr166 1.56 for (E e; (e = peekExpired()) != null;) {
322     c.add(e); // In this order, in case add() throws.
323     q.poll();
324 dl 1.17 ++n;
325     }
326     return n;
327     } finally {
328     lock.unlock();
329     }
330     }
331    
332 jsr166 1.32 /**
333     * @throws UnsupportedOperationException {@inheritDoc}
334     * @throws ClassCastException {@inheritDoc}
335     * @throws NullPointerException {@inheritDoc}
336     * @throws IllegalArgumentException {@inheritDoc}
337     */
338 dl 1.17 public int drainTo(Collection<? super E> c, int maxElements) {
339 jsr166 1.74 Objects.requireNonNull(c);
340 dl 1.17 if (c == this)
341     throw new IllegalArgumentException();
342     if (maxElements <= 0)
343     return 0;
344 dl 1.21 final ReentrantLock lock = this.lock;
345 dl 1.17 lock.lock();
346     try {
347     int n = 0;
348 jsr166 1.56 for (E e; n < maxElements && (e = peekExpired()) != null;) {
349     c.add(e); // In this order, in case add() throws.
350     q.poll();
351 dl 1.17 ++n;
352     }
353     return n;
354     } finally {
355     lock.unlock();
356     }
357     }
358    
359 dholmes 1.7 /**
360     * Atomically removes all of the elements from this delay queue.
361     * The queue will be empty after this call returns.
362 jsr166 1.32 * Elements with an unexpired delay are not waited for; they are
363     * simply discarded from the queue.
364 dholmes 1.7 */
365 dl 1.2 public void clear() {
366 dl 1.21 final ReentrantLock lock = this.lock;
367 dl 1.2 lock.lock();
368     try {
369     q.clear();
370 tim 1.13 } finally {
371 dl 1.2 lock.unlock();
372     }
373     }
374 tim 1.1
375 dholmes 1.7 /**
376 jsr166 1.61 * Always returns {@code Integer.MAX_VALUE} because
377     * a {@code DelayQueue} is not capacity constrained.
378 jsr166 1.32 *
379 jsr166 1.61 * @return {@code Integer.MAX_VALUE}
380 dholmes 1.7 */
381 dl 1.2 public int remainingCapacity() {
382     return Integer.MAX_VALUE;
383 tim 1.1 }
384 dl 1.2
385 jsr166 1.32 /**
386     * Returns an array containing all of the elements in this queue.
387     * The returned array elements are in no particular order.
388     *
389     * <p>The returned array will be "safe" in that no references to it are
390     * maintained by this queue. (In other words, this method must allocate
391     * a new array). The caller is thus free to modify the returned array.
392 jsr166 1.33 *
393 jsr166 1.32 * <p>This method acts as bridge between array-based and collection-based
394     * APIs.
395     *
396     * @return an array containing all of the elements in this queue
397     */
398 dl 1.2 public Object[] toArray() {
399 dl 1.21 final ReentrantLock lock = this.lock;
400 dl 1.2 lock.lock();
401     try {
402     return q.toArray();
403 tim 1.13 } finally {
404 dl 1.2 lock.unlock();
405     }
406 tim 1.1 }
407 dl 1.2
408 jsr166 1.32 /**
409     * Returns an array containing all of the elements in this queue; the
410     * runtime type of the returned array is that of the specified array.
411     * The returned array elements are in no particular order.
412     * If the queue fits in the specified array, it is returned therein.
413     * Otherwise, a new array is allocated with the runtime type of the
414     * specified array and the size of this queue.
415     *
416     * <p>If this queue fits in the specified array with room to spare
417     * (i.e., the array has more elements than this queue), the element in
418     * the array immediately following the end of the queue is set to
419 jsr166 1.61 * {@code null}.
420 jsr166 1.32 *
421     * <p>Like the {@link #toArray()} method, this method acts as bridge between
422     * array-based and collection-based APIs. Further, this method allows
423     * precise control over the runtime type of the output array, and may,
424     * under certain circumstances, be used to save allocation costs.
425     *
426 jsr166 1.37 * <p>The following code can be used to dump a delay queue into a newly
427 jsr166 1.61 * allocated array of {@code Delayed}:
428 jsr166 1.32 *
429 jsr166 1.55 * <pre> {@code Delayed[] a = q.toArray(new Delayed[0]);}</pre>
430 jsr166 1.32 *
431 jsr166 1.61 * Note that {@code toArray(new Object[0])} is identical in function to
432     * {@code toArray()}.
433 jsr166 1.32 *
434     * @param a the array into which the elements of the queue are to
435     * be stored, if it is big enough; otherwise, a new array of the
436     * same runtime type is allocated for this purpose
437     * @return an array containing all of the elements in this queue
438     * @throws ArrayStoreException if the runtime type of the specified array
439     * is not a supertype of the runtime type of every element in
440     * this queue
441     * @throws NullPointerException if the specified array is null
442     */
443     public <T> T[] toArray(T[] a) {
444 dl 1.21 final ReentrantLock lock = this.lock;
445 dl 1.2 lock.lock();
446     try {
447 jsr166 1.32 return q.toArray(a);
448 tim 1.13 } finally {
449 dl 1.2 lock.unlock();
450     }
451 tim 1.1 }
452    
453 dl 1.26 /**
454     * Removes a single instance of the specified element from this
455 dl 1.36 * queue, if it is present, whether or not it has expired.
456 dl 1.26 */
457 dholmes 1.7 public boolean remove(Object o) {
458 dl 1.21 final ReentrantLock lock = this.lock;
459 dl 1.2 lock.lock();
460     try {
461 dholmes 1.7 return q.remove(o);
462 tim 1.13 } finally {
463 dl 1.2 lock.unlock();
464     }
465     }
466    
467 dholmes 1.7 /**
468 jsr166 1.71 * Identity-based version for use in Itr.remove.
469 jsr166 1.57 */
470     void removeEQ(Object o) {
471     final ReentrantLock lock = this.lock;
472     lock.lock();
473     try {
474     for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
475     if (o == it.next()) {
476     it.remove();
477     break;
478     }
479     }
480     } finally {
481     lock.unlock();
482     }
483     }
484    
485     /**
486 dl 1.36 * Returns an iterator over all the elements (both expired and
487 dl 1.44 * unexpired) in this queue. The iterator does not return the
488 jsr166 1.51 * elements in any particular order.
489     *
490 jsr166 1.65 * <p>The returned iterator is
491     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
492 dholmes 1.7 *
493 jsr166 1.32 * @return an iterator over the elements in this queue
494 dholmes 1.7 */
495 dl 1.3 public Iterator<E> iterator() {
496 dl 1.44 return new Itr(toArray());
497 dl 1.2 }
498    
499 dl 1.42 /**
500     * Snapshot iterator that works off copy of underlying q array.
501     */
502 dl 1.44 private class Itr implements Iterator<E> {
503 dl 1.42 final Object[] array; // Array of all elements
504 jsr166 1.57 int cursor; // index of next element to return
505 jsr166 1.49 int lastRet; // index of last element, or -1 if no such
506 jsr166 1.43
507 dl 1.42 Itr(Object[] array) {
508     lastRet = -1;
509     this.array = array;
510 dl 1.2 }
511    
512 tim 1.6 public boolean hasNext() {
513 dl 1.42 return cursor < array.length;
514 tim 1.6 }
515    
516 jsr166 1.49 @SuppressWarnings("unchecked")
517 tim 1.6 public E next() {
518 dl 1.42 if (cursor >= array.length)
519     throw new NoSuchElementException();
520 jsr166 1.73 return (E)array[lastRet = cursor++];
521 tim 1.6 }
522    
523     public void remove() {
524 jsr166 1.43 if (lastRet < 0)
525 jsr166 1.49 throw new IllegalStateException();
526 jsr166 1.57 removeEQ(array[lastRet]);
527 dl 1.42 lastRet = -1;
528 tim 1.6 }
529 tim 1.1 }
530    
531     }