ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.25
Committed: Tue Jan 27 11:36:31 2004 UTC (20 years, 4 months ago) by dl
Branch: MAIN
CVS Tags: JSR166_PFD
Changes since 1.24: +5 -0 lines
Log Message:
Add Collection framework membership doc

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7
8 package java.util.concurrent;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13 * An unbounded {@linkplain BlockingQueue blocking queue} of <tt>Delayed</tt>
14 * elements, in which an element can only be taken when its delay has expired.
15 * The <em>head</em> of the queue is that <tt>Delayed</tt> element whose delay
16 * expired furthest in the past - if no delay has expired there is no head and
17 * <tt>poll</tt> will return <tt>null</tt>.
18 * This queue does not permit <tt>null</tt> elements.
19 * <p>This class implements all of the <em>optional</em> methods
20 * of the {@link Collection} and {@link Iterator} interfaces.
21 *
22 * <p>This class is a member of the
23 * <a href="{@docRoot}/../guide/collections/index.html">
24 * Java Collections Framework</a>.
25 *
26 * @since 1.5
27 * @author Doug Lea
28 * @param <E> the type of elements held in this collection
29 */
30
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    /** Lock held around every access to the backing priority queue. */
    private transient final ReentrantLock lock = new ReentrantLock();

    /** Condition on which takers wait for an element to become takeable. */
    private transient final Condition available = lock.newCondition();

    /** Backing store, ordered by the elements' <tt>compareTo</tt>. */
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * Creates a new <tt>DelayQueue</tt> that is initially empty.
     */
    public DelayQueue() {}

    /**
     * Creates a <tt>DelayQueue</tt> initially containing the elements of the
     * given collection of {@link Delayed} instances.
     *
     * @param c the collection
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param o the element to add
     * @return <tt>true</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            q.offer(o);
            // Waiters timed their sleep on the old head; wake them when the
            // queue was empty or the new element sorts ahead of that head.
            if (first == null || o.compareTo(first) < 0) {
                available.signalAll();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds the specified element to this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param o the element to add
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public void put(E o) {
        offer(o);
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param o the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return <tt>true</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o, long timeout, TimeUnit unit) {
        return offer(o);
    }

    /**
     * Adds the specified element to this queue.
     *
     * @param o the element to add
     * @return <tt>true</tt> (as per the general contract of
     * <tt>Collection.add</tt>).
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean add(E o) {
        return offer(o);
    }

    /**
     * Removes and returns the head of the backing queue, waking other
     * waiters when further elements remain. The caller must hold
     * <tt>lock</tt> and must already know that the queue is non-empty.
     */
    private E dequeueHead() {
        E x = q.poll();
        assert x != null;
        if (q.size() != 0) {
            available.signalAll(); // wake up other takers
        }
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, waiting until an
     * element whose delay has expired is available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    available.await();
                    continue;
                }
                long delay = first.getDelay(TimeUnit.NANOSECONDS);
                if (delay <= 0) {
                    return dequeueHead();
                }
                // Sleep at most until the current head expires, then re-check.
                available.awaitNanos(delay);
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * given time for an element whose delay has expired.
     *
     * @param time how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit how to interpret the <tt>time</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the wait time
     * elapsed before an expired element became available
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if <tt>unit</tt> is <tt>null</tt>
     */
    public E poll(long time, TimeUnit unit) throws InterruptedException {
        // Convert before locking so that a null unit fails without the
        // lock being held (there is no finally block in effect yet).
        long nanos = unit.toNanos(time);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0) {
                        return null;
                    }
                    nanos = available.awaitNanos(nanos);
                    continue;
                }
                long delay = first.getDelay(TimeUnit.NANOSECONDS);
                if (delay <= 0) {
                    return dequeueHead();
                }
                // Head is still delayed: give up once the budget is spent
                // instead of looping until the head finally expires.
                if (nanos <= 0) {
                    return null;
                }
                if (delay > nanos) {
                    delay = nanos;
                }
                long timeLeft = available.awaitNanos(delay);
                // Charge only the time actually spent waiting.
                nanos -= delay - timeLeft;
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, or returns
     * <tt>null</tt> if the queue is empty or the head's delay has not
     * yet expired.
     *
     * @return the expired head of this queue, or <tt>null</tt>
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) {
                return null;
            }
            return dequeueHead();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of the backing queue
     * regardless of whether its delay has expired.
     *
     * @return the head of the backing queue, or <tt>null</tt> if empty
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue, expired or not.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Moves up to <tt>maxElements</tt> expired elements into <tt>c</tt>.
     * Shared implementation of both <tt>drainTo</tt> variants.
     */
    private int drainExpired(Collection<? super E> c, int maxElements) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        if (maxElements <= 0) {
            return 0;
        }
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            while (n < maxElements) {
                E first = q.peek();
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) {
                    break;
                }
                // Add before removing, so a throwing add() loses no element.
                c.add(first);
                q.poll();
                ++n;
            }
            if (n > 0) {
                available.signalAll();
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes all expired elements from this queue and adds them to the
     * given collection.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainExpired(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most the given number of expired elements from this
     * queue and adds them to the given collection.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        return drainExpired(c, maxElements);
    }

    /**
     * Atomically removes all of the elements from this delay queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * a <tt>DelayQueue</tt> is not capacity constrained.
     *
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * as produced by the backing priority queue.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * delegating to the backing priority queue's
     * <tt>toArray(T[])</tt> while holding the lock.
     *
     * @param array the array handed to the backing queue's
     * <tt>toArray(T[])</tt>
     * @return an array containing all of the elements in this queue
     */
    public <T> T[] toArray(T[] array) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(array);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present, whether or not it has expired.
     *
     * @param o the element to remove
     * @return <tt>true</tt> if an element was removed
     */
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue. The iterator
     * does not return the elements in any particular order. The
     * returned iterator is a thread-safe "fail-fast" iterator that will
     * throw {@link java.util.ConcurrentModificationException}
     * upon detected interference.
     *
     * @return an iterator over the elements in this queue.
     */
    public Iterator<E> iterator() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return new Itr(q.iterator());
        } finally {
            lock.unlock();
        }
    }

    /**
     * Wraps the backing queue's iterator so that every operation on it
     * runs while holding the enclosing queue's lock.
     */
    private class Itr implements Iterator<E> {
        private final Iterator<E> iter;

        Itr(Iterator<E> i) {
            iter = i;
        }

        public boolean hasNext() {
            final ReentrantLock lock = DelayQueue.this.lock;
            lock.lock();
            try {
                return iter.hasNext();
            } finally {
                lock.unlock();
            }
        }

        public E next() {
            final ReentrantLock lock = DelayQueue.this.lock;
            lock.lock();
            try {
                return iter.next();
            } finally {
                lock.unlock();
            }
        }

        public void remove() {
            final ReentrantLock lock = DelayQueue.this.lock;
            lock.lock();
            try {
                iter.remove();
            } finally {
                lock.unlock();
            }
        }
    }

}