ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.63
Committed: Tue Feb 5 19:54:06 2013 UTC (11 years, 3 months ago) by jsr166
Branch: MAIN
Changes since 1.62: +1 -1 lines
Log Message:
javadoc style

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import static java.util.concurrent.TimeUnit.NANOSECONDS;
9 import java.util.concurrent.locks.Condition;
10 import java.util.concurrent.locks.ReentrantLock;
11 import java.util.*;
12
13 /**
14 * An unbounded {@linkplain BlockingQueue blocking queue} of
15 * {@code Delayed} elements, in which an element can only be taken
16 * when its delay has expired. The <em>head</em> of the queue is that
17 * {@code Delayed} element whose delay expired furthest in the
18 * past. If no delay has expired there is no head and {@code poll}
19 * will return {@code null}. Expiration occurs when an element's
20 * {@code getDelay(TimeUnit.NANOSECONDS)} method returns a value less
21 * than or equal to zero. Even though unexpired elements cannot be
22 * removed using {@code take} or {@code poll}, they are otherwise
23 * treated as normal elements. For example, the {@code size} method
24 * returns the count of both expired and unexpired elements.
25 * This queue does not permit null elements.
26 *
27 * <p>This class and its iterator implement all of the
28 * <em>optional</em> methods of the {@link Collection} and {@link
29 * Iterator} interfaces. The Iterator provided in method {@link
30 * #iterator()} is <em>not</em> guaranteed to traverse the elements of
31 * the DelayQueue in any particular order.
32 *
33 * <p>This class is a member of the
34 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
35 * Java Collections Framework</a>.
36 *
37 * @since 1.5
38 * @author Doug Lea
39 * @param <E> the type of elements held in this collection
40 */
41 public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
42 implements BlockingQueue<E> {
43
44 private final transient ReentrantLock lock = new ReentrantLock();
45 private final PriorityQueue<E> q = new PriorityQueue<E>();
46
47 /**
48 * Thread designated to wait for the element at the head of
49 * the queue. This variant of the Leader-Follower pattern
50 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
51 * minimize unnecessary timed waiting. When a thread becomes
52 * the leader, it waits only for the next delay to elapse, but
53 * other threads await indefinitely. The leader thread must
54 * signal some other thread before returning from take() or
55 * poll(...), unless some other thread becomes leader in the
56 * interim. Whenever the head of the queue is replaced with
57 * an element with an earlier expiration time, the leader
58 * field is invalidated by being reset to null, and some
59 * waiting thread, but not necessarily the current leader, is
60 * signalled. So waiting threads must be prepared to acquire
61 * and lose leadership while waiting.
62 */
63 private Thread leader = null;
64
65 /**
66 * Condition signalled when a newer element becomes available
67 * at the head of the queue or a new thread may need to
68 * become leader.
69 */
70 private final Condition available = lock.newCondition();
71
72 /**
73 * Creates a new {@code DelayQueue} that is initially empty.
74 */
75 public DelayQueue() {}
76
77 /**
78 * Creates a {@code DelayQueue} initially containing the elements of the
79 * given collection of {@link Delayed} instances.
80 *
81 * @param c the collection of elements to initially contain
82 * @throws NullPointerException if the specified collection or any
83 * of its elements are null
84 */
85 public DelayQueue(Collection<? extends E> c) {
86 this.addAll(c);
87 }
88
89 /**
90 * Inserts the specified element into this delay queue.
91 *
92 * @param e the element to add
93 * @return {@code true} (as specified by {@link Collection#add})
94 * @throws NullPointerException if the specified element is null
95 */
96 public boolean add(E e) {
97 return offer(e);
98 }
99
100 /**
101 * Inserts the specified element into this delay queue.
102 *
103 * @param e the element to add
104 * @return {@code true}
105 * @throws NullPointerException if the specified element is null
106 */
107 public boolean offer(E e) {
108 final ReentrantLock lock = this.lock;
109 lock.lock();
110 try {
111 q.offer(e);
112 if (q.peek() == e) {
113 leader = null;
114 available.signal();
115 }
116 return true;
117 } finally {
118 lock.unlock();
119 }
120 }
121
122 /**
123 * Inserts the specified element into this delay queue. As the queue is
124 * unbounded this method will never block.
125 *
126 * @param e the element to add
127 * @throws NullPointerException {@inheritDoc}
128 */
129 public void put(E e) {
130 offer(e);
131 }
132
133 /**
134 * Inserts the specified element into this delay queue. As the queue is
135 * unbounded this method will never block.
136 *
137 * @param e the element to add
138 * @param timeout This parameter is ignored as the method never blocks
139 * @param unit This parameter is ignored as the method never blocks
140 * @return {@code true}
141 * @throws NullPointerException {@inheritDoc}
142 */
143 public boolean offer(E e, long timeout, TimeUnit unit) {
144 return offer(e);
145 }
146
147 /**
148 * Retrieves and removes the head of this queue, or returns {@code null}
149 * if this queue has no elements with an expired delay.
150 *
151 * @return the head of this queue, or {@code null} if this
152 * queue has no elements with an expired delay
153 */
154 public E poll() {
155 final ReentrantLock lock = this.lock;
156 lock.lock();
157 try {
158 E first = q.peek();
159 if (first == null || first.getDelay(NANOSECONDS) > 0)
160 return null;
161 else
162 return q.poll();
163 } finally {
164 lock.unlock();
165 }
166 }
167
168 /**
169 * Retrieves and removes the head of this queue, waiting if necessary
170 * until an element with an expired delay is available on this queue.
171 *
172 * @return the head of this queue
173 * @throws InterruptedException {@inheritDoc}
174 */
175 public E take() throws InterruptedException {
176 final ReentrantLock lock = this.lock;
177 lock.lockInterruptibly();
178 try {
179 for (;;) {
180 E first = q.peek();
181 if (first == null)
182 available.await();
183 else {
184 long delay = first.getDelay(NANOSECONDS);
185 if (delay <= 0)
186 return q.poll();
187 else if (leader != null)
188 available.await();
189 else {
190 Thread thisThread = Thread.currentThread();
191 leader = thisThread;
192 try {
193 available.awaitNanos(delay);
194 } finally {
195 if (leader == thisThread)
196 leader = null;
197 }
198 }
199 }
200 }
201 } finally {
202 if (leader == null && q.peek() != null)
203 available.signal();
204 lock.unlock();
205 }
206 }
207
208 /**
209 * Retrieves and removes the head of this queue, waiting if necessary
210 * until an element with an expired delay is available on this queue,
211 * or the specified wait time expires.
212 *
213 * @return the head of this queue, or {@code null} if the
214 * specified waiting time elapses before an element with
215 * an expired delay becomes available
216 * @throws InterruptedException {@inheritDoc}
217 */
218 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
219 long nanos = unit.toNanos(timeout);
220 final ReentrantLock lock = this.lock;
221 lock.lockInterruptibly();
222 try {
223 for (;;) {
224 E first = q.peek();
225 if (first == null) {
226 if (nanos <= 0)
227 return null;
228 else
229 nanos = available.awaitNanos(nanos);
230 } else {
231 long delay = first.getDelay(NANOSECONDS);
232 if (delay <= 0)
233 return q.poll();
234 if (nanos <= 0)
235 return null;
236 if (nanos < delay || leader != null)
237 nanos = available.awaitNanos(nanos);
238 else {
239 Thread thisThread = Thread.currentThread();
240 leader = thisThread;
241 try {
242 long timeLeft = available.awaitNanos(delay);
243 nanos -= delay - timeLeft;
244 } finally {
245 if (leader == thisThread)
246 leader = null;
247 }
248 }
249 }
250 }
251 } finally {
252 if (leader == null && q.peek() != null)
253 available.signal();
254 lock.unlock();
255 }
256 }
257
258 /**
259 * Retrieves, but does not remove, the head of this queue, or
260 * returns {@code null} if this queue is empty. Unlike
261 * {@code poll}, if no expired elements are available in the queue,
262 * this method returns the element that will expire next,
263 * if one exists.
264 *
265 * @return the head of this queue, or {@code null} if this
266 * queue is empty
267 */
268 public E peek() {
269 final ReentrantLock lock = this.lock;
270 lock.lock();
271 try {
272 return q.peek();
273 } finally {
274 lock.unlock();
275 }
276 }
277
278 public int size() {
279 final ReentrantLock lock = this.lock;
280 lock.lock();
281 try {
282 return q.size();
283 } finally {
284 lock.unlock();
285 }
286 }
287
288 /**
289 * Returns first element only if it is expired.
290 * Used only by drainTo. Call only when holding lock.
291 */
292 private E peekExpired() {
293 // assert lock.isHeldByCurrentThread();
294 E first = q.peek();
295 return (first == null || first.getDelay(NANOSECONDS) > 0) ?
296 null : first;
297 }
298
299 /**
300 * @throws UnsupportedOperationException {@inheritDoc}
301 * @throws ClassCastException {@inheritDoc}
302 * @throws NullPointerException {@inheritDoc}
303 * @throws IllegalArgumentException {@inheritDoc}
304 */
305 public int drainTo(Collection<? super E> c) {
306 if (c == null)
307 throw new NullPointerException();
308 if (c == this)
309 throw new IllegalArgumentException();
310 final ReentrantLock lock = this.lock;
311 lock.lock();
312 try {
313 int n = 0;
314 for (E e; (e = peekExpired()) != null;) {
315 c.add(e); // In this order, in case add() throws.
316 q.poll();
317 ++n;
318 }
319 return n;
320 } finally {
321 lock.unlock();
322 }
323 }
324
325 /**
326 * @throws UnsupportedOperationException {@inheritDoc}
327 * @throws ClassCastException {@inheritDoc}
328 * @throws NullPointerException {@inheritDoc}
329 * @throws IllegalArgumentException {@inheritDoc}
330 */
331 public int drainTo(Collection<? super E> c, int maxElements) {
332 if (c == null)
333 throw new NullPointerException();
334 if (c == this)
335 throw new IllegalArgumentException();
336 if (maxElements <= 0)
337 return 0;
338 final ReentrantLock lock = this.lock;
339 lock.lock();
340 try {
341 int n = 0;
342 for (E e; n < maxElements && (e = peekExpired()) != null;) {
343 c.add(e); // In this order, in case add() throws.
344 q.poll();
345 ++n;
346 }
347 return n;
348 } finally {
349 lock.unlock();
350 }
351 }
352
353 /**
354 * Atomically removes all of the elements from this delay queue.
355 * The queue will be empty after this call returns.
356 * Elements with an unexpired delay are not waited for; they are
357 * simply discarded from the queue.
358 */
359 public void clear() {
360 final ReentrantLock lock = this.lock;
361 lock.lock();
362 try {
363 q.clear();
364 } finally {
365 lock.unlock();
366 }
367 }
368
369 /**
370 * Always returns {@code Integer.MAX_VALUE} because
371 * a {@code DelayQueue} is not capacity constrained.
372 *
373 * @return {@code Integer.MAX_VALUE}
374 */
375 public int remainingCapacity() {
376 return Integer.MAX_VALUE;
377 }
378
379 /**
380 * Returns an array containing all of the elements in this queue.
381 * The returned array elements are in no particular order.
382 *
383 * <p>The returned array will be "safe" in that no references to it are
384 * maintained by this queue. (In other words, this method must allocate
385 * a new array). The caller is thus free to modify the returned array.
386 *
387 * <p>This method acts as bridge between array-based and collection-based
388 * APIs.
389 *
390 * @return an array containing all of the elements in this queue
391 */
392 public Object[] toArray() {
393 final ReentrantLock lock = this.lock;
394 lock.lock();
395 try {
396 return q.toArray();
397 } finally {
398 lock.unlock();
399 }
400 }
401
402 /**
403 * Returns an array containing all of the elements in this queue; the
404 * runtime type of the returned array is that of the specified array.
405 * The returned array elements are in no particular order.
406 * If the queue fits in the specified array, it is returned therein.
407 * Otherwise, a new array is allocated with the runtime type of the
408 * specified array and the size of this queue.
409 *
410 * <p>If this queue fits in the specified array with room to spare
411 * (i.e., the array has more elements than this queue), the element in
412 * the array immediately following the end of the queue is set to
413 * {@code null}.
414 *
415 * <p>Like the {@link #toArray()} method, this method acts as bridge between
416 * array-based and collection-based APIs. Further, this method allows
417 * precise control over the runtime type of the output array, and may,
418 * under certain circumstances, be used to save allocation costs.
419 *
420 * <p>The following code can be used to dump a delay queue into a newly
421 * allocated array of {@code Delayed}:
422 *
423 * <pre> {@code Delayed[] a = q.toArray(new Delayed[0]);}</pre>
424 *
425 * Note that {@code toArray(new Object[0])} is identical in function to
426 * {@code toArray()}.
427 *
428 * @param a the array into which the elements of the queue are to
429 * be stored, if it is big enough; otherwise, a new array of the
430 * same runtime type is allocated for this purpose
431 * @return an array containing all of the elements in this queue
432 * @throws ArrayStoreException if the runtime type of the specified array
433 * is not a supertype of the runtime type of every element in
434 * this queue
435 * @throws NullPointerException if the specified array is null
436 */
437 public <T> T[] toArray(T[] a) {
438 final ReentrantLock lock = this.lock;
439 lock.lock();
440 try {
441 return q.toArray(a);
442 } finally {
443 lock.unlock();
444 }
445 }
446
447 /**
448 * Removes a single instance of the specified element from this
449 * queue, if it is present, whether or not it has expired.
450 */
451 public boolean remove(Object o) {
452 final ReentrantLock lock = this.lock;
453 lock.lock();
454 try {
455 return q.remove(o);
456 } finally {
457 lock.unlock();
458 }
459 }
460
461 /**
462 * Identity-based version for use in Itr.remove
463 */
464 void removeEQ(Object o) {
465 final ReentrantLock lock = this.lock;
466 lock.lock();
467 try {
468 for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
469 if (o == it.next()) {
470 it.remove();
471 break;
472 }
473 }
474 } finally {
475 lock.unlock();
476 }
477 }
478
479 /**
480 * Returns an iterator over all the elements (both expired and
481 * unexpired) in this queue. The iterator does not return the
482 * elements in any particular order.
483 *
484 * <p>The returned iterator is a "weakly consistent" iterator that
485 * will never throw {@link java.util.ConcurrentModificationException
486 * ConcurrentModificationException}, and guarantees to traverse
487 * elements as they existed upon construction of the iterator, and
488 * may (but is not guaranteed to) reflect any modifications
489 * subsequent to construction.
490 *
491 * @return an iterator over the elements in this queue
492 */
493 public Iterator<E> iterator() {
494 return new Itr(toArray());
495 }
496
497 /**
498 * Snapshot iterator that works off copy of underlying q array.
499 */
500 private class Itr implements Iterator<E> {
501 final Object[] array; // Array of all elements
502 int cursor; // index of next element to return
503 int lastRet; // index of last element, or -1 if no such
504
505 Itr(Object[] array) {
506 lastRet = -1;
507 this.array = array;
508 }
509
510 public boolean hasNext() {
511 return cursor < array.length;
512 }
513
514 @SuppressWarnings("unchecked")
515 public E next() {
516 if (cursor >= array.length)
517 throw new NoSuchElementException();
518 lastRet = cursor;
519 return (E)array[cursor++];
520 }
521
522 public void remove() {
523 if (lastRet < 0)
524 throw new IllegalStateException();
525 removeEQ(array[lastRet]);
526 lastRet = -1;
527 }
528 }
529
530 }