ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.57
Committed: Sun Jul 3 06:04:06 2011 UTC (12 years, 11 months ago) by jsr166
Branch: MAIN
Changes since 1.56: +20 -15 lines
Log Message:
Introduce removeEQ as in PriorityBlockingQueue

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7
8 package java.util.concurrent;
9 import static java.util.concurrent.TimeUnit.NANOSECONDS;
10 import java.util.concurrent.locks.Condition;
11 import java.util.concurrent.locks.ReentrantLock;
12 import java.util.*;
13
14 /**
15 * An unbounded {@linkplain BlockingQueue blocking queue} of
16 * <tt>Delayed</tt> elements, in which an element can only be taken
17 * when its delay has expired. The <em>head</em> of the queue is that
18 * <tt>Delayed</tt> element whose delay expired furthest in the
19 * past. If no delay has expired there is no head and <tt>poll</tt>
20 * will return <tt>null</tt>. Expiration occurs when an element's
21 * <tt>getDelay(TimeUnit.NANOSECONDS)</tt> method returns a value less
22 * than or equal to zero. Even though unexpired elements cannot be
23 * removed using <tt>take</tt> or <tt>poll</tt>, they are otherwise
24 * treated as normal elements. For example, the <tt>size</tt> method
25 * returns the count of both expired and unexpired elements.
26 * This queue does not permit null elements.
27 *
28 * <p>This class and its iterator implement all of the
29 * <em>optional</em> methods of the {@link Collection} and {@link
30 * Iterator} interfaces. The Iterator provided in method {@link
31 * #iterator()} is <em>not</em> guaranteed to traverse the elements of
32 * the DelayQueue in any particular order.
33 *
34 * <p>This class is a member of the
35 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
36 * Java Collections Framework</a>.
37 *
38 * @since 1.5
39 * @author Doug Lea
40 * @param <E> the type of elements held in this collection
41 */
42
43 public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
44 implements BlockingQueue<E> {
45
46 private transient final ReentrantLock lock = new ReentrantLock();
47 private final PriorityQueue<E> q = new PriorityQueue<E>();
48
49 /**
50 * Thread designated to wait for the element at the head of
51 * the queue. This variant of the Leader-Follower pattern
52 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
53 * minimize unnecessary timed waiting. When a thread becomes
54 * the leader, it waits only for the next delay to elapse, but
55 * other threads await indefinitely. The leader thread must
56 * signal some other thread before returning from take() or
57 * poll(...), unless some other thread becomes leader in the
58 * interim. Whenever the head of the queue is replaced with
59 * an element with an earlier expiration time, the leader
60 * field is invalidated by being reset to null, and some
61 * waiting thread, but not necessarily the current leader, is
62 * signalled. So waiting threads must be prepared to acquire
63 * and lose leadership while waiting.
64 */
65 private Thread leader = null;
66
67 /**
68 * Condition signalled when a newer element becomes available
69 * at the head of the queue or a new thread may need to
70 * become leader.
71 */
72 private final Condition available = lock.newCondition();
73
74 /**
75 * Creates a new <tt>DelayQueue</tt> that is initially empty.
76 */
77 public DelayQueue() {}
78
79 /**
80 * Creates a <tt>DelayQueue</tt> initially containing the elements of the
81 * given collection of {@link Delayed} instances.
82 *
83 * @param c the collection of elements to initially contain
84 * @throws NullPointerException if the specified collection or any
85 * of its elements are null
86 */
87 public DelayQueue(Collection<? extends E> c) {
88 this.addAll(c);
89 }
90
91 /**
92 * Inserts the specified element into this delay queue.
93 *
94 * @param e the element to add
95 * @return <tt>true</tt> (as specified by {@link Collection#add})
96 * @throws NullPointerException if the specified element is null
97 */
98 public boolean add(E e) {
99 return offer(e);
100 }
101
102 /**
103 * Inserts the specified element into this delay queue.
104 *
105 * @param e the element to add
106 * @return <tt>true</tt>
107 * @throws NullPointerException if the specified element is null
108 */
109 public boolean offer(E e) {
110 final ReentrantLock lock = this.lock;
111 lock.lock();
112 try {
113 q.offer(e);
114 if (q.peek() == e) {
115 leader = null;
116 available.signal();
117 }
118 return true;
119 } finally {
120 lock.unlock();
121 }
122 }
123
124 /**
125 * Inserts the specified element into this delay queue. As the queue is
126 * unbounded this method will never block.
127 *
128 * @param e the element to add
129 * @throws NullPointerException {@inheritDoc}
130 */
131 public void put(E e) {
132 offer(e);
133 }
134
135 /**
136 * Inserts the specified element into this delay queue. As the queue is
137 * unbounded this method will never block.
138 *
139 * @param e the element to add
140 * @param timeout This parameter is ignored as the method never blocks
141 * @param unit This parameter is ignored as the method never blocks
142 * @return <tt>true</tt>
143 * @throws NullPointerException {@inheritDoc}
144 */
145 public boolean offer(E e, long timeout, TimeUnit unit) {
146 return offer(e);
147 }
148
149 /**
150 * Retrieves and removes the head of this queue, or returns <tt>null</tt>
151 * if this queue has no elements with an expired delay.
152 *
153 * @return the head of this queue, or <tt>null</tt> if this
154 * queue has no elements with an expired delay
155 */
156 public E poll() {
157 final ReentrantLock lock = this.lock;
158 lock.lock();
159 try {
160 E first = q.peek();
161 if (first == null || first.getDelay(NANOSECONDS) > 0)
162 return null;
163 else
164 return q.poll();
165 } finally {
166 lock.unlock();
167 }
168 }
169
170 /**
171 * Retrieves and removes the head of this queue, waiting if necessary
172 * until an element with an expired delay is available on this queue.
173 *
174 * @return the head of this queue
175 * @throws InterruptedException {@inheritDoc}
176 */
177 public E take() throws InterruptedException {
178 final ReentrantLock lock = this.lock;
179 lock.lockInterruptibly();
180 try {
181 for (;;) {
182 E first = q.peek();
183 if (first == null)
184 available.await();
185 else {
186 long delay = first.getDelay(NANOSECONDS);
187 if (delay <= 0)
188 return q.poll();
189 else if (leader != null)
190 available.await();
191 else {
192 Thread thisThread = Thread.currentThread();
193 leader = thisThread;
194 try {
195 available.awaitNanos(delay);
196 } finally {
197 if (leader == thisThread)
198 leader = null;
199 }
200 }
201 }
202 }
203 } finally {
204 if (leader == null && q.peek() != null)
205 available.signal();
206 lock.unlock();
207 }
208 }
209
210 /**
211 * Retrieves and removes the head of this queue, waiting if necessary
212 * until an element with an expired delay is available on this queue,
213 * or the specified wait time expires.
214 *
215 * @return the head of this queue, or <tt>null</tt> if the
216 * specified waiting time elapses before an element with
217 * an expired delay becomes available
218 * @throws InterruptedException {@inheritDoc}
219 */
220 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
221 long nanos = unit.toNanos(timeout);
222 final ReentrantLock lock = this.lock;
223 lock.lockInterruptibly();
224 try {
225 for (;;) {
226 E first = q.peek();
227 if (first == null) {
228 if (nanos <= 0)
229 return null;
230 else
231 nanos = available.awaitNanos(nanos);
232 } else {
233 long delay = first.getDelay(NANOSECONDS);
234 if (delay <= 0)
235 return q.poll();
236 if (nanos <= 0)
237 return null;
238 if (nanos < delay || leader != null)
239 nanos = available.awaitNanos(nanos);
240 else {
241 Thread thisThread = Thread.currentThread();
242 leader = thisThread;
243 try {
244 long timeLeft = available.awaitNanos(delay);
245 nanos -= delay - timeLeft;
246 } finally {
247 if (leader == thisThread)
248 leader = null;
249 }
250 }
251 }
252 }
253 } finally {
254 if (leader == null && q.peek() != null)
255 available.signal();
256 lock.unlock();
257 }
258 }
259
260 /**
261 * Retrieves, but does not remove, the head of this queue, or
262 * returns <tt>null</tt> if this queue is empty. Unlike
263 * <tt>poll</tt>, if no expired elements are available in the queue,
264 * this method returns the element that will expire next,
265 * if one exists.
266 *
267 * @return the head of this queue, or <tt>null</tt> if this
268 * queue is empty.
269 */
270 public E peek() {
271 final ReentrantLock lock = this.lock;
272 lock.lock();
273 try {
274 return q.peek();
275 } finally {
276 lock.unlock();
277 }
278 }
279
280 public int size() {
281 final ReentrantLock lock = this.lock;
282 lock.lock();
283 try {
284 return q.size();
285 } finally {
286 lock.unlock();
287 }
288 }
289
290 /**
291 * Return first element only if it is expired.
292 * Used only by drainTo. Call only when holding lock.
293 */
294 private E peekExpired() {
295 // assert lock.isHeldByCurrentThread();
296 E first = q.peek();
297 return (first == null || first.getDelay(NANOSECONDS) > 0) ?
298 null : first;
299 }
300
301 /**
302 * @throws UnsupportedOperationException {@inheritDoc}
303 * @throws ClassCastException {@inheritDoc}
304 * @throws NullPointerException {@inheritDoc}
305 * @throws IllegalArgumentException {@inheritDoc}
306 */
307 public int drainTo(Collection<? super E> c) {
308 if (c == null)
309 throw new NullPointerException();
310 if (c == this)
311 throw new IllegalArgumentException();
312 final ReentrantLock lock = this.lock;
313 lock.lock();
314 try {
315 int n = 0;
316 for (E e; (e = peekExpired()) != null;) {
317 c.add(e); // In this order, in case add() throws.
318 q.poll();
319 ++n;
320 }
321 return n;
322 } finally {
323 lock.unlock();
324 }
325 }
326
327 /**
328 * @throws UnsupportedOperationException {@inheritDoc}
329 * @throws ClassCastException {@inheritDoc}
330 * @throws NullPointerException {@inheritDoc}
331 * @throws IllegalArgumentException {@inheritDoc}
332 */
333 public int drainTo(Collection<? super E> c, int maxElements) {
334 if (c == null)
335 throw new NullPointerException();
336 if (c == this)
337 throw new IllegalArgumentException();
338 if (maxElements <= 0)
339 return 0;
340 final ReentrantLock lock = this.lock;
341 lock.lock();
342 try {
343 int n = 0;
344 for (E e; n < maxElements && (e = peekExpired()) != null;) {
345 c.add(e); // In this order, in case add() throws.
346 q.poll();
347 ++n;
348 }
349 return n;
350 } finally {
351 lock.unlock();
352 }
353 }
354
355 /**
356 * Atomically removes all of the elements from this delay queue.
357 * The queue will be empty after this call returns.
358 * Elements with an unexpired delay are not waited for; they are
359 * simply discarded from the queue.
360 */
361 public void clear() {
362 final ReentrantLock lock = this.lock;
363 lock.lock();
364 try {
365 q.clear();
366 } finally {
367 lock.unlock();
368 }
369 }
370
371 /**
372 * Always returns <tt>Integer.MAX_VALUE</tt> because
373 * a <tt>DelayQueue</tt> is not capacity constrained.
374 *
375 * @return <tt>Integer.MAX_VALUE</tt>
376 */
377 public int remainingCapacity() {
378 return Integer.MAX_VALUE;
379 }
380
381 /**
382 * Returns an array containing all of the elements in this queue.
383 * The returned array elements are in no particular order.
384 *
385 * <p>The returned array will be "safe" in that no references to it are
386 * maintained by this queue. (In other words, this method must allocate
387 * a new array). The caller is thus free to modify the returned array.
388 *
389 * <p>This method acts as bridge between array-based and collection-based
390 * APIs.
391 *
392 * @return an array containing all of the elements in this queue
393 */
394 public Object[] toArray() {
395 final ReentrantLock lock = this.lock;
396 lock.lock();
397 try {
398 return q.toArray();
399 } finally {
400 lock.unlock();
401 }
402 }
403
404 /**
405 * Returns an array containing all of the elements in this queue; the
406 * runtime type of the returned array is that of the specified array.
407 * The returned array elements are in no particular order.
408 * If the queue fits in the specified array, it is returned therein.
409 * Otherwise, a new array is allocated with the runtime type of the
410 * specified array and the size of this queue.
411 *
412 * <p>If this queue fits in the specified array with room to spare
413 * (i.e., the array has more elements than this queue), the element in
414 * the array immediately following the end of the queue is set to
415 * <tt>null</tt>.
416 *
417 * <p>Like the {@link #toArray()} method, this method acts as bridge between
418 * array-based and collection-based APIs. Further, this method allows
419 * precise control over the runtime type of the output array, and may,
420 * under certain circumstances, be used to save allocation costs.
421 *
422 * <p>The following code can be used to dump a delay queue into a newly
423 * allocated array of <tt>Delayed</tt>:
424 *
425 * <pre> {@code Delayed[] a = q.toArray(new Delayed[0]);}</pre>
426 *
427 * Note that <tt>toArray(new Object[0])</tt> is identical in function to
428 * <tt>toArray()</tt>.
429 *
430 * @param a the array into which the elements of the queue are to
431 * be stored, if it is big enough; otherwise, a new array of the
432 * same runtime type is allocated for this purpose
433 * @return an array containing all of the elements in this queue
434 * @throws ArrayStoreException if the runtime type of the specified array
435 * is not a supertype of the runtime type of every element in
436 * this queue
437 * @throws NullPointerException if the specified array is null
438 */
439 public <T> T[] toArray(T[] a) {
440 final ReentrantLock lock = this.lock;
441 lock.lock();
442 try {
443 return q.toArray(a);
444 } finally {
445 lock.unlock();
446 }
447 }
448
449 /**
450 * Removes a single instance of the specified element from this
451 * queue, if it is present, whether or not it has expired.
452 */
453 public boolean remove(Object o) {
454 final ReentrantLock lock = this.lock;
455 lock.lock();
456 try {
457 return q.remove(o);
458 } finally {
459 lock.unlock();
460 }
461 }
462
463 /**
464 * Identity-based version for use in Itr.remove
465 */
466 void removeEQ(Object o) {
467 final ReentrantLock lock = this.lock;
468 lock.lock();
469 try {
470 for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
471 if (o == it.next()) {
472 it.remove();
473 break;
474 }
475 }
476 } finally {
477 lock.unlock();
478 }
479 }
480
481 /**
482 * Returns an iterator over all the elements (both expired and
483 * unexpired) in this queue. The iterator does not return the
484 * elements in any particular order.
485 *
486 * <p>The returned iterator is a "weakly consistent" iterator that
487 * will never throw {@link java.util.ConcurrentModificationException
488 * ConcurrentModificationException}, and guarantees to traverse
489 * elements as they existed upon construction of the iterator, and
490 * may (but is not guaranteed to) reflect any modifications
491 * subsequent to construction.
492 *
493 * @return an iterator over the elements in this queue
494 */
495 public Iterator<E> iterator() {
496 return new Itr(toArray());
497 }
498
499 /**
500 * Snapshot iterator that works off copy of underlying q array.
501 */
502 private class Itr implements Iterator<E> {
503 final Object[] array; // Array of all elements
504 int cursor; // index of next element to return
505 int lastRet; // index of last element, or -1 if no such
506
507 Itr(Object[] array) {
508 lastRet = -1;
509 this.array = array;
510 }
511
512 public boolean hasNext() {
513 return cursor < array.length;
514 }
515
516 @SuppressWarnings("unchecked")
517 public E next() {
518 if (cursor >= array.length)
519 throw new NoSuchElementException();
520 lastRet = cursor;
521 return (E)array[cursor++];
522 }
523
524 public void remove() {
525 if (lastRet < 0)
526 throw new IllegalStateException();
527 removeEQ(array[lastRet]);
528 lastRet = -1;
529 }
530 }
531
532 }