ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.70
Committed: Sat Feb 28 22:57:37 2015 UTC (9 years, 3 months ago) by jsr166
Branch: MAIN
Changes since 1.69: +3 -4 lines
Log Message:
bytecode golf

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.23 * Expert Group and released to the public domain, as explained at
4 jsr166 1.52 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 jsr166 1.68
9 jsr166 1.56 import static java.util.concurrent.TimeUnit.NANOSECONDS;
10 jsr166 1.69
11 jsr166 1.68 import java.util.AbstractQueue;
12     import java.util.Collection;
13     import java.util.Iterator;
14     import java.util.NoSuchElementException;
15     import java.util.PriorityQueue;
16 jsr166 1.69 import java.util.concurrent.locks.Condition;
17     import java.util.concurrent.locks.ReentrantLock;
18 tim 1.1
19     /**
20 dl 1.27 * An unbounded {@linkplain BlockingQueue blocking queue} of
21 jsr166 1.61 * {@code Delayed} elements, in which an element can only be taken
22 dl 1.27 * when its delay has expired. The <em>head</em> of the queue is that
23 jsr166 1.61 * {@code Delayed} element whose delay expired furthest in the
24     * past. If no delay has expired there is no head and {@code poll}
25     * will return {@code null}. Expiration occurs when an element's
26     * {@code getDelay(TimeUnit.NANOSECONDS)} method returns a value less
27 dl 1.36 * than or equal to zero. Even though unexpired elements cannot be
28 jsr166 1.61 * removed using {@code take} or {@code poll}, they are otherwise
29     * treated as normal elements. For example, the {@code size} method
30 dl 1.36 * returns the count of both expired and unexpired elements.
31 jsr166 1.39 * This queue does not permit null elements.
32 dl 1.25 *
33 dl 1.26 * <p>This class and its iterator implement all of the
34     * <em>optional</em> methods of the {@link Collection} and {@link
35 dl 1.53 * Iterator} interfaces. The Iterator provided in method {@link
36     * #iterator()} is <em>not</em> guaranteed to traverse the elements of
37     * the DelayQueue in any particular order.
38 dl 1.26 *
39 dl 1.25 * <p>This class is a member of the
40 jsr166 1.46 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
41 dl 1.25 * Java Collections Framework</a>.
42     *
43 dl 1.4 * @since 1.5
44     * @author Doug Lea
45 jsr166 1.67 * @param <E> the type of elements held in this queue
46 dl 1.19 */
47 dl 1.3 public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
48     implements BlockingQueue<E> {
49 tim 1.6
50 jsr166 1.62 private final transient ReentrantLock lock = new ReentrantLock();
51 dl 1.3 private final PriorityQueue<E> q = new PriorityQueue<E>();
52 tim 1.1
53 dl 1.4 /**
54 jsr166 1.48 * Thread designated to wait for the element at the head of
55     * the queue. This variant of the Leader-Follower pattern
56     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
57     * minimize unnecessary timed waiting. When a thread becomes
58     * the leader, it waits only for the next delay to elapse, but
59     * other threads await indefinitely. The leader thread must
60     * signal some other thread before returning from take() or
61     * poll(...), unless some other thread becomes leader in the
62     * interim. Whenever the head of the queue is replaced with
63     * an element with an earlier expiration time, the leader
64     * field is invalidated by being reset to null, and some
65     * waiting thread, but not necessarily the current leader, is
66     * signalled. So waiting threads must be prepared to acquire
67     * and lose leadership while waiting.
68     */
69 jsr166 1.66 private Thread leader;
70 jsr166 1.48
71     /**
72     * Condition signalled when a newer element becomes available
73     * at the head of the queue or a new thread may need to
74     * become leader.
75     */
76     private final Condition available = lock.newCondition();
77    
78     /**
79 jsr166 1.61 * Creates a new {@code DelayQueue} that is initially empty.
80 dl 1.4 */
81 tim 1.1 public DelayQueue() {}
82    
83 tim 1.6 /**
84 jsr166 1.61 * Creates a {@code DelayQueue} initially containing the elements of the
85 dholmes 1.7 * given collection of {@link Delayed} instances.
86     *
87 jsr166 1.32 * @param c the collection of elements to initially contain
88     * @throws NullPointerException if the specified collection or any
89     * of its elements are null
90 tim 1.6 */
91     public DelayQueue(Collection<? extends E> c) {
92     this.addAll(c);
93     }
94    
95 dholmes 1.7 /**
96 dl 1.16 * Inserts the specified element into this delay queue.
97 dholmes 1.7 *
98 jsr166 1.30 * @param e the element to add
99 jsr166 1.61 * @return {@code true} (as specified by {@link Collection#add})
100 jsr166 1.32 * @throws NullPointerException if the specified element is null
101     */
102     public boolean add(E e) {
103     return offer(e);
104     }
105    
106     /**
107     * Inserts the specified element into this delay queue.
108     *
109     * @param e the element to add
110 jsr166 1.61 * @return {@code true}
111 jsr166 1.32 * @throws NullPointerException if the specified element is null
112 dholmes 1.7 */
113 jsr166 1.30 public boolean offer(E e) {
114 dl 1.21 final ReentrantLock lock = this.lock;
115 dl 1.2 lock.lock();
116     try {
117 jsr166 1.30 q.offer(e);
118 jsr166 1.48 if (q.peek() == e) {
119 jsr166 1.49 leader = null;
120 jsr166 1.48 available.signal();
121 jsr166 1.49 }
122 dl 1.2 return true;
123 tim 1.13 } finally {
124 dl 1.2 lock.unlock();
125     }
126     }
127    
128 dholmes 1.7 /**
129 jsr166 1.32 * Inserts the specified element into this delay queue. As the queue is
130 dholmes 1.7 * unbounded this method will never block.
131 jsr166 1.32 *
132 jsr166 1.30 * @param e the element to add
133 jsr166 1.32 * @throws NullPointerException {@inheritDoc}
134 dholmes 1.7 */
135 jsr166 1.30 public void put(E e) {
136     offer(e);
137 dl 1.2 }
138    
139 dholmes 1.7 /**
140 dl 1.16 * Inserts the specified element into this delay queue. As the queue is
141 dholmes 1.7 * unbounded this method will never block.
142 jsr166 1.32 *
143 jsr166 1.30 * @param e the element to add
144 dl 1.14 * @param timeout This parameter is ignored as the method never blocks
145 dholmes 1.7 * @param unit This parameter is ignored as the method never blocks
146 jsr166 1.61 * @return {@code true}
147 jsr166 1.32 * @throws NullPointerException {@inheritDoc}
148 dholmes 1.7 */
149 jsr166 1.30 public boolean offer(E e, long timeout, TimeUnit unit) {
150     return offer(e);
151 dl 1.2 }
152    
153 dholmes 1.7 /**
154 jsr166 1.61 * Retrieves and removes the head of this queue, or returns {@code null}
155 jsr166 1.32 * if this queue has no elements with an expired delay.
156 dholmes 1.7 *
157 jsr166 1.61 * @return the head of this queue, or {@code null} if this
158 jsr166 1.32 * queue has no elements with an expired delay
159 dholmes 1.7 */
160 jsr166 1.32 public E poll() {
161     final ReentrantLock lock = this.lock;
162     lock.lock();
163     try {
164     E first = q.peek();
165 jsr166 1.70 return (first == null || first.getDelay(NANOSECONDS) > 0)
166     ? null
167     : q.poll();
168 jsr166 1.32 } finally {
169     lock.unlock();
170     }
171 dl 1.2 }
172    
173 dl 1.27 /**
174 jsr166 1.32 * Retrieves and removes the head of this queue, waiting if necessary
175     * until an element with an expired delay is available on this queue.
176     *
177 dl 1.27 * @return the head of this queue
178 jsr166 1.32 * @throws InterruptedException {@inheritDoc}
179 dl 1.27 */
180 dl 1.3 public E take() throws InterruptedException {
181 dl 1.21 final ReentrantLock lock = this.lock;
182 dl 1.2 lock.lockInterruptibly();
183     try {
184     for (;;) {
185 dl 1.3 E first = q.peek();
186 jsr166 1.48 if (first == null)
187 dl 1.4 available.await();
188 jsr166 1.49 else {
189 jsr166 1.56 long delay = first.getDelay(NANOSECONDS);
190 jsr166 1.49 if (delay <= 0)
191     return q.poll();
192 jsr166 1.64 first = null; // don't retain ref while waiting
193     if (leader != null)
194 jsr166 1.49 available.await();
195     else {
196     Thread thisThread = Thread.currentThread();
197     leader = thisThread;
198     try {
199     available.awaitNanos(delay);
200     } finally {
201     if (leader == thisThread)
202     leader = null;
203     }
204 dl 1.2 }
205     }
206     }
207 tim 1.13 } finally {
208 jsr166 1.49 if (leader == null && q.peek() != null)
209     available.signal();
210 dl 1.2 lock.unlock();
211     }
212     }
213    
214 dl 1.27 /**
215 jsr166 1.32 * Retrieves and removes the head of this queue, waiting if necessary
216     * until an element with an expired delay is available on this queue,
217     * or the specified wait time expires.
218     *
219 jsr166 1.61 * @return the head of this queue, or {@code null} if the
220 jsr166 1.32 * specified waiting time elapses before an element with
221     * an expired delay becomes available
222     * @throws InterruptedException {@inheritDoc}
223 dl 1.27 */
224     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
225 jsr166 1.41 long nanos = unit.toNanos(timeout);
226 dl 1.21 final ReentrantLock lock = this.lock;
227 dl 1.2 lock.lockInterruptibly();
228     try {
229     for (;;) {
230 dl 1.3 E first = q.peek();
231 dl 1.2 if (first == null) {
232     if (nanos <= 0)
233     return null;
234     else
235 dl 1.4 nanos = available.awaitNanos(nanos);
236 tim 1.13 } else {
237 jsr166 1.56 long delay = first.getDelay(NANOSECONDS);
238 jsr166 1.49 if (delay <= 0)
239     return q.poll();
240     if (nanos <= 0)
241     return null;
242 jsr166 1.64 first = null; // don't retain ref while waiting
243 jsr166 1.49 if (nanos < delay || leader != null)
244     nanos = available.awaitNanos(nanos);
245     else {
246     Thread thisThread = Thread.currentThread();
247     leader = thisThread;
248     try {
249     long timeLeft = available.awaitNanos(delay);
250     nanos -= delay - timeLeft;
251     } finally {
252     if (leader == thisThread)
253     leader = null;
254     }
255     }
256 dl 1.2 }
257     }
258 tim 1.13 } finally {
259 jsr166 1.49 if (leader == null && q.peek() != null)
260     available.signal();
261 dl 1.2 lock.unlock();
262     }
263     }
264    
265 dl 1.27 /**
266 dl 1.36 * Retrieves, but does not remove, the head of this queue, or
267 jsr166 1.61 * returns {@code null} if this queue is empty. Unlike
268     * {@code poll}, if no expired elements are available in the queue,
269 dl 1.36 * this method returns the element that will expire next,
270     * if one exists.
271 dl 1.27 *
272 jsr166 1.61 * @return the head of this queue, or {@code null} if this
273 jsr166 1.63 * queue is empty
274 dl 1.27 */
275 dl 1.3 public E peek() {
276 dl 1.21 final ReentrantLock lock = this.lock;
277 dl 1.2 lock.lock();
278     try {
279     return q.peek();
280 tim 1.13 } finally {
281 dl 1.2 lock.unlock();
282     }
283 tim 1.1 }
284    
285 dl 1.2 public int size() {
286 dl 1.21 final ReentrantLock lock = this.lock;
287 dl 1.2 lock.lock();
288     try {
289     return q.size();
290 tim 1.13 } finally {
291 dl 1.2 lock.unlock();
292     }
293     }
294    
295 jsr166 1.32 /**
296 jsr166 1.59 * Returns first element only if it is expired.
297 jsr166 1.56 * Used only by drainTo. Call only when holding lock.
298     */
299     private E peekExpired() {
300     // assert lock.isHeldByCurrentThread();
301     E first = q.peek();
302     return (first == null || first.getDelay(NANOSECONDS) > 0) ?
303     null : first;
304     }
305    
306     /**
307 jsr166 1.32 * @throws UnsupportedOperationException {@inheritDoc}
308     * @throws ClassCastException {@inheritDoc}
309     * @throws NullPointerException {@inheritDoc}
310     * @throws IllegalArgumentException {@inheritDoc}
311     */
312 dl 1.17 public int drainTo(Collection<? super E> c) {
313     if (c == null)
314     throw new NullPointerException();
315     if (c == this)
316     throw new IllegalArgumentException();
317 dl 1.21 final ReentrantLock lock = this.lock;
318 dl 1.17 lock.lock();
319     try {
320     int n = 0;
321 jsr166 1.56 for (E e; (e = peekExpired()) != null;) {
322     c.add(e); // In this order, in case add() throws.
323     q.poll();
324 dl 1.17 ++n;
325     }
326     return n;
327     } finally {
328     lock.unlock();
329     }
330     }
331    
332 jsr166 1.32 /**
333     * @throws UnsupportedOperationException {@inheritDoc}
334     * @throws ClassCastException {@inheritDoc}
335     * @throws NullPointerException {@inheritDoc}
336     * @throws IllegalArgumentException {@inheritDoc}
337     */
338 dl 1.17 public int drainTo(Collection<? super E> c, int maxElements) {
339     if (c == null)
340     throw new NullPointerException();
341     if (c == this)
342     throw new IllegalArgumentException();
343     if (maxElements <= 0)
344     return 0;
345 dl 1.21 final ReentrantLock lock = this.lock;
346 dl 1.17 lock.lock();
347     try {
348     int n = 0;
349 jsr166 1.56 for (E e; n < maxElements && (e = peekExpired()) != null;) {
350     c.add(e); // In this order, in case add() throws.
351     q.poll();
352 dl 1.17 ++n;
353     }
354     return n;
355     } finally {
356     lock.unlock();
357     }
358     }
359    
360 dholmes 1.7 /**
361     * Atomically removes all of the elements from this delay queue.
362     * The queue will be empty after this call returns.
363 jsr166 1.32 * Elements with an unexpired delay are not waited for; they are
364     * simply discarded from the queue.
365 dholmes 1.7 */
366 dl 1.2 public void clear() {
367 dl 1.21 final ReentrantLock lock = this.lock;
368 dl 1.2 lock.lock();
369     try {
370     q.clear();
371 tim 1.13 } finally {
372 dl 1.2 lock.unlock();
373     }
374     }
375 tim 1.1
376 dholmes 1.7 /**
377 jsr166 1.61 * Always returns {@code Integer.MAX_VALUE} because
378     * a {@code DelayQueue} is not capacity constrained.
379 jsr166 1.32 *
380 jsr166 1.61 * @return {@code Integer.MAX_VALUE}
381 dholmes 1.7 */
382 dl 1.2 public int remainingCapacity() {
383     return Integer.MAX_VALUE;
384 tim 1.1 }
385 dl 1.2
386 jsr166 1.32 /**
387     * Returns an array containing all of the elements in this queue.
388     * The returned array elements are in no particular order.
389     *
390     * <p>The returned array will be "safe" in that no references to it are
391     * maintained by this queue. (In other words, this method must allocate
392     * a new array). The caller is thus free to modify the returned array.
393 jsr166 1.33 *
394 jsr166 1.32 * <p>This method acts as bridge between array-based and collection-based
395     * APIs.
396     *
397     * @return an array containing all of the elements in this queue
398     */
399 dl 1.2 public Object[] toArray() {
400 dl 1.21 final ReentrantLock lock = this.lock;
401 dl 1.2 lock.lock();
402     try {
403     return q.toArray();
404 tim 1.13 } finally {
405 dl 1.2 lock.unlock();
406     }
407 tim 1.1 }
408 dl 1.2
409 jsr166 1.32 /**
410     * Returns an array containing all of the elements in this queue; the
411     * runtime type of the returned array is that of the specified array.
412     * The returned array elements are in no particular order.
413     * If the queue fits in the specified array, it is returned therein.
414     * Otherwise, a new array is allocated with the runtime type of the
415     * specified array and the size of this queue.
416     *
417     * <p>If this queue fits in the specified array with room to spare
418     * (i.e., the array has more elements than this queue), the element in
419     * the array immediately following the end of the queue is set to
420 jsr166 1.61 * {@code null}.
421 jsr166 1.32 *
422     * <p>Like the {@link #toArray()} method, this method acts as bridge between
423     * array-based and collection-based APIs. Further, this method allows
424     * precise control over the runtime type of the output array, and may,
425     * under certain circumstances, be used to save allocation costs.
426     *
427 jsr166 1.37 * <p>The following code can be used to dump a delay queue into a newly
428 jsr166 1.61 * allocated array of {@code Delayed}:
429 jsr166 1.32 *
430 jsr166 1.55 * <pre> {@code Delayed[] a = q.toArray(new Delayed[0]);}</pre>
431 jsr166 1.32 *
432 jsr166 1.61 * Note that {@code toArray(new Object[0])} is identical in function to
433     * {@code toArray()}.
434 jsr166 1.32 *
435     * @param a the array into which the elements of the queue are to
436     * be stored, if it is big enough; otherwise, a new array of the
437     * same runtime type is allocated for this purpose
438     * @return an array containing all of the elements in this queue
439     * @throws ArrayStoreException if the runtime type of the specified array
440     * is not a supertype of the runtime type of every element in
441     * this queue
442     * @throws NullPointerException if the specified array is null
443     */
444     public <T> T[] toArray(T[] a) {
445 dl 1.21 final ReentrantLock lock = this.lock;
446 dl 1.2 lock.lock();
447     try {
448 jsr166 1.32 return q.toArray(a);
449 tim 1.13 } finally {
450 dl 1.2 lock.unlock();
451     }
452 tim 1.1 }
453    
454 dl 1.26 /**
455     * Removes a single instance of the specified element from this
456 dl 1.36 * queue, if it is present, whether or not it has expired.
457 dl 1.26 */
458 dholmes 1.7 public boolean remove(Object o) {
459 dl 1.21 final ReentrantLock lock = this.lock;
460 dl 1.2 lock.lock();
461     try {
462 dholmes 1.7 return q.remove(o);
463 tim 1.13 } finally {
464 dl 1.2 lock.unlock();
465     }
466     }
467    
468 dholmes 1.7 /**
469 jsr166 1.57 * Identity-based version for use in Itr.remove
470     */
471     void removeEQ(Object o) {
472     final ReentrantLock lock = this.lock;
473     lock.lock();
474     try {
475     for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
476     if (o == it.next()) {
477     it.remove();
478     break;
479     }
480     }
481     } finally {
482     lock.unlock();
483     }
484     }
485    
486     /**
487 dl 1.36 * Returns an iterator over all the elements (both expired and
488 dl 1.44 * unexpired) in this queue. The iterator does not return the
489 jsr166 1.51 * elements in any particular order.
490     *
491 jsr166 1.65 * <p>The returned iterator is
492     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
493 dholmes 1.7 *
494 jsr166 1.32 * @return an iterator over the elements in this queue
495 dholmes 1.7 */
496 dl 1.3 public Iterator<E> iterator() {
497 dl 1.44 return new Itr(toArray());
498 dl 1.2 }
499    
500 dl 1.42 /**
501     * Snapshot iterator that works off copy of underlying q array.
502     */
503 dl 1.44 private class Itr implements Iterator<E> {
504 dl 1.42 final Object[] array; // Array of all elements
505 jsr166 1.57 int cursor; // index of next element to return
506 jsr166 1.49 int lastRet; // index of last element, or -1 if no such
507 jsr166 1.43
508 dl 1.42 Itr(Object[] array) {
509     lastRet = -1;
510     this.array = array;
511 dl 1.2 }
512    
513 tim 1.6 public boolean hasNext() {
514 dl 1.42 return cursor < array.length;
515 tim 1.6 }
516    
517 jsr166 1.49 @SuppressWarnings("unchecked")
518 tim 1.6 public E next() {
519 dl 1.42 if (cursor >= array.length)
520     throw new NoSuchElementException();
521     lastRet = cursor;
522     return (E)array[cursor++];
523 tim 1.6 }
524    
525     public void remove() {
526 jsr166 1.43 if (lastRet < 0)
527 jsr166 1.49 throw new IllegalStateException();
528 jsr166 1.57 removeEQ(array[lastRet]);
529 dl 1.42 lastRet = -1;
530 tim 1.6 }
531 tim 1.1 }
532    
533     }