ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.77
Committed: Sat May 6 06:49:46 2017 UTC (7 years ago) by jsr166
Branch: MAIN
Changes since 1.76: +1 -1 lines
Log Message:
8177789: fix collections framework links to point to java.util package doc

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.23 * Expert Group and released to the public domain, as explained at
4 jsr166 1.52 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 jsr166 1.68
9 jsr166 1.56 import static java.util.concurrent.TimeUnit.NANOSECONDS;
10 jsr166 1.69
11 jsr166 1.68 import java.util.AbstractQueue;
12     import java.util.Collection;
13     import java.util.Iterator;
14     import java.util.NoSuchElementException;
15 jsr166 1.74 import java.util.Objects;
16 jsr166 1.68 import java.util.PriorityQueue;
17 jsr166 1.69 import java.util.concurrent.locks.Condition;
18     import java.util.concurrent.locks.ReentrantLock;
19 tim 1.1
/**
 * An unbounded {@linkplain BlockingQueue blocking queue} of
 * {@code Delayed} elements, in which an element can only be taken
 * when its delay has expired.  The <em>head</em> of the queue is that
 * {@code Delayed} element whose delay expired furthest in the
 * past.  If no delay has expired there is no head and {@code poll}
 * will return {@code null}. Expiration occurs when an element's
 * {@code getDelay(TimeUnit.NANOSECONDS)} method returns a value less
 * than or equal to zero.  Even though unexpired elements cannot be
 * removed using {@code take} or {@code poll}, they are otherwise
 * treated as normal elements. For example, the {@code size} method
 * returns the count of both expired and unexpired elements.
 * This queue does not permit null elements.
 *
 * <p>This class and its iterator implement all of the <em>optional</em>
 * methods of the {@link Collection} and {@link Iterator} interfaces.
 * The Iterator provided in method {@link #iterator()} is <em>not</em>
 * guaranteed to traverse the elements of the DelayQueue in any
 * particular order.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/java/util/package-summary.html#CollectionsFramework">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this queue
 */
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    /** Lock held around every access to {@code q} and {@code leader}. */
    private final transient ReentrantLock lock = new ReentrantLock();

    /** Backing heap; its head is the element that expires first. */
    private final PriorityQueue<E> q = new PriorityQueue<>();

    /**
     * Thread designated to wait for the element at the head of
     * the queue.  This variant of the Leader-Follower pattern
     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
     * minimize unnecessary timed waiting.  When a thread becomes
     * the leader, it waits only for the next delay to elapse, but
     * other threads await indefinitely.  The leader thread must
     * signal some other thread before returning from take() or
     * poll(...), unless some other thread becomes leader in the
     * interim.  Whenever the head of the queue is replaced with
     * an element with an earlier expiration time, the leader
     * field is invalidated by being reset to null, and some
     * waiting thread, but not necessarily the current leader, is
     * signalled.  So waiting threads must be prepared to acquire
     * and lose leadership while waiting.
     */
    private Thread leader;

    /**
     * Condition signalled when a newer element becomes available
     * at the head of the queue or a new thread may need to
     * become leader.
     */
    private final Condition available = lock.newCondition();

    /**
     * Creates a new {@code DelayQueue} that is initially empty.
     */
    public DelayQueue() {}

    /**
     * Creates a {@code DelayQueue} initially containing the elements of the
     * given collection of {@link Delayed} instances.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean add(E e) {
        return offer(e);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return {@code true}
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            // The new element became the head: any leader is now timing
            // the wrong delay, so invalidate it and wake one waiter.
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @throws NullPointerException {@inheritDoc}
     */
    @Override
    public void put(E e) {
        offer(e);
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return {@code true}
     * @throws NullPointerException {@inheritDoc}
     */
    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e);
    }

    /**
     * Retrieves and removes the head of this queue, or returns {@code null}
     * if this queue has no elements with an expired delay.
     *
     * @return the head of this queue, or {@code null} if this
     *         queue has no elements with an expired delay
     */
    @Override
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            return (first == null || first.getDelay(NANOSECONDS) > 0)
                ? null
                : q.poll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    @Override
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0L)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        // Someone else is timing the head; wait untimed.
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            // Give up leadership only if it is still ours;
                            // offer() may have reset it in the meantime.
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // Hand off: if nobody is leading and work remains, wake a waiter.
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue,
     * or the specified wait time expires.
     *
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     *        {@code timeout} parameter
     * @return the head of this queue, or {@code null} if the
     *         specified waiting time elapses before an element with
     *         an expired delay becomes available
     * @throws InterruptedException {@inheritDoc}
     */
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0L)
                        return null;
                    else
                        nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0L)
                        return q.poll();
                    if (nanos <= 0L)
                        return null;
                    first = null; // don't retain ref while waiting
                    if (nanos < delay || leader != null)
                        // Either our budget ends before the head expires or
                        // another thread leads: wait for our own budget only.
                        nanos = available.awaitNanos(nanos);
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            long timeLeft = available.awaitNanos(delay);
                            // Charge only the time actually spent waiting.
                            nanos -= delay - timeLeft;
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // Hand off: if nobody is leading and work remains, wake a waiter.
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue, or
     * returns {@code null} if this queue is empty.  Unlike
     * {@code poll}, if no expired elements are available in the queue,
     * this method returns the element that will expire next,
     * if one exists.
     *
     * @return the head of this queue, or {@code null} if this
     *         queue is empty
     */
    @Override
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue, counting both
     * expired and unexpired elements.
     *
     * @return the number of elements in this queue
     */
    @Override
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes all currently expired elements from this queue and adds
     * them to the given collection.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    @Override
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most {@code maxElements} currently expired elements
     * from this queue and adds them to the given collection.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        Objects.requireNonNull(c);
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            for (E first;
                 n < maxElements
                     && (first = q.peek()) != null
                     && first.getDelay(NANOSECONDS) <= 0;) {
                c.add(first);   // In this order, in case add() throws.
                q.poll();
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this delay queue.
     * The queue will be empty after this call returns.
     * Elements with an unexpired delay are not waited for; they are
     * simply discarded from the queue.
     */
    @Override
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns {@code Integer.MAX_VALUE} because
     * a {@code DelayQueue} is not capacity constrained.
     *
     * @return {@code Integer.MAX_VALUE}
     */
    @Override
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Returns an array containing all of the elements in this queue.
     * The returned array elements are in no particular order.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    @Override
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue; the
     * runtime type of the returned array is that of the specified array.
     * The returned array elements are in no particular order.
     * If the queue fits in the specified array, it is returned therein.
     * Otherwise, a new array is allocated with the runtime type of the
     * specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * {@code null}.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>The following code can be used to dump a delay queue into a newly
     * allocated array of {@code Delayed}:
     *
     * <pre> {@code Delayed[] a = q.toArray(new Delayed[0]);}</pre>
     *
     * Note that {@code toArray(new Object[0])} is identical in function to
     * {@code toArray()}.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    @Override
    public <T> T[] toArray(T[] a) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present, whether or not it has expired.
     *
     * @param o the element to be removed from this queue, if present
     * @return {@code true} if an element was removed as a result of
     *         this call
     */
    @Override
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Identity-based version for use in Itr.remove: removes the first
     * element that is the very same object as {@code o}, if any.
     *
     * @param o the exact instance to remove
     */
    void removeEQ(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
                if (o == it.next()) {
                    it.remove();
                    break;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over all the elements (both expired and
     * unexpired) in this queue. The iterator does not return the
     * elements in any particular order.
     *
     * <p>The returned iterator is
     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
     *
     * @return an iterator over the elements in this queue
     */
    @Override
    public Iterator<E> iterator() {
        return new Itr(toArray());
    }

    /**
     * Snapshot iterator that works off copy of underlying q array.
     * It stays an inner (non-static) class because {@code remove()}
     * calls back into the enclosing queue's {@code removeEQ}.
     */
    private class Itr implements Iterator<E> {
        final Object[] array; // Array of all elements
        int cursor;           // index of next element to return
        int lastRet;          // index of last element, or -1 if no such

        Itr(Object[] array) {
            lastRet = -1;
            this.array = array;
        }

        @Override
        public boolean hasNext() {
            return cursor < array.length;
        }

        @Override
        @SuppressWarnings("unchecked")
        public E next() {
            if (cursor >= array.length)
                throw new NoSuchElementException();
            return (E)array[lastRet = cursor++];
        }

        @Override
        public void remove() {
            if (lastRet < 0)
                throw new IllegalStateException();
            // Remove by identity so an equal-but-distinct element survives.
            removeEQ(array[lastRet]);
            lastRet = -1;
        }
    }

}
508     }