ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jdk8/java/util/concurrent/DelayQueue.java
Revision: 1.1
Committed: Sat Mar 26 06:22:50 2016 UTC (8 years, 1 month ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Log Message:
fork jdk8 maintenance branch for source and jtreg tests

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import static java.util.concurrent.TimeUnit.NANOSECONDS;
10
11 import java.util.AbstractQueue;
12 import java.util.Collection;
13 import java.util.Iterator;
14 import java.util.NoSuchElementException;
15 import java.util.PriorityQueue;
16 import java.util.concurrent.locks.Condition;
17 import java.util.concurrent.locks.ReentrantLock;
18
/**
 * An unbounded {@linkplain BlockingQueue blocking queue} of
 * {@code Delayed} elements, in which an element can only be taken
 * when its delay has expired.  The <em>head</em> of the queue is that
 * {@code Delayed} element whose delay expired furthest in the
 * past.  If no delay has expired there is no head and {@code poll}
 * will return {@code null}. Expiration occurs when an element's
 * {@code getDelay(TimeUnit.NANOSECONDS)} method returns a value less
 * than or equal to zero.  Even though unexpired elements cannot be
 * removed using {@code take} or {@code poll}, they are otherwise
 * treated as normal elements. For example, the {@code size} method
 * returns the count of both expired and unexpired elements.
 * This queue does not permit null elements.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.  The Iterator provided in method {@link
 * #iterator()} is <em>not</em> guaranteed to traverse the elements of
 * the DelayQueue in any particular order.
 *
 * <p>All accesses to the backing priority queue in this class are
 * performed while holding a single internal {@link ReentrantLock}.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this queue
 */
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    /** Lock guarding every access to {@link #q} and {@link #leader}. */
    private final transient ReentrantLock lock = new ReentrantLock();

    /** Backing heap; its head is the element with the smallest delay. */
    private final PriorityQueue<E> q = new PriorityQueue<>();

    /**
     * Thread designated to wait for the element at the head of
     * the queue.  This variant of the Leader-Follower pattern
     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
     * minimize unnecessary timed waiting.  When a thread becomes
     * the leader, it waits only for the next delay to elapse, but
     * other threads await indefinitely.  The leader thread must
     * signal some other thread before returning from take() or
     * poll(...), unless some other thread becomes leader in the
     * interim.  Whenever the head of the queue is replaced with
     * an element with an earlier expiration time, the leader
     * field is invalidated by being reset to null, and some
     * waiting thread, but not necessarily the current leader, is
     * signalled.  So waiting threads must be prepared to acquire
     * and lose leadership while waiting.
     */
    private Thread leader;

    /**
     * Condition signalled when a newer element becomes available
     * at the head of the queue or a new thread may need to
     * become leader.
     */
    private final Condition available = lock.newCondition();

    /**
     * Creates a new {@code DelayQueue} that is initially empty.
     */
    public DelayQueue() {}

    /**
     * Creates a {@code DelayQueue} initially containing the elements of the
     * given collection of {@link Delayed} instances.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean add(E e) {
        return offer(e);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param e the element to add
     * @return {@code true}
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            // A new head invalidates whatever the current leader is timing
            // against, so drop leadership and wake one waiter to re-check.
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @throws NullPointerException {@inheritDoc}
     */
    @Override
    public void put(E e) {
        offer(e);
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return {@code true}
     * @throws NullPointerException {@inheritDoc}
     */
    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e);
    }

    /**
     * Retrieves and removes the head of this queue, or returns {@code null}
     * if this queue has no elements with an expired delay.
     *
     * @return the head of this queue, or {@code null} if this
     *         queue has no elements with an expired delay
     */
    @Override
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            return (first == null || first.getDelay(NANOSECONDS) > 0)
                ? null
                : q.poll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    @Override
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0L)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        // Some other thread is already timing the head.
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            // Only give up leadership if it is still ours.
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // Hand off: if nobody is leading and work remains, wake a waiter.
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available on this queue,
     * or the specified wait time expires.
     *
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     *        {@code timeout} parameter
     * @return the head of this queue, or {@code null} if the
     *         specified waiting time elapses before an element with
     *         an expired delay becomes available
     * @throws InterruptedException {@inheritDoc}
     */
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0L)
                        return null;
                    else
                        nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0L)
                        return q.poll();
                    if (nanos <= 0L)
                        return null;
                    first = null; // don't retain ref while waiting
                    if (nanos < delay || leader != null)
                        // Either the caller's budget ends before the head
                        // expires, or another thread already leads.
                        nanos = available.awaitNanos(nanos);
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            long timeLeft = available.awaitNanos(delay);
                            // Charge only the time actually spent waiting.
                            nanos -= delay - timeLeft;
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // Hand off: if nobody is leading and work remains, wake a waiter.
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue, or
     * returns {@code null} if this queue is empty.  Unlike
     * {@code poll}, if no expired elements are available in the queue,
     * this method returns the element that will expire next,
     * if one exists.
     *
     * @return the head of this queue, or {@code null} if this
     *         queue is empty
     */
    @Override
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue, counting both
     * expired and unexpired elements.
     *
     * @return the number of elements in this queue
     */
    @Override
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns first element only if it is expired.
     * Used only by drainTo.  Call only when holding lock.
     */
    private E peekExpired() {
        // assert lock.isHeldByCurrentThread();
        E first = q.peek();
        return (first == null || first.getDelay(NANOSECONDS) > 0) ?
            null : first;
    }

    /**
     * Removes all expired elements from this queue and adds them to the
     * given collection; unexpired elements are left in place.
     * Equivalent to {@code drainTo(c, Integer.MAX_VALUE)}.
     *
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    @Override
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most {@code maxElements} expired elements from this queue
     * and adds them to the given collection; unexpired elements are left
     * in place.
     *
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            for (E e; n < maxElements && (e = peekExpired()) != null;) {
                c.add(e);       // In this order, in case add() throws.
                q.poll();
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this delay queue.
     * The queue will be empty after this call returns.
     * Elements with an unexpired delay are not waited for; they are
     * simply discarded from the queue.
     */
    @Override
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns {@code Integer.MAX_VALUE} because
     * a {@code DelayQueue} is not capacity constrained.
     *
     * @return {@code Integer.MAX_VALUE}
     */
    @Override
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Returns an array containing all of the elements in this queue.
     * The returned array elements are in no particular order.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    @Override
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue; the
     * runtime type of the returned array is that of the specified array.
     * The returned array elements are in no particular order.
     * If the queue fits in the specified array, it is returned therein.
     * Otherwise, a new array is allocated with the runtime type of the
     * specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * {@code null}.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>The following code can be used to dump a delay queue into a newly
     * allocated array of {@code Delayed}:
     *
     * <pre> {@code Delayed[] a = q.toArray(new Delayed[0]);}</pre>
     *
     * Note that {@code toArray(new Object[0])} is identical in function to
     * {@code toArray()}.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    @Override
    public <T> T[] toArray(T[] a) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present, whether or not it has expired.
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if an element was removed as a result of
     *         this call
     */
    @Override
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Identity-based version for use in Itr.remove.
     * Removes at most one element that is {@code ==} to the argument.
     */
    void removeEQ(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
                if (o == it.next()) {
                    it.remove();
                    break;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over all the elements (both expired and
     * unexpired) in this queue. The iterator does not return the
     * elements in any particular order.
     *
     * <p>The returned iterator is
     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
     *
     * @return an iterator over the elements in this queue
     */
    @Override
    public Iterator<E> iterator() {
        return new Itr(toArray());
    }

    /**
     * Snapshot iterator that works off copy of underlying q array.
     */
    private class Itr implements Iterator<E> {
        final Object[] array; // Array of all elements
        int cursor;           // index of next element to return
        int lastRet;          // index of last element, or -1 if no such

        Itr(Object[] array) {
            lastRet = -1;
            this.array = array;
        }

        @Override
        public boolean hasNext() {
            return cursor < array.length;
        }

        @Override
        @SuppressWarnings("unchecked")
        public E next() {
            if (cursor >= array.length)
                throw new NoSuchElementException();
            lastRet = cursor;
            return (E)array[cursor++];
        }

        @Override
        public void remove() {
            if (lastRet < 0)
                throw new IllegalStateException();
            // Remove from the live queue by identity; the snapshot is untouched.
            removeEQ(array[lastRet]);
            lastRet = -1;
        }
    }

}