ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jdk8/java/util/concurrent/DelayQueue.java
Revision: 1.1
Committed: Sat Mar 26 06:22:50 2016 UTC (8 years, 1 month ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Log Message:
fork jdk8 maintenance branch for source and jtreg tests

File Contents

# User Rev Content
1 jsr166 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8    
9     import static java.util.concurrent.TimeUnit.NANOSECONDS;
10    
11     import java.util.AbstractQueue;
12     import java.util.Collection;
13     import java.util.Iterator;
14     import java.util.NoSuchElementException;
15     import java.util.PriorityQueue;
16     import java.util.concurrent.locks.Condition;
17     import java.util.concurrent.locks.ReentrantLock;
18    
19     /**
20     * An unbounded {@linkplain BlockingQueue blocking queue} of
21     * {@code Delayed} elements, in which an element can only be taken
22     * when its delay has expired. The <em>head</em> of the queue is that
23     * {@code Delayed} element whose delay expired furthest in the
24     * past. If no delay has expired there is no head and {@code poll}
25     * will return {@code null}. Expiration occurs when an element's
26     * {@code getDelay(TimeUnit.NANOSECONDS)} method returns a value less
27     * than or equal to zero. Even though unexpired elements cannot be
28     * removed using {@code take} or {@code poll}, they are otherwise
29     * treated as normal elements. For example, the {@code size} method
30     * returns the count of both expired and unexpired elements.
31     * This queue does not permit null elements.
32     *
33     * <p>This class and its iterator implement all of the
34     * <em>optional</em> methods of the {@link Collection} and {@link
35     * Iterator} interfaces. The Iterator provided in method {@link
36     * #iterator()} is <em>not</em> guaranteed to traverse the elements of
37     * the DelayQueue in any particular order.
38     *
39     * <p>This class is a member of the
40     * <a href="{@docRoot}/../technotes/guides/collections/index.html">
41     * Java Collections Framework</a>.
42     *
43     * @since 1.5
44     * @author Doug Lea
45     * @param <E> the type of elements held in this queue
46     */
47     public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
48     implements BlockingQueue<E> {
49    
50     private final transient ReentrantLock lock = new ReentrantLock();
51     private final PriorityQueue<E> q = new PriorityQueue<E>();
52    
53     /**
54     * Thread designated to wait for the element at the head of
55     * the queue. This variant of the Leader-Follower pattern
56     * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
57     * minimize unnecessary timed waiting. When a thread becomes
58     * the leader, it waits only for the next delay to elapse, but
59     * other threads await indefinitely. The leader thread must
60     * signal some other thread before returning from take() or
61     * poll(...), unless some other thread becomes leader in the
62     * interim. Whenever the head of the queue is replaced with
63     * an element with an earlier expiration time, the leader
64     * field is invalidated by being reset to null, and some
65     * waiting thread, but not necessarily the current leader, is
66     * signalled. So waiting threads must be prepared to acquire
67     * and lose leadership while waiting.
68     */
69     private Thread leader;
70    
71     /**
72     * Condition signalled when a newer element becomes available
73     * at the head of the queue or a new thread may need to
74     * become leader.
75     */
76     private final Condition available = lock.newCondition();
77    
78     /**
79     * Creates a new {@code DelayQueue} that is initially empty.
80     */
81     public DelayQueue() {}
82    
83     /**
84     * Creates a {@code DelayQueue} initially containing the elements of the
85     * given collection of {@link Delayed} instances.
86     *
87     * @param c the collection of elements to initially contain
88     * @throws NullPointerException if the specified collection or any
89     * of its elements are null
90     */
91     public DelayQueue(Collection<? extends E> c) {
92     this.addAll(c);
93     }
94    
95     /**
96     * Inserts the specified element into this delay queue.
97     *
98     * @param e the element to add
99     * @return {@code true} (as specified by {@link Collection#add})
100     * @throws NullPointerException if the specified element is null
101     */
102     public boolean add(E e) {
103     return offer(e);
104     }
105    
106     /**
107     * Inserts the specified element into this delay queue.
108     *
109     * @param e the element to add
110     * @return {@code true}
111     * @throws NullPointerException if the specified element is null
112     */
113     public boolean offer(E e) {
114     final ReentrantLock lock = this.lock;
115     lock.lock();
116     try {
117     q.offer(e);
118     if (q.peek() == e) {
119     leader = null;
120     available.signal();
121     }
122     return true;
123     } finally {
124     lock.unlock();
125     }
126     }
127    
128     /**
129     * Inserts the specified element into this delay queue. As the queue is
130     * unbounded this method will never block.
131     *
132     * @param e the element to add
133     * @throws NullPointerException {@inheritDoc}
134     */
135     public void put(E e) {
136     offer(e);
137     }
138    
139     /**
140     * Inserts the specified element into this delay queue. As the queue is
141     * unbounded this method will never block.
142     *
143     * @param e the element to add
144     * @param timeout This parameter is ignored as the method never blocks
145     * @param unit This parameter is ignored as the method never blocks
146     * @return {@code true}
147     * @throws NullPointerException {@inheritDoc}
148     */
149     public boolean offer(E e, long timeout, TimeUnit unit) {
150     return offer(e);
151     }
152    
153     /**
154     * Retrieves and removes the head of this queue, or returns {@code null}
155     * if this queue has no elements with an expired delay.
156     *
157     * @return the head of this queue, or {@code null} if this
158     * queue has no elements with an expired delay
159     */
160     public E poll() {
161     final ReentrantLock lock = this.lock;
162     lock.lock();
163     try {
164     E first = q.peek();
165     return (first == null || first.getDelay(NANOSECONDS) > 0)
166     ? null
167     : q.poll();
168     } finally {
169     lock.unlock();
170     }
171     }
172    
173     /**
174     * Retrieves and removes the head of this queue, waiting if necessary
175     * until an element with an expired delay is available on this queue.
176     *
177     * @return the head of this queue
178     * @throws InterruptedException {@inheritDoc}
179     */
180     public E take() throws InterruptedException {
181     final ReentrantLock lock = this.lock;
182     lock.lockInterruptibly();
183     try {
184     for (;;) {
185     E first = q.peek();
186     if (first == null)
187     available.await();
188     else {
189     long delay = first.getDelay(NANOSECONDS);
190     if (delay <= 0L)
191     return q.poll();
192     first = null; // don't retain ref while waiting
193     if (leader != null)
194     available.await();
195     else {
196     Thread thisThread = Thread.currentThread();
197     leader = thisThread;
198     try {
199     available.awaitNanos(delay);
200     } finally {
201     if (leader == thisThread)
202     leader = null;
203     }
204     }
205     }
206     }
207     } finally {
208     if (leader == null && q.peek() != null)
209     available.signal();
210     lock.unlock();
211     }
212     }
213    
214     /**
215     * Retrieves and removes the head of this queue, waiting if necessary
216     * until an element with an expired delay is available on this queue,
217     * or the specified wait time expires.
218     *
219     * @return the head of this queue, or {@code null} if the
220     * specified waiting time elapses before an element with
221     * an expired delay becomes available
222     * @throws InterruptedException {@inheritDoc}
223     */
224     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
225     long nanos = unit.toNanos(timeout);
226     final ReentrantLock lock = this.lock;
227     lock.lockInterruptibly();
228     try {
229     for (;;) {
230     E first = q.peek();
231     if (first == null) {
232     if (nanos <= 0L)
233     return null;
234     else
235     nanos = available.awaitNanos(nanos);
236     } else {
237     long delay = first.getDelay(NANOSECONDS);
238     if (delay <= 0L)
239     return q.poll();
240     if (nanos <= 0L)
241     return null;
242     first = null; // don't retain ref while waiting
243     if (nanos < delay || leader != null)
244     nanos = available.awaitNanos(nanos);
245     else {
246     Thread thisThread = Thread.currentThread();
247     leader = thisThread;
248     try {
249     long timeLeft = available.awaitNanos(delay);
250     nanos -= delay - timeLeft;
251     } finally {
252     if (leader == thisThread)
253     leader = null;
254     }
255     }
256     }
257     }
258     } finally {
259     if (leader == null && q.peek() != null)
260     available.signal();
261     lock.unlock();
262     }
263     }
264    
265     /**
266     * Retrieves, but does not remove, the head of this queue, or
267     * returns {@code null} if this queue is empty. Unlike
268     * {@code poll}, if no expired elements are available in the queue,
269     * this method returns the element that will expire next,
270     * if one exists.
271     *
272     * @return the head of this queue, or {@code null} if this
273     * queue is empty
274     */
275     public E peek() {
276     final ReentrantLock lock = this.lock;
277     lock.lock();
278     try {
279     return q.peek();
280     } finally {
281     lock.unlock();
282     }
283     }
284    
285     public int size() {
286     final ReentrantLock lock = this.lock;
287     lock.lock();
288     try {
289     return q.size();
290     } finally {
291     lock.unlock();
292     }
293     }
294    
295     /**
296     * Returns first element only if it is expired.
297     * Used only by drainTo. Call only when holding lock.
298     */
299     private E peekExpired() {
300     // assert lock.isHeldByCurrentThread();
301     E first = q.peek();
302     return (first == null || first.getDelay(NANOSECONDS) > 0) ?
303     null : first;
304     }
305    
306     /**
307     * @throws UnsupportedOperationException {@inheritDoc}
308     * @throws ClassCastException {@inheritDoc}
309     * @throws NullPointerException {@inheritDoc}
310     * @throws IllegalArgumentException {@inheritDoc}
311     */
312     public int drainTo(Collection<? super E> c) {
313     if (c == null)
314     throw new NullPointerException();
315     if (c == this)
316     throw new IllegalArgumentException();
317     final ReentrantLock lock = this.lock;
318     lock.lock();
319     try {
320     int n = 0;
321     for (E e; (e = peekExpired()) != null;) {
322     c.add(e); // In this order, in case add() throws.
323     q.poll();
324     ++n;
325     }
326     return n;
327     } finally {
328     lock.unlock();
329     }
330     }
331    
332     /**
333     * @throws UnsupportedOperationException {@inheritDoc}
334     * @throws ClassCastException {@inheritDoc}
335     * @throws NullPointerException {@inheritDoc}
336     * @throws IllegalArgumentException {@inheritDoc}
337     */
338     public int drainTo(Collection<? super E> c, int maxElements) {
339     if (c == null)
340     throw new NullPointerException();
341     if (c == this)
342     throw new IllegalArgumentException();
343     if (maxElements <= 0)
344     return 0;
345     final ReentrantLock lock = this.lock;
346     lock.lock();
347     try {
348     int n = 0;
349     for (E e; n < maxElements && (e = peekExpired()) != null;) {
350     c.add(e); // In this order, in case add() throws.
351     q.poll();
352     ++n;
353     }
354     return n;
355     } finally {
356     lock.unlock();
357     }
358     }
359    
360     /**
361     * Atomically removes all of the elements from this delay queue.
362     * The queue will be empty after this call returns.
363     * Elements with an unexpired delay are not waited for; they are
364     * simply discarded from the queue.
365     */
366     public void clear() {
367     final ReentrantLock lock = this.lock;
368     lock.lock();
369     try {
370     q.clear();
371     } finally {
372     lock.unlock();
373     }
374     }
375    
376     /**
377     * Always returns {@code Integer.MAX_VALUE} because
378     * a {@code DelayQueue} is not capacity constrained.
379     *
380     * @return {@code Integer.MAX_VALUE}
381     */
382     public int remainingCapacity() {
383     return Integer.MAX_VALUE;
384     }
385    
386     /**
387     * Returns an array containing all of the elements in this queue.
388     * The returned array elements are in no particular order.
389     *
390     * <p>The returned array will be "safe" in that no references to it are
391     * maintained by this queue. (In other words, this method must allocate
392     * a new array). The caller is thus free to modify the returned array.
393     *
394     * <p>This method acts as bridge between array-based and collection-based
395     * APIs.
396     *
397     * @return an array containing all of the elements in this queue
398     */
399     public Object[] toArray() {
400     final ReentrantLock lock = this.lock;
401     lock.lock();
402     try {
403     return q.toArray();
404     } finally {
405     lock.unlock();
406     }
407     }
408    
409     /**
410     * Returns an array containing all of the elements in this queue; the
411     * runtime type of the returned array is that of the specified array.
412     * The returned array elements are in no particular order.
413     * If the queue fits in the specified array, it is returned therein.
414     * Otherwise, a new array is allocated with the runtime type of the
415     * specified array and the size of this queue.
416     *
417     * <p>If this queue fits in the specified array with room to spare
418     * (i.e., the array has more elements than this queue), the element in
419     * the array immediately following the end of the queue is set to
420     * {@code null}.
421     *
422     * <p>Like the {@link #toArray()} method, this method acts as bridge between
423     * array-based and collection-based APIs. Further, this method allows
424     * precise control over the runtime type of the output array, and may,
425     * under certain circumstances, be used to save allocation costs.
426     *
427     * <p>The following code can be used to dump a delay queue into a newly
428     * allocated array of {@code Delayed}:
429     *
430     * <pre> {@code Delayed[] a = q.toArray(new Delayed[0]);}</pre>
431     *
432     * Note that {@code toArray(new Object[0])} is identical in function to
433     * {@code toArray()}.
434     *
435     * @param a the array into which the elements of the queue are to
436     * be stored, if it is big enough; otherwise, a new array of the
437     * same runtime type is allocated for this purpose
438     * @return an array containing all of the elements in this queue
439     * @throws ArrayStoreException if the runtime type of the specified array
440     * is not a supertype of the runtime type of every element in
441     * this queue
442     * @throws NullPointerException if the specified array is null
443     */
444     public <T> T[] toArray(T[] a) {
445     final ReentrantLock lock = this.lock;
446     lock.lock();
447     try {
448     return q.toArray(a);
449     } finally {
450     lock.unlock();
451     }
452     }
453    
454     /**
455     * Removes a single instance of the specified element from this
456     * queue, if it is present, whether or not it has expired.
457     */
458     public boolean remove(Object o) {
459     final ReentrantLock lock = this.lock;
460     lock.lock();
461     try {
462     return q.remove(o);
463     } finally {
464     lock.unlock();
465     }
466     }
467    
468     /**
469     * Identity-based version for use in Itr.remove.
470     */
471     void removeEQ(Object o) {
472     final ReentrantLock lock = this.lock;
473     lock.lock();
474     try {
475     for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
476     if (o == it.next()) {
477     it.remove();
478     break;
479     }
480     }
481     } finally {
482     lock.unlock();
483     }
484     }
485    
486     /**
487     * Returns an iterator over all the elements (both expired and
488     * unexpired) in this queue. The iterator does not return the
489     * elements in any particular order.
490     *
491     * <p>The returned iterator is
492     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
493     *
494     * @return an iterator over the elements in this queue
495     */
496     public Iterator<E> iterator() {
497     return new Itr(toArray());
498     }
499    
500     /**
501     * Snapshot iterator that works off copy of underlying q array.
502     */
503     private class Itr implements Iterator<E> {
504     final Object[] array; // Array of all elements
505     int cursor; // index of next element to return
506     int lastRet; // index of last element, or -1 if no such
507    
508     Itr(Object[] array) {
509     lastRet = -1;
510     this.array = array;
511     }
512    
513     public boolean hasNext() {
514     return cursor < array.length;
515     }
516    
517     @SuppressWarnings("unchecked")
518     public E next() {
519     if (cursor >= array.length)
520     throw new NoSuchElementException();
521     lastRet = cursor;
522     return (E)array[cursor++];
523     }
524    
525     public void remove() {
526     if (lastRet < 0)
527     throw new IllegalStateException();
528     removeEQ(array[lastRet]);
529     lastRet = -1;
530     }
531     }
532    
533     }