ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.20
Committed: Sun Oct 19 13:38:34 2003 UTC (20 years, 7 months ago) by dl
Branch: MAIN
CVS Tags: JSR166_NOV3_FREEZE, JSR166_DEC9_PRE_ES_SUBMIT, JSR166_DEC9_POST_ES_SUBMIT
Changes since 1.19: +1 -1 lines
Log Message:
Changed doc strings for generic params

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7
8 package java.util.concurrent;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
/**
 * An unbounded {@linkplain BlockingQueue blocking queue} of <tt>Delayed</tt>
 * elements, in which an element can only be taken when its delay has expired.
 * The <em>head</em> of the queue is that <tt>Delayed</tt> element whose delay
 * expired furthest in the past - if no delay has expired there is no head and
 * <tt>poll</tt> will return <tt>null</tt>.
 * This queue does not permit <tt>null</tt> elements.
 * <p>This class implements all of the <em>optional</em> methods
 * of the {@link Collection} and {@link Iterator} interfaces.
 * <p>All access to the backing {@link PriorityQueue} is performed while
 * holding a single internal {@link ReentrantLock}.
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    /** Lock held around every access to <tt>q</tt>. */
    private transient final ReentrantLock lock = new ReentrantLock();

    /**
     * Signalled whenever the head of <tt>q</tt> may have changed, so that
     * threads blocked in <tt>take</tt> or timed <tt>poll</tt> re-examine it.
     */
    private transient final Condition available = lock.newCondition();

    /** Backing store; ordered by the elements' <tt>compareTo</tt>. */
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * Creates a new <tt>DelayQueue</tt> that is initially empty.
     */
    public DelayQueue() {}

    /**
     * Creates a <tt>DelayQueue</tt> initially containing the elements of the
     * given collection of {@link Delayed} instances.
     *
     * @param c the collection whose elements are added to this queue
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param o the element to add
     * @return <tt>true</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o) {
        lock.lock();
        try {
            E first = q.peek();
            q.offer(o);
            // Waiters only need to re-check when the head they are timing
            // against has been replaced by an earlier element (or there was
            // no head at all).
            if (first == null || o.compareTo(first) < 0)
                available.signalAll();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds the specified element to this delay queue. As the queue is
     * unbounded this method will never block.
     * @param o the element to add
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public void put(E o) {
        offer(o);
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     * @param o the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return <tt>true</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o, long timeout, TimeUnit unit) {
        return offer(o);
    }

    /**
     * Adds the specified element to this queue.
     * @param o the element to add
     * @return <tt>true</tt> (as per the general contract of
     * <tt>Collection.add</tt>).
     *
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean add(E o) {
        return offer(o);
    }

    /**
     * Removes and returns the head of <tt>q</tt> and, if elements remain,
     * wakes the other waiters so they can examine the new head.
     * Must be called with <tt>lock</tt> held and <tt>q</tt> non-empty.
     */
    private E dequeueHead() {
        E x = q.poll();
        assert x != null;
        if (q.size() != 0)
            available.signalAll(); // wake up other takers
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, waiting until an
     * element whose delay has expired is available.
     *
     * @return the expired head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    available.await();
                } else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0)
                        return dequeueHead();
                    // Sleep at most until the current head expires; an
                    // earlier insertion signals and restarts the loop.
                    available.awaitNanos(delay);
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * given time for an element whose delay has expired to become
     * available.
     *
     * @param time how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit the unit of the <tt>time</tt> argument
     * @return the expired head of this queue, or <tt>null</tt> if the
     * waiting time elapsed before such an element became available
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if <tt>unit</tt> is <tt>null</tt>
     */
    public E poll(long time, TimeUnit unit) throws InterruptedException {
        // Convert before locking: a null unit must not leave the lock held.
        long nanos = unit.toNanos(time);
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0)
                        return null;
                    nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0)
                        return dequeueHead();
                    // Head exists but is not yet expired: give up once the
                    // caller's budget is exhausted instead of spinning.
                    if (nanos <= 0)
                        return null;
                    if (delay > nanos)
                        delay = nanos;
                    long timeLeft = available.awaitNanos(delay);
                    // Charge only the time actually spent waiting.
                    nanos -= delay - timeLeft;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, or returns
     * <tt>null</tt> if the queue is empty or its head has not yet expired.
     *
     * @return the expired head of this queue, or <tt>null</tt>
     */
    public E poll() {
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                return null;
            return dequeueHead();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the first element of the underlying
     * priority queue, whether or not its delay has expired.
     *
     * @return the first element, or <tt>null</tt> if this queue is empty
     */
    public E peek() {
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue, expired or not.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes all expired elements from this queue and adds them to the
     * given collection.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most the given number of expired elements from this queue
     * and adds them to the given collection.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        lock.lock();
        try {
            int n = 0;
            while (n < maxElements) {
                E first = q.peek();
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                    break;
                // Add before removing so that a failing add() does not
                // lose the element.
                c.add(first);
                q.poll();
                ++n;
            }
            if (n > 0)
                available.signalAll();
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this delay queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * a <tt>DelayQueue</tt> is not capacity constrained.
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * as produced by the underlying priority queue.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * delegating to the underlying priority queue's
     * <tt>toArray(T[])</tt>.
     *
     * @param array the array handed to the underlying queue's
     * <tt>toArray(T[])</tt>
     * @return an array containing all of the elements in this queue
     */
    public <T> T[] toArray(T[] array) {
        lock.lock();
        try {
            return q.toArray(array);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present, whether or not it has expired.
     *
     * @param o the element to remove
     * @return <tt>true</tt> if an element was removed
     */
    public boolean remove(Object o) {
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue. The iterator
     * does not return the elements in any particular order. The
     * returned iterator wraps the underlying priority queue's iterator,
     * performing each operation under this queue's lock; it is a
     * "fail-fast" iterator that will throw
     * {@link java.util.ConcurrentModificationException}
     * upon detected interference.
     *
     * @return an iterator over the elements in this queue.
     */
    public Iterator<E> iterator() {
        lock.lock();
        try {
            return new Itr(q.iterator());
        } finally {
            lock.unlock();
        }
    }

    /**
     * Iterator wrapper that takes the queue lock around every call to
     * the wrapped iterator.
     */
    private class Itr implements Iterator<E> {
        private final Iterator<E> iter;

        Itr(Iterator<E> i) {
            iter = i;
        }

        public boolean hasNext() {
            lock.lock();
            try {
                return iter.hasNext();
            } finally {
                lock.unlock();
            }
        }

        public E next() {
            lock.lock();
            try {
                return iter.next();
            } finally {
                lock.unlock();
            }
        }

        public void remove() {
            lock.lock();
            try {
                iter.remove();
            } finally {
                lock.unlock();
            }
        }
    }

}