ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.18
Committed: Sat Oct 11 15:37:31 2003 UTC (20 years, 7 months ago) by dl
Branch: MAIN
Changes since 1.17: +1 -1 lines
Log Message:
Redeclare some Conditions as ReentrantLock.ConditionObjects

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7    
8 tim 1.1 package java.util.concurrent;
9 dl 1.5 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
12     /**
13 tim 1.9 * An unbounded {@linkplain BlockingQueue blocking queue} of <tt>Delayed</tt>
14 dholmes 1.7 * elements, in which an element can only be taken when its delay has expired.
15     * The <em>head</em> of the queue is that <tt>Delayed</tt> element whose delay
16     * expired furthest in the past - if no delay has expired there is no head and
17     * <tt>poll</tt> will return <tt>null</tt>.
18     * This queue does not permit <tt>null</tt> elements.
19 dl 1.15 * <p>This class implements all of the <em>optional</em> methods
20     * of the {@link Collection} and {@link Iterator} interfaces.
21 dl 1.4 * @since 1.5
22     * @author Doug Lea
23     */
24 tim 1.1
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    /** Lock held around every access to <tt>q</tt>. */
    private final transient ReentrantLock lock = new ReentrantLock();

    /**
     * Condition signalled when the head of the queue may have changed
     * (a new earliest element was inserted, or elements were removed
     * and others remain).  Declared through the <tt>Condition</tt>
     * interface; only <tt>await</tt>, <tt>awaitNanos</tt> and
     * <tt>signalAll</tt> are used.
     */
    private final transient Condition available = lock.newCondition();

    /** Backing priority queue; its head is the smallest element. */
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * Creates a new <tt>DelayQueue</tt> that is initially empty.
     */
    public DelayQueue() {}

    /**
     * Creates a <tt>DelayQueue</tt> initially containing the elements of the
     * given collection of {@link Delayed} instances.
     *
     * @param c the collection whose elements are added to this queue
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

    /**
     * Inserts the specified element into this delay queue.
     *
     * @param o the element to add
     * @return <tt>true</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o) {
        lock.lock();
        try {
            E first = q.peek();
            q.offer(o);
            // Waiters only need waking when the new element becomes the head.
            if (first == null || o.compareTo(first) < 0)
                available.signalAll();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds the specified element to this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param o the element to add
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public void put(E o) {
        offer(o);
    }

    /**
     * Inserts the specified element into this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param o the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return <tt>true</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o, long timeout, TimeUnit unit) {
        return offer(o);
    }

    /**
     * Adds the specified element to this queue.
     *
     * @param o the element to add
     * @return <tt>true</tt> (as per the general contract of
     * <tt>Collection.add</tt>).
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean add(E o) {
        return offer(o);
    }

    /**
     * Removes and returns the head of the backing queue and, if elements
     * remain, wakes all waiters so they can re-examine the new head.
     * The caller must hold <tt>lock</tt> and must already have checked
     * that the head exists and that its delay has expired.
     */
    private E removeExpiredHead() {
        E x = q.poll();
        assert x != null;
        if (q.size() != 0)
            available.signalAll(); // wake up other takers
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element whose delay has expired is available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    available.await();
                } else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay > 0)
                        available.awaitNanos(delay); // re-check head afterwards
                    else
                        return removeExpiredHead();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * given time for an element whose delay has expired to become
     * available.
     *
     * @param time how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit the unit in which <tt>time</tt> is expressed
     * @return the head of this queue, or <tt>null</tt> if the wait time
     * elapsed before an expired element was available
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if <tt>unit</tt> is <tt>null</tt>
     */
    public E poll(long time, TimeUnit unit) throws InterruptedException {
        // Convert before locking: a null unit must not fail with the lock held.
        long nanos = unit.toNanos(time);
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0)
                        return null;
                    nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0)
                        return removeExpiredHead();
                    // Head exists but is not ready: give up once the
                    // caller's budget is spent instead of spinning.
                    if (nanos <= 0)
                        return null;
                    if (delay > nanos)
                        delay = nanos;
                    long timeLeft = available.awaitNanos(delay);
                    // Charge only the time actually waited.
                    nanos -= delay - timeLeft;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue without waiting.
     *
     * @return the head of this queue, or <tt>null</tt> if the queue is
     * empty or the delay of its head has not yet expired
     */
    public E poll() {
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                return null;
            return removeExpiredHead();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of the backing queue.
     * No delay check is made, so the returned element may not yet
     * have expired.
     *
     * @return the head of the backing queue, or <tt>null</tt> if this
     * queue is empty
     */
    public E peek() {
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue, expired or not.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes all elements whose delay has expired and adds them to
     * the given collection, in head-first order.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        lock.lock();
        try {
            int n = 0;
            for (;;) {
                E first = q.peek();
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                    break;
                c.add(q.poll());
                ++n;
            }
            if (n > 0)
                available.signalAll();
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes at most <tt>maxElements</tt> elements whose delay has
     * expired and adds them to the given collection, in head-first order.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer;
     * values less than or equal to zero transfer nothing
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        lock.lock();
        try {
            int n = 0;
            while (n < maxElements) {
                E first = q.peek();
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                    break;
                c.add(q.poll());
                ++n;
            }
            if (n > 0)
                available.signalAll();
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this delay queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * a <tt>DelayQueue</tt> is not capacity constrained.
     *
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * as produced by the backing priority queue.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * delegating to the backing priority queue's
     * <tt>toArray(T[])</tt> while holding the lock.
     *
     * @param array the array passed on to the backing queue
     * @return the array returned by the backing queue
     */
    public <T> T[] toArray(T[] array) {
        lock.lock();
        try {
            return q.toArray(array);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present, whether or not its delay has expired.
     *
     * @param o the element to remove
     * @return <tt>true</tt> if an element was removed
     */
    public boolean remove(Object o) {
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue. The iterator
     * does not return the elements in any particular order. The
     * returned iterator wraps the backing priority queue's iterator and
     * performs each step while holding this queue's lock; it is
     * "fail-fast" and may throw
     * {@link java.util.ConcurrentModificationException}
     * upon detected interference.
     *
     * @return an iterator over the elements in this queue.
     */
    public Iterator<E> iterator() {
        lock.lock();
        try {
            return new Itr(q.iterator());
        } finally {
            lock.unlock();
        }
    }

    /**
     * Iterator wrapper that runs every operation of the wrapped
     * iterator while holding the enclosing queue's lock.
     */
    private class Itr implements Iterator<E> {
        private final Iterator<E> iter;

        Itr(Iterator<E> i) {
            iter = i;
        }

        public boolean hasNext() {
            // Locked like next()/remove(): the wrapped iterator reads
            // state shared with the backing queue.
            lock.lock();
            try {
                return iter.hasNext();
            } finally {
                lock.unlock();
            }
        }

        public E next() {
            lock.lock();
            try {
                return iter.next();
            } finally {
                lock.unlock();
            }
        }

        public void remove() {
            lock.lock();
            try {
                iter.remove();
            } finally {
                lock.unlock();
            }
        }
    }

}