ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jdk7/java/util/concurrent/DelayQueue.java
Revision: 1.7
Committed: Sun Jan 18 20:17:32 2015 UTC (9 years, 5 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.6: +1 -0 lines
Log Message:
exactly one blank line before and after package statements

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import static java.util.concurrent.TimeUnit.NANOSECONDS;
10 import java.util.concurrent.locks.Condition;
11 import java.util.concurrent.locks.ReentrantLock;
12 import java.util.*;
13
14 /**
15 * An unbounded {@linkplain BlockingQueue blocking queue} of
16 * {@code Delayed} elements, in which an element can only be taken
17 * when its delay has expired. The <em>head</em> of the queue is that
18 * {@code Delayed} element whose delay expired furthest in the
19 * past. If no delay has expired there is no head and {@code poll}
20 * will return {@code null}. Expiration occurs when an element's
21 * {@code getDelay(TimeUnit.NANOSECONDS)} method returns a value less
22 * than or equal to zero. Even though unexpired elements cannot be
23 * removed using {@code take} or {@code poll}, they are otherwise
24 * treated as normal elements. For example, the {@code size} method
25 * returns the count of both expired and unexpired elements.
26 * This queue does not permit null elements.
27 *
28 * <p>This class and its iterator implement all of the
29 * <em>optional</em> methods of the {@link Collection} and {@link
30 * Iterator} interfaces. The Iterator provided in method {@link
31 * #iterator()} is <em>not</em> guaranteed to traverse the elements of
32 * the DelayQueue in any particular order.
33 *
34 * <p>This class is a member of the
35 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
36 * Java Collections Framework</a>.
37 *
38 * @since 1.5
39 * @author Doug Lea
40 * @param <E> the type of elements held in this collection
41 */
42 public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
43 implements BlockingQueue<E> {
44
45 private final transient ReentrantLock lock = new ReentrantLock();
46 private final PriorityQueue<E> q = new PriorityQueue<E>();
47
48 /**
49 * Thread designated to wait for the element at the head of
50 * the queue. This variant of the Leader-Follower pattern
51 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
52 * minimize unnecessary timed waiting. When a thread becomes
53 * the leader, it waits only for the next delay to elapse, but
54 * other threads await indefinitely. The leader thread must
55 * signal some other thread before returning from take() or
56 * poll(...), unless some other thread becomes leader in the
57 * interim. Whenever the head of the queue is replaced with
58 * an element with an earlier expiration time, the leader
59 * field is invalidated by being reset to null, and some
60 * waiting thread, but not necessarily the current leader, is
61 * signalled. So waiting threads must be prepared to acquire
62 * and lose leadership while waiting.
63 */
64 private Thread leader;
65
66 /**
67 * Condition signalled when a newer element becomes available
68 * at the head of the queue or a new thread may need to
69 * become leader.
70 */
71 private final Condition available = lock.newCondition();
72
73 /**
74 * Creates a new {@code DelayQueue} that is initially empty.
75 */
76 public DelayQueue() {}
77
78 /**
79 * Creates a {@code DelayQueue} initially containing the elements of the
80 * given collection of {@link Delayed} instances.
81 *
82 * @param c the collection of elements to initially contain
83 * @throws NullPointerException if the specified collection or any
84 * of its elements are null
85 */
86 public DelayQueue(Collection<? extends E> c) {
87 this.addAll(c);
88 }
89
90 /**
91 * Inserts the specified element into this delay queue.
92 *
93 * @param e the element to add
94 * @return {@code true} (as specified by {@link Collection#add})
95 * @throws NullPointerException if the specified element is null
96 */
97 public boolean add(E e) {
98 return offer(e);
99 }
100
101 /**
102 * Inserts the specified element into this delay queue.
103 *
104 * @param e the element to add
105 * @return {@code true}
106 * @throws NullPointerException if the specified element is null
107 */
108 public boolean offer(E e) {
109 final ReentrantLock lock = this.lock;
110 lock.lock();
111 try {
112 q.offer(e);
113 if (q.peek() == e) {
114 leader = null;
115 available.signal();
116 }
117 return true;
118 } finally {
119 lock.unlock();
120 }
121 }
122
123 /**
124 * Inserts the specified element into this delay queue. As the queue is
125 * unbounded this method will never block.
126 *
127 * @param e the element to add
128 * @throws NullPointerException {@inheritDoc}
129 */
130 public void put(E e) {
131 offer(e);
132 }
133
134 /**
135 * Inserts the specified element into this delay queue. As the queue is
136 * unbounded this method will never block.
137 *
138 * @param e the element to add
139 * @param timeout This parameter is ignored as the method never blocks
140 * @param unit This parameter is ignored as the method never blocks
141 * @return {@code true}
142 * @throws NullPointerException {@inheritDoc}
143 */
144 public boolean offer(E e, long timeout, TimeUnit unit) {
145 return offer(e);
146 }
147
148 /**
149 * Retrieves and removes the head of this queue, or returns {@code null}
150 * if this queue has no elements with an expired delay.
151 *
152 * @return the head of this queue, or {@code null} if this
153 * queue has no elements with an expired delay
154 */
155 public E poll() {
156 final ReentrantLock lock = this.lock;
157 lock.lock();
158 try {
159 E first = q.peek();
160 if (first == null || first.getDelay(NANOSECONDS) > 0)
161 return null;
162 else
163 return q.poll();
164 } finally {
165 lock.unlock();
166 }
167 }
168
169 /**
170 * Retrieves and removes the head of this queue, waiting if necessary
171 * until an element with an expired delay is available on this queue.
172 *
173 * @return the head of this queue
174 * @throws InterruptedException {@inheritDoc}
175 */
176 public E take() throws InterruptedException {
177 final ReentrantLock lock = this.lock;
178 lock.lockInterruptibly();
179 try {
180 for (;;) {
181 E first = q.peek();
182 if (first == null)
183 available.await();
184 else {
185 long delay = first.getDelay(NANOSECONDS);
186 if (delay <= 0)
187 return q.poll();
188 first = null; // don't retain ref while waiting
189 if (leader != null)
190 available.await();
191 else {
192 Thread thisThread = Thread.currentThread();
193 leader = thisThread;
194 try {
195 available.awaitNanos(delay);
196 } finally {
197 if (leader == thisThread)
198 leader = null;
199 }
200 }
201 }
202 }
203 } finally {
204 if (leader == null && q.peek() != null)
205 available.signal();
206 lock.unlock();
207 }
208 }
209
210 /**
211 * Retrieves and removes the head of this queue, waiting if necessary
212 * until an element with an expired delay is available on this queue,
213 * or the specified wait time expires.
214 *
215 * @return the head of this queue, or {@code null} if the
216 * specified waiting time elapses before an element with
217 * an expired delay becomes available
218 * @throws InterruptedException {@inheritDoc}
219 */
220 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
221 long nanos = unit.toNanos(timeout);
222 final ReentrantLock lock = this.lock;
223 lock.lockInterruptibly();
224 try {
225 for (;;) {
226 E first = q.peek();
227 if (first == null) {
228 if (nanos <= 0)
229 return null;
230 else
231 nanos = available.awaitNanos(nanos);
232 } else {
233 long delay = first.getDelay(NANOSECONDS);
234 if (delay <= 0)
235 return q.poll();
236 if (nanos <= 0)
237 return null;
238 first = null; // don't retain ref while waiting
239 if (nanos < delay || leader != null)
240 nanos = available.awaitNanos(nanos);
241 else {
242 Thread thisThread = Thread.currentThread();
243 leader = thisThread;
244 try {
245 long timeLeft = available.awaitNanos(delay);
246 nanos -= delay - timeLeft;
247 } finally {
248 if (leader == thisThread)
249 leader = null;
250 }
251 }
252 }
253 }
254 } finally {
255 if (leader == null && q.peek() != null)
256 available.signal();
257 lock.unlock();
258 }
259 }
260
261 /**
262 * Retrieves, but does not remove, the head of this queue, or
263 * returns {@code null} if this queue is empty. Unlike
264 * {@code poll}, if no expired elements are available in the queue,
265 * this method returns the element that will expire next,
266 * if one exists.
267 *
268 * @return the head of this queue, or {@code null} if this
269 * queue is empty
270 */
271 public E peek() {
272 final ReentrantLock lock = this.lock;
273 lock.lock();
274 try {
275 return q.peek();
276 } finally {
277 lock.unlock();
278 }
279 }
280
281 public int size() {
282 final ReentrantLock lock = this.lock;
283 lock.lock();
284 try {
285 return q.size();
286 } finally {
287 lock.unlock();
288 }
289 }
290
291 /**
292 * Returns first element only if it is expired.
293 * Used only by drainTo. Call only when holding lock.
294 */
295 private E peekExpired() {
296 // assert lock.isHeldByCurrentThread();
297 E first = q.peek();
298 return (first == null || first.getDelay(NANOSECONDS) > 0) ?
299 null : first;
300 }
301
302 /**
303 * @throws UnsupportedOperationException {@inheritDoc}
304 * @throws ClassCastException {@inheritDoc}
305 * @throws NullPointerException {@inheritDoc}
306 * @throws IllegalArgumentException {@inheritDoc}
307 */
308 public int drainTo(Collection<? super E> c) {
309 if (c == null)
310 throw new NullPointerException();
311 if (c == this)
312 throw new IllegalArgumentException();
313 final ReentrantLock lock = this.lock;
314 lock.lock();
315 try {
316 int n = 0;
317 for (E e; (e = peekExpired()) != null;) {
318 c.add(e); // In this order, in case add() throws.
319 q.poll();
320 ++n;
321 }
322 return n;
323 } finally {
324 lock.unlock();
325 }
326 }
327
328 /**
329 * @throws UnsupportedOperationException {@inheritDoc}
330 * @throws ClassCastException {@inheritDoc}
331 * @throws NullPointerException {@inheritDoc}
332 * @throws IllegalArgumentException {@inheritDoc}
333 */
334 public int drainTo(Collection<? super E> c, int maxElements) {
335 if (c == null)
336 throw new NullPointerException();
337 if (c == this)
338 throw new IllegalArgumentException();
339 if (maxElements <= 0)
340 return 0;
341 final ReentrantLock lock = this.lock;
342 lock.lock();
343 try {
344 int n = 0;
345 for (E e; n < maxElements && (e = peekExpired()) != null;) {
346 c.add(e); // In this order, in case add() throws.
347 q.poll();
348 ++n;
349 }
350 return n;
351 } finally {
352 lock.unlock();
353 }
354 }
355
356 /**
357 * Atomically removes all of the elements from this delay queue.
358 * The queue will be empty after this call returns.
359 * Elements with an unexpired delay are not waited for; they are
360 * simply discarded from the queue.
361 */
362 public void clear() {
363 final ReentrantLock lock = this.lock;
364 lock.lock();
365 try {
366 q.clear();
367 } finally {
368 lock.unlock();
369 }
370 }
371
372 /**
373 * Always returns {@code Integer.MAX_VALUE} because
374 * a {@code DelayQueue} is not capacity constrained.
375 *
376 * @return {@code Integer.MAX_VALUE}
377 */
378 public int remainingCapacity() {
379 return Integer.MAX_VALUE;
380 }
381
382 /**
383 * Returns an array containing all of the elements in this queue.
384 * The returned array elements are in no particular order.
385 *
386 * <p>The returned array will be "safe" in that no references to it are
387 * maintained by this queue. (In other words, this method must allocate
388 * a new array). The caller is thus free to modify the returned array.
389 *
390 * <p>This method acts as bridge between array-based and collection-based
391 * APIs.
392 *
393 * @return an array containing all of the elements in this queue
394 */
395 public Object[] toArray() {
396 final ReentrantLock lock = this.lock;
397 lock.lock();
398 try {
399 return q.toArray();
400 } finally {
401 lock.unlock();
402 }
403 }
404
405 /**
406 * Returns an array containing all of the elements in this queue; the
407 * runtime type of the returned array is that of the specified array.
408 * The returned array elements are in no particular order.
409 * If the queue fits in the specified array, it is returned therein.
410 * Otherwise, a new array is allocated with the runtime type of the
411 * specified array and the size of this queue.
412 *
413 * <p>If this queue fits in the specified array with room to spare
414 * (i.e., the array has more elements than this queue), the element in
415 * the array immediately following the end of the queue is set to
416 * {@code null}.
417 *
418 * <p>Like the {@link #toArray()} method, this method acts as bridge between
419 * array-based and collection-based APIs. Further, this method allows
420 * precise control over the runtime type of the output array, and may,
421 * under certain circumstances, be used to save allocation costs.
422 *
423 * <p>The following code can be used to dump a delay queue into a newly
424 * allocated array of {@code Delayed}:
425 *
426 * <pre> {@code Delayed[] a = q.toArray(new Delayed[0]);}</pre>
427 *
428 * Note that {@code toArray(new Object[0])} is identical in function to
429 * {@code toArray()}.
430 *
431 * @param a the array into which the elements of the queue are to
432 * be stored, if it is big enough; otherwise, a new array of the
433 * same runtime type is allocated for this purpose
434 * @return an array containing all of the elements in this queue
435 * @throws ArrayStoreException if the runtime type of the specified array
436 * is not a supertype of the runtime type of every element in
437 * this queue
438 * @throws NullPointerException if the specified array is null
439 */
440 public <T> T[] toArray(T[] a) {
441 final ReentrantLock lock = this.lock;
442 lock.lock();
443 try {
444 return q.toArray(a);
445 } finally {
446 lock.unlock();
447 }
448 }
449
450 /**
451 * Removes a single instance of the specified element from this
452 * queue, if it is present, whether or not it has expired.
453 */
454 public boolean remove(Object o) {
455 final ReentrantLock lock = this.lock;
456 lock.lock();
457 try {
458 return q.remove(o);
459 } finally {
460 lock.unlock();
461 }
462 }
463
464 /**
465 * Identity-based version for use in Itr.remove
466 */
467 void removeEQ(Object o) {
468 final ReentrantLock lock = this.lock;
469 lock.lock();
470 try {
471 for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
472 if (o == it.next()) {
473 it.remove();
474 break;
475 }
476 }
477 } finally {
478 lock.unlock();
479 }
480 }
481
482 /**
483 * Returns an iterator over all the elements (both expired and
484 * unexpired) in this queue. The iterator does not return the
485 * elements in any particular order.
486 *
487 * <p>The returned iterator is a "weakly consistent" iterator that
488 * will never throw {@link java.util.ConcurrentModificationException
489 * ConcurrentModificationException}, and guarantees to traverse
490 * elements as they existed upon construction of the iterator, and
491 * may (but is not guaranteed to) reflect any modifications
492 * subsequent to construction.
493 *
494 * @return an iterator over the elements in this queue
495 */
496 public Iterator<E> iterator() {
497 return new Itr(toArray());
498 }
499
500 /**
501 * Snapshot iterator that works off copy of underlying q array.
502 */
503 private class Itr implements Iterator<E> {
504 final Object[] array; // Array of all elements
505 int cursor; // index of next element to return
506 int lastRet; // index of last element, or -1 if no such
507
508 Itr(Object[] array) {
509 lastRet = -1;
510 this.array = array;
511 }
512
513 public boolean hasNext() {
514 return cursor < array.length;
515 }
516
517 @SuppressWarnings("unchecked")
518 public E next() {
519 if (cursor >= array.length)
520 throw new NoSuchElementException();
521 lastRet = cursor;
522 return (E)array[cursor++];
523 }
524
525 public void remove() {
526 if (lastRet < 0)
527 throw new IllegalStateException();
528 removeEQ(array[lastRet]);
529 lastRet = -1;
530 }
531 }
532
533 }