ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.58
Committed: Thu Nov 24 02:35:13 2011 UTC (12 years, 6 months ago) by jsr166
Branch: MAIN
Changes since 1.57: +0 -1 lines
Log Message:
whitespace

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import static java.util.concurrent.TimeUnit.NANOSECONDS;
9 import java.util.concurrent.locks.Condition;
10 import java.util.concurrent.locks.ReentrantLock;
11 import java.util.*;
12
/**
 * An unbounded {@linkplain BlockingQueue blocking queue} of
 * {@code Delayed} elements, in which an element can only be taken
 * when its delay has expired.  The <em>head</em> of the queue is that
 * {@code Delayed} element whose delay expired furthest in the
 * past.  If no delay has expired there is no head and {@code poll}
 * will return {@code null}.  Expiration occurs when an element's
 * {@code getDelay(TimeUnit.NANOSECONDS)} method returns a value less
 * than or equal to zero.  Even though unexpired elements cannot be
 * removed using {@code take} or {@code poll}, they are otherwise
 * treated as normal elements.  For example, the {@code size} method
 * returns the count of both expired and unexpired elements.
 * This queue does not permit null elements.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.  The Iterator provided in method {@link
 * #iterator()} is <em>not</em> guaranteed to traverse the elements of
 * the DelayQueue in any particular order.
 *
 * <p>All access to the backing priority queue is guarded by a single
 * {@link ReentrantLock}.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    /** Lock guarding every access to {@link #q} and {@link #leader}. */
    private final transient ReentrantLock lock = new ReentrantLock();

    /** Backing heap, ordered by the elements' natural (delay) ordering. */
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * Thread designated to wait for the element at the head of
     * the queue.  This variant of the Leader-Follower pattern
     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
     * minimize unnecessary timed waiting.  When a thread becomes
     * the leader, it waits only for the next delay to elapse, but
     * other threads await indefinitely.  The leader thread must
     * signal some other thread before returning from take() or
     * poll(...), unless some other thread becomes leader in the
     * interim.  Whenever the head of the queue is replaced with
     * an element with an earlier expiration time, the leader
     * field is invalidated by being reset to null, and some
     * waiting thread, but not necessarily the current leader, is
     * signalled.  So waiting threads must be prepared to acquire
     * and lose leadership while waiting.
     */
    private Thread leader = null;

    /**
     * Condition signalled when a newer element becomes available
     * at the head of the queue or a new thread may need to
     * become leader.
     */
    private final Condition available = lock.newCondition();

    /**
     * Creates a new {@code DelayQueue} that is initially empty.
     */
    public DelayQueue() {}

    /**
     * Creates a {@code DelayQueue} initially containing the elements of the
     * given collection of {@link Delayed} instances.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
        return offer(e);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return {@code true}
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                // The new element became the head: whatever delay the
                // current leader is timing is stale, so invalidate the
                // leader and wake one waiter to re-evaluate.
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element into this delay queue.  As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) {
        offer(e);
    }

    /**
     * Inserts the specified element into this delay queue.  As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return {@code true}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e);
    }

    /**
     * Retrieves and removes the head of this queue, or returns {@code null}
     * if this queue has no elements with an expired delay.
     *
     * @return the head of this queue, or {@code null} if this
     *         queue has no elements with an expired delay
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(NANOSECONDS) > 0)
                return null;
            else
                return q.poll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    // Drop the local reference before blocking: another
                    // thread may remove this element while we wait, and
                    // holding it here would keep it reachable until we wake.
                    first = null;
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            // Only relinquish leadership if it was not
                            // already invalidated or taken over meanwhile.
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // On the way out, hand over to another waiter if nobody is
            // currently timing the (possibly new) head.
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue,
     * or the specified wait time expires.
     *
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     *        {@code timeout} parameter
     * @return the head of this queue, or {@code null} if the
     *         specified waiting time elapses before an element with
     *         an expired delay becomes available
     * @throws InterruptedException {@inheritDoc}
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0)
                        return null;
                    else
                        nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    if (nanos <= 0)
                        return null;
                    // Drop the local reference before blocking: another
                    // thread may remove this element while we wait, and
                    // holding it here would keep it reachable until we wake.
                    first = null;
                    if (nanos < delay || leader != null)
                        nanos = available.awaitNanos(nanos);
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            long timeLeft = available.awaitNanos(delay);
                            // Charge only the time actually spent waiting
                            // against the caller's remaining budget.
                            nanos -= delay - timeLeft;
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue, or
     * returns {@code null} if this queue is empty.  Unlike
     * {@code poll}, if no expired elements are available in the queue,
     * this method returns the element that will expire next,
     * if one exists.
     *
     * @return the head of this queue, or {@code null} if this
     *         queue is empty
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue, counting both
     * expired and unexpired elements.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the first element only if it is expired.
     * Used only by drainTo.  Call only when holding lock.
     */
    private E peekExpired() {
        // assert lock.isHeldByCurrentThread();
        E first = q.peek();
        return (first == null || first.getDelay(NANOSECONDS) > 0) ?
            null : first;
    }

    /**
     * Removes all expired elements from this queue and adds them to the
     * given collection.  Equivalent to
     * {@code drainTo(c, Integer.MAX_VALUE)}.
     *
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most {@code maxElements} expired elements from this
     * queue and adds them to the given collection.
     *
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            for (E e; n < maxElements && (e = peekExpired()) != null;) {
                c.add(e);       // In this order, in case add() throws.
                q.poll();
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this delay queue.
     * The queue will be empty after this call returns.
     * Elements with an unexpired delay are not waited for; they are
     * simply discarded from the queue.
     */
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns {@code Integer.MAX_VALUE} because
     * a {@code DelayQueue} is not capacity constrained.
     *
     * @return {@code Integer.MAX_VALUE}
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Returns an array containing all of the elements in this queue.
     * The returned array elements are in no particular order.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue; the
     * runtime type of the returned array is that of the specified array.
     * The returned array elements are in no particular order.
     * If the queue fits in the specified array, it is returned therein.
     * Otherwise, a new array is allocated with the runtime type of the
     * specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * {@code null}.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>The following code can be used to dump a delay queue into a newly
     * allocated array of {@code Delayed}:
     *
     * <pre> {@code Delayed[] a = q.toArray(new Delayed[0]);}</pre>
     *
     * Note that {@code toArray(new Object[0])} is identical in function to
     * {@code toArray()}.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    public <T> T[] toArray(T[] a) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present, whether or not it has expired.
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if an element was removed as a result of
     *         this call
     */
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Identity-based version for use in Itr.remove.  Removes the first
     * element found that is the very same object as {@code o}; does
     * nothing if no such element is present.
     */
    void removeEQ(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
                if (o == it.next()) {
                    it.remove();
                    break;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over all the elements (both expired and
     * unexpired) in this queue.  The iterator does not return the
     * elements in any particular order.
     *
     * <p>The returned iterator is a "weakly consistent" iterator that
     * will never throw {@link java.util.ConcurrentModificationException
     * ConcurrentModificationException}, and guarantees to traverse
     * elements as they existed upon construction of the iterator, and
     * may (but is not guaranteed to) reflect any modifications
     * subsequent to construction.
     *
     * @return an iterator over the elements in this queue
     */
    public Iterator<E> iterator() {
        return new Itr(toArray());
    }

    /**
     * Snapshot iterator that works off copy of underlying q array.
     */
    private class Itr implements Iterator<E> {
        final Object[] array; // Array of all elements
        int cursor;           // index of next element to return
        int lastRet;          // index of last element returned, or -1 if no such

        Itr(Object[] array) {
            lastRet = -1;
            this.array = array;
        }

        public boolean hasNext() {
            return cursor < array.length;
        }

        @SuppressWarnings("unchecked")
        public E next() {
            if (cursor >= array.length)
                throw new NoSuchElementException();
            lastRet = cursor;
            return (E)array[cursor++];
        }

        public void remove() {
            if (lastRet < 0)
                throw new IllegalStateException();
            // Remove by identity from the live queue; the snapshot array
            // itself is left untouched.
            removeEQ(array[lastRet]);
            lastRet = -1;
        }
    }

}
531 }