ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.18
Committed: Sat Oct 11 15:37:31 2003 UTC (20 years, 7 months ago) by dl
Branch: MAIN
Changes since 1.17: +1 -1 lines
Log Message:
Redeclare some Conditions as ReentrantLock.ConditionObjects

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7
8 package java.util.concurrent;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13 * An unbounded {@linkplain BlockingQueue blocking queue} of <tt>Delayed</tt>
14 * elements, in which an element can only be taken when its delay has expired.
15 * The <em>head</em> of the queue is that <tt>Delayed</tt> element whose delay
16 * expired furthest in the past - if no delay has expired there is no head and
17 * <tt>poll</tt> will return <tt>null</tt>.
18 * This queue does not permit <tt>null</tt> elements.
19 * <p>This class implements all of the <em>optional</em> methods
20 * of the {@link Collection} and {@link Iterator} interfaces.
21 * @since 1.5
22 * @author Doug Lea
23 */
24
25 public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
26 implements BlockingQueue<E> {
27
28 private transient final ReentrantLock lock = new ReentrantLock();
29 private transient final ReentrantLock.ConditionObject available = lock.newCondition();
30 private final PriorityQueue<E> q = new PriorityQueue<E>();
31
32 /**
33 * Creates a new <tt>DelayQueue</tt> that is initially empty.
34 */
35 public DelayQueue() {}
36
37 /**
38 * Creates a <tt>DelayQueue</tt> initially containing the elements of the
39 * given collection of {@link Delayed} instances.
40 *
41 * @throws NullPointerException if <tt>c</tt> or any element within it
42 * is <tt>null</tt>
43 *
44 */
45 public DelayQueue(Collection<? extends E> c) {
46 this.addAll(c);
47 }
48
49 /**
50 * Inserts the specified element into this delay queue.
51 *
52 * @param o the element to add
53 * @return <tt>true</tt>
54 * @throws NullPointerException if the specified element is <tt>null</tt>.
55 */
56 public boolean offer(E o) {
57 lock.lock();
58 try {
59 E first = q.peek();
60 q.offer(o);
61 if (first == null || o.compareTo(first) < 0)
62 available.signalAll();
63 return true;
64 } finally {
65 lock.unlock();
66 }
67 }
68
69
70 /**
71 * Adds the specified element to this delay queue. As the queue is
72 * unbounded this method will never block.
73 * @param o the element to add
74 * @throws NullPointerException if the specified element is <tt>null</tt>.
75 */
76 public void put(E o) {
77 offer(o);
78 }
79
80 /**
81 * Inserts the specified element into this delay queue. As the queue is
82 * unbounded this method will never block.
83 * @param o the element to add
84 * @param timeout This parameter is ignored as the method never blocks
85 * @param unit This parameter is ignored as the method never blocks
86 * @return <tt>true</tt>
87 * @throws NullPointerException if the specified element is <tt>null</tt>.
88 */
89 public boolean offer(E o, long timeout, TimeUnit unit) {
90 return offer(o);
91 }
92
93 /**
94 * Adds the specified element to this queue.
95 * @param o the element to add
96 * @return <tt>true</tt> (as per the general contract of
97 * <tt>Collection.add</tt>).
98 *
99 * @throws NullPointerException if the specified element is <tt>null</tt>.
100 */
101 public boolean add(E o) {
102 return offer(o);
103 }
104
105 public E take() throws InterruptedException {
106 lock.lockInterruptibly();
107 try {
108 for (;;) {
109 E first = q.peek();
110 if (first == null) {
111 available.await();
112 } else {
113 long delay = first.getDelay(TimeUnit.NANOSECONDS);
114 if (delay > 0) {
115 long tl = available.awaitNanos(delay);
116 } else {
117 E x = q.poll();
118 assert x != null;
119 if (q.size() != 0)
120 available.signalAll(); // wake up other takers
121 return x;
122
123 }
124 }
125 }
126 } finally {
127 lock.unlock();
128 }
129 }
130
131 public E poll(long time, TimeUnit unit) throws InterruptedException {
132 lock.lockInterruptibly();
133 long nanos = unit.toNanos(time);
134 try {
135 for (;;) {
136 E first = q.peek();
137 if (first == null) {
138 if (nanos <= 0)
139 return null;
140 else
141 nanos = available.awaitNanos(nanos);
142 } else {
143 long delay = first.getDelay(TimeUnit.NANOSECONDS);
144 if (delay > 0) {
145 if (delay > nanos)
146 delay = nanos;
147 long timeLeft = available.awaitNanos(delay);
148 nanos -= delay - timeLeft;
149 } else {
150 E x = q.poll();
151 assert x != null;
152 if (q.size() != 0)
153 available.signalAll();
154 return x;
155 }
156 }
157 }
158 } finally {
159 lock.unlock();
160 }
161 }
162
163
164 public E poll() {
165 lock.lock();
166 try {
167 E first = q.peek();
168 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
169 return null;
170 else {
171 E x = q.poll();
172 assert x != null;
173 if (q.size() != 0)
174 available.signalAll();
175 return x;
176 }
177 } finally {
178 lock.unlock();
179 }
180 }
181
182 public E peek() {
183 lock.lock();
184 try {
185 return q.peek();
186 } finally {
187 lock.unlock();
188 }
189 }
190
191 public int size() {
192 lock.lock();
193 try {
194 return q.size();
195 } finally {
196 lock.unlock();
197 }
198 }
199
200 public int drainTo(Collection<? super E> c) {
201 if (c == null)
202 throw new NullPointerException();
203 if (c == this)
204 throw new IllegalArgumentException();
205 lock.lock();
206 try {
207 int n = 0;
208 for (;;) {
209 E first = q.peek();
210 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
211 break;
212 c.add(q.poll());
213 ++n;
214 }
215 if (n > 0)
216 available.signalAll();
217 return n;
218 } finally {
219 lock.unlock();
220 }
221 }
222
223 public int drainTo(Collection<? super E> c, int maxElements) {
224 if (c == null)
225 throw new NullPointerException();
226 if (c == this)
227 throw new IllegalArgumentException();
228 if (maxElements <= 0)
229 return 0;
230 lock.lock();
231 try {
232 int n = 0;
233 while (n < maxElements) {
234 E first = q.peek();
235 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
236 break;
237 c.add(q.poll());
238 ++n;
239 }
240 if (n > 0)
241 available.signalAll();
242 return n;
243 } finally {
244 lock.unlock();
245 }
246 }
247
248 /**
249 * Atomically removes all of the elements from this delay queue.
250 * The queue will be empty after this call returns.
251 */
252 public void clear() {
253 lock.lock();
254 try {
255 q.clear();
256 } finally {
257 lock.unlock();
258 }
259 }
260
261 /**
262 * Always returns <tt>Integer.MAX_VALUE</tt> because
263 * a <tt>DelayQueue</tt> is not capacity constrained.
264 * @return <tt>Integer.MAX_VALUE</tt>
265 */
266 public int remainingCapacity() {
267 return Integer.MAX_VALUE;
268 }
269
270 public Object[] toArray() {
271 lock.lock();
272 try {
273 return q.toArray();
274 } finally {
275 lock.unlock();
276 }
277 }
278
279 public <T> T[] toArray(T[] array) {
280 lock.lock();
281 try {
282 return q.toArray(array);
283 } finally {
284 lock.unlock();
285 }
286 }
287
288 public boolean remove(Object o) {
289 lock.lock();
290 try {
291 return q.remove(o);
292 } finally {
293 lock.unlock();
294 }
295 }
296
297 /**
298 * Returns an iterator over the elements in this queue. The iterator
299 * does not return the elements in any particular order. The
300 * returned iterator is a thread-safe "fast-fail" iterator that will
301 * throw {@link java.util.ConcurrentModificationException}
302 * upon detected interference.
303 *
304 * @return an iterator over the elements in this queue.
305 */
306 public Iterator<E> iterator() {
307 lock.lock();
308 try {
309 return new Itr(q.iterator());
310 } finally {
311 lock.unlock();
312 }
313 }
314
315 private class Itr<E> implements Iterator<E> {
316 private final Iterator<E> iter;
317 Itr(Iterator<E> i) {
318 iter = i;
319 }
320
321 public boolean hasNext() {
322 return iter.hasNext();
323 }
324
325 public E next() {
326 lock.lock();
327 try {
328 return iter.next();
329 } finally {
330 lock.unlock();
331 }
332 }
333
334 public void remove() {
335 lock.lock();
336 try {
337 iter.remove();
338 } finally {
339 lock.unlock();
340 }
341 }
342 }
343
344 }