ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.15
Committed: Sat Sep 13 18:51:11 2003 UTC (20 years, 9 months ago) by dl
Branch: MAIN
Changes since 1.14: +3 -1 lines
Log Message:
Proofreading pass -- many minor adjustments

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7    
8 tim 1.1 package java.util.concurrent;
9 dl 1.5 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
/**
 * An unbounded {@linkplain BlockingQueue blocking queue} of <tt>Delayed</tt>
 * elements, in which an element can only be taken when its delay has expired.
 * The <em>head</em> of the queue is that <tt>Delayed</tt> element whose delay
 * expired furthest in the past. If no delay has expired there is no head and
 * <tt>poll</tt> will return <tt>null</tt>.
 * This queue does not permit <tt>null</tt> elements.
 *
 * <p>All access to the backing {@link PriorityQueue} is guarded by a single
 * {@link ReentrantLock}; threads waiting for an element block on a
 * {@link Condition} created from that lock.
 *
 * <p>This class implements all of the <em>optional</em> methods
 * of the {@link Collection} and {@link Iterator} interfaces.
 * @since 1.5
 * @author Doug Lea
 */

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    /** Guards every access to <tt>q</tt>. */
    private transient final ReentrantLock lock = new ReentrantLock();
    /** Signalled when a new head arrives or when elements remain after a removal. */
    private transient final Condition available = lock.newCondition();
    /** Backing store; its head is the element with the smallest delay. */
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * Creates a new <tt>DelayQueue</tt> that is initially empty.
     */
    public DelayQueue() {}

    /**
     * Creates a <tt>DelayQueue</tt> initially containing the elements of the
     * given collection of {@link Delayed} instances.
     *
     * @param c the collection whose elements are added
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param o the element to add
     * @return <tt>true</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o) {
        lock.lock();
        try {
            E first = q.peek();
            q.offer(o);
            // Waiters only need to re-evaluate their wait time when the
            // head of the queue has changed.
            if (first == null || o.compareTo(first) < 0)
                available.signalAll();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds the specified element to this delay queue. As the queue is
     * unbounded this method will never block.
     * @param o the element to add
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public void put(E o) {
        offer(o);
    }

    /**
     * Adds the specified element to this delay queue. As the queue is
     * unbounded this method will never block.
     * @param o the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return <tt>true</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o, long timeout, TimeUnit unit) {
        return offer(o);
    }

    /**
     * Adds the specified element to this queue.
     * @param o the element to add
     * @return <tt>true</tt> (as per the general contract of
     * <tt>Collection.add</tt>).
     *
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean add(E o) {
        return offer(o);
    }

    /**
     * Retrieves and removes the head of this queue, waiting until an
     * element with an expired delay is available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    available.await();
                } else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay > 0) {
                        // Sleep at most until the current head expires; the
                        // loop re-checks in case an earlier head was added.
                        available.awaitNanos(delay);
                    } else {
                        E x = q.poll();
                        assert x != null;
                        if (q.size() != 0)
                            available.signalAll(); // wake up other takers
                        return x;
                    }
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * given time for an element with an expired delay to become available.
     *
     * @param time how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit the unit in which <tt>time</tt> is expressed
     * @return the head of this queue, or <tt>null</tt> if the wait time
     * elapsed before an element with an expired delay was available
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if <tt>unit</tt> is <tt>null</tt>
     */
    public E poll(long time, TimeUnit unit) throws InterruptedException {
        // Convert before locking so that a null unit cannot leave the
        // lock held when the NullPointerException propagates.
        long nanos = unit.toNanos(time);
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0)
                        return null;
                    nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0) {
                        E x = q.poll();
                        assert x != null;
                        if (q.size() != 0)
                            available.signalAll();
                        return x;
                    }
                    // Head exists but has not expired: give up once the
                    // caller's budget is spent instead of spinning on
                    // zero-length waits until the head expires.
                    if (nanos <= 0)
                        return null;
                    if (delay > nanos)
                        delay = nanos;
                    long timeLeft = available.awaitNanos(delay);
                    nanos -= delay - timeLeft;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, or returns
     * <tt>null</tt> if the queue is empty or its head has not yet expired.
     *
     * @return the expired head of this queue, or <tt>null</tt>
     */
    public E poll() {
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                return null;
            else {
                E x = q.poll();
                assert x != null;
                if (q.size() != 0)
                    available.signalAll();
                return x;
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the element at the front of the
     * backing priority queue, whether or not its delay has expired.
     *
     * @return that element, or <tt>null</tt> if this queue is empty
     */
    public E peek() {
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue, expired or not.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes all expired elements from this queue and adds them to the
     * given collection. Elements whose delay has not expired are left in
     * the queue.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most the given number of expired elements from this
     * queue and adds them to the given collection.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        lock.lock();
        try {
            int n = 0;
            while (n < maxElements) {
                E first = q.peek();
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                    break;
                // Add before removing so a failing add does not lose the element.
                c.add(first);
                q.poll();
                ++n;
            }
            if (n > 0 && q.size() != 0)
                available.signalAll();
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this delay queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * a <tt>DelayQueue</tt> is not capacity constrained.
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * as produced by the backing priority queue while the lock is held.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * delegating to the backing priority queue while the lock is held.
     *
     * @param array the array passed through to the backing queue's
     * <tt>toArray</tt>
     * @return an array containing all of the elements in this queue
     */
    public <T> T[] toArray(T[] array) {
        lock.lock();
        try {
            return q.toArray(array);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present, whether or not it has expired.
     *
     * @param o the element to remove
     * @return <tt>true</tt> if an element was removed
     */
    public boolean remove(Object o) {
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue. The iterator
     * does not return the elements in any particular order. The
     * returned iterator is a thread-safe "fail-fast" iterator that will
     * throw {@link java.util.ConcurrentModificationException}
     * upon detected interference.
     *
     * @return an iterator over the elements in this queue.
     */
    public Iterator<E> iterator() {
        lock.lock();
        try {
            return new Itr(q.iterator());
        } finally {
            lock.unlock();
        }
    }

    /**
     * Wraps an iterator of the backing queue so that each operation runs
     * while holding this queue's lock. Deliberately not generic: it uses
     * the enclosing class's <tt>E</tt> rather than shadowing it.
     */
    private class Itr implements Iterator<E> {
        private final Iterator<E> iter;

        Itr(Iterator<E> i) {
            iter = i;
        }

        public boolean hasNext() {
            // Reads state shared with the backing queue, so it needs the
            // lock just like next() and remove().
            lock.lock();
            try {
                return iter.hasNext();
            } finally {
                lock.unlock();
            }
        }

        public E next() {
            lock.lock();
            try {
                return iter.next();
            } finally {
                lock.unlock();
            }
        }

        public void remove() {
            lock.lock();
            try {
                iter.remove();
            } finally {
                lock.unlock();
            }
        }
    }

}