ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.75
Committed: Thu Dec 29 22:37:31 2016 UTC (7 years, 5 months ago) by jsr166
Branch: MAIN
Changes since 1.74: +5 -5 lines
Log Message:
whitespace

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import static java.util.concurrent.TimeUnit.NANOSECONDS;
10
11 import java.util.AbstractQueue;
12 import java.util.Collection;
13 import java.util.Iterator;
14 import java.util.NoSuchElementException;
15 import java.util.Objects;
16 import java.util.PriorityQueue;
17 import java.util.concurrent.locks.Condition;
18 import java.util.concurrent.locks.ReentrantLock;
19
/**
 * An unbounded {@linkplain BlockingQueue blocking queue} of
 * {@code Delayed} elements, in which an element can only be taken
 * when its delay has expired.  The <em>head</em> of the queue is that
 * {@code Delayed} element whose delay expired furthest in the
 * past.  If no delay has expired there is no head and {@code poll}
 * will return {@code null}. Expiration occurs when an element's
 * {@code getDelay(TimeUnit.NANOSECONDS)} method returns a value less
 * than or equal to zero.  Even though unexpired elements cannot be
 * removed using {@code take} or {@code poll}, they are otherwise
 * treated as normal elements. For example, the {@code size} method
 * returns the count of both expired and unexpired elements.
 * This queue does not permit null elements.
 *
 * <p>This class and its iterator implement all of the <em>optional</em>
 * methods of the {@link Collection} and {@link Iterator} interfaces.
 * The Iterator provided in method {@link #iterator()} is <em>not</em>
 * guaranteed to traverse the elements of the DelayQueue in any
 * particular order.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this queue
 */
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    /** Lock held around every access to {@code q} and {@code leader}. */
    private final transient ReentrantLock lock = new ReentrantLock();

    /**
     * Backing priority queue; its head is the element that sorts first
     * under the elements' natural ordering.
     */
    private final PriorityQueue<E> q = new PriorityQueue<>();

    /**
     * Thread designated to wait for the element at the head of
     * the queue.  This variant of the Leader-Follower pattern
     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
     * minimize unnecessary timed waiting.  When a thread becomes
     * the leader, it waits only for the next delay to elapse, but
     * other threads await indefinitely.  The leader thread must
     * signal some other thread before returning from take() or
     * poll(...), unless some other thread becomes leader in the
     * interim.  Whenever the head of the queue is replaced with
     * an element with an earlier expiration time, the leader
     * field is invalidated by being reset to null, and some
     * waiting thread, but not necessarily the current leader, is
     * signalled.  So waiting threads must be prepared to acquire
     * and lose leadership while waiting.
     */
    private Thread leader;

    /**
     * Condition signalled when a newer element becomes available
     * at the head of the queue or a new thread may need to
     * become leader.
     */
    private final Condition available = lock.newCondition();

    /**
     * Creates a new {@code DelayQueue} that is initially empty.
     */
    public DelayQueue() {}

    /**
     * Creates a {@code DelayQueue} initially containing the elements of the
     * given collection of {@link Delayed} instances.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean add(E e) {
        return offer(e);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return {@code true}
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                // The new element became the head: any leader is timing
                // against a stale delay, so invalidate it and wake a waiter.
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @throws NullPointerException {@inheritDoc}
     */
    @Override
    public void put(E e) {
        offer(e);
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return {@code true}
     * @throws NullPointerException {@inheritDoc}
     */
    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e);
    }

    /**
     * Retrieves and removes the head of this queue, or returns {@code null}
     * if this queue has no elements with an expired delay.
     *
     * @return the head of this queue, or {@code null} if this
     *         queue has no elements with an expired delay
     */
    @Override
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            return (first == null || first.getDelay(NANOSECONDS) > 0)
                ? null
                : q.poll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    @Override
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0L)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            // Give up leadership only if nobody replaced us.
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // Hand off: with no leader and work remaining, wake a follower.
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue,
     * or the specified wait time expires.
     *
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     *        {@code timeout} parameter
     * @return the head of this queue, or {@code null} if the
     *         specified waiting time elapses before an element with
     *         an expired delay becomes available
     * @throws InterruptedException {@inheritDoc}
     */
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0L)
                        return null;
                    else
                        nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0L)
                        return q.poll();
                    if (nanos <= 0L)
                        return null;
                    first = null; // don't retain ref while waiting
                    if (nanos < delay || leader != null)
                        nanos = available.awaitNanos(nanos);
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            long timeLeft = available.awaitNanos(delay);
                            // Charge only the time actually spent waiting.
                            nanos -= delay - timeLeft;
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // Hand off: with no leader and work remaining, wake a follower.
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue, or
     * returns {@code null} if this queue is empty.  Unlike
     * {@code poll}, if no expired elements are available in the queue,
     * this method returns the element that will expire next,
     * if one exists.
     *
     * @return the head of this queue, or {@code null} if this
     *         queue is empty
     */
    @Override
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue, counting both
     * expired and unexpired elements.
     *
     * @return the number of elements in this queue
     */
    @Override
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns first element only if it is expired.
     * Used only by drainTo.  Call only when holding lock.
     */
    private E peekExpired() {
        // assert lock.isHeldByCurrentThread();
        E first = q.peek();
        return (first == null || first.getDelay(NANOSECONDS) > 0) ?
            null : first;
    }

    /**
     * Removes every expired element from this queue and adds it to the
     * given collection.  Unexpired elements are left in the queue.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    @Override
    public int drainTo(Collection<? super E> c) {
        // Same argument checks and loop as the bounded variant; the bound
        // is simply never reached.
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most {@code maxElements} expired elements from this queue
     * and adds them to the given collection.  Unexpired elements are left
     * in the queue.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        Objects.requireNonNull(c);
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            for (E e; n < maxElements && (e = peekExpired()) != null;) {
                c.add(e);       // In this order, in case add() throws.
                q.poll();
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this delay queue.
     * The queue will be empty after this call returns.
     * Elements with an unexpired delay are not waited for; they are
     * simply discarded from the queue.
     */
    @Override
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns {@code Integer.MAX_VALUE} because
     * a {@code DelayQueue} is not capacity constrained.
     *
     * @return {@code Integer.MAX_VALUE}
     */
    @Override
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Returns an array containing all of the elements in this queue.
     * The returned array elements are in no particular order.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    @Override
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue; the
     * runtime type of the returned array is that of the specified array.
     * The returned array elements are in no particular order.
     * If the queue fits in the specified array, it is returned therein.
     * Otherwise, a new array is allocated with the runtime type of the
     * specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * {@code null}.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>The following code can be used to dump a delay queue into a newly
     * allocated array of {@code Delayed}:
     *
     * <pre> {@code Delayed[] a = q.toArray(new Delayed[0]);}</pre>
     *
     * Note that {@code toArray(new Object[0])} is identical in function to
     * {@code toArray()}.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    @Override
    public <T> T[] toArray(T[] a) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present, whether or not it has expired.
     *
     * @param o the element to be removed from this queue, if present
     * @return {@code true} if an element was removed
     */
    @Override
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Identity-based version for use in Itr.remove.
     * Removes at most one element, matched with {@code ==}.
     */
    void removeEQ(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
                if (o == it.next()) {
                    it.remove();
                    break;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over all the elements (both expired and
     * unexpired) in this queue. The iterator does not return the
     * elements in any particular order.
     *
     * <p>The returned iterator is
     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
     *
     * @return an iterator over the elements in this queue
     */
    @Override
    public Iterator<E> iterator() {
        return new Itr(toArray());
    }

    /**
     * Snapshot iterator that works off copy of underlying q array.
     * Removal is forwarded to the live queue by identity, so elements
     * added after the snapshot was taken are never visited.
     */
    private class Itr implements Iterator<E> {
        final Object[] array; // Array of all elements
        int cursor;           // index of next element to return
        int lastRet;          // index of last element, or -1 if no such

        Itr(Object[] array) {
            lastRet = -1;
            this.array = array;
        }

        @Override
        public boolean hasNext() {
            return cursor < array.length;
        }

        @Override
        @SuppressWarnings("unchecked")
        public E next() {
            if (cursor >= array.length)
                throw new NoSuchElementException();
            return (E)array[lastRet = cursor++];
        }

        @Override
        public void remove() {
            if (lastRet < 0)
                throw new IllegalStateException();
            removeEQ(array[lastRet]);
            lastRet = -1; // forbid a second remove() without next()
        }
    }

}