ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.7
Committed: Wed Aug 6 01:57:53 2003 UTC (20 years, 10 months ago) by dholmes
Branch: MAIN
Changes since 1.6: +80 -16 lines
Log Message:
Final major updates to Collection related classes.

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7    
8 tim 1.1 package java.util.concurrent;
9 dl 1.5 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
12     /**
13 dholmes 1.7 * An unbounded {@linkplain BlockingQueue blocking queue} of <tt>Delayed</tt>
14     * elements, in which an element can only be taken when its delay has expired.
15     * The <em>head</em> of the queue is that <tt>Delayed</tt> element whose delay
16     * expired furthest in the past - if no delay has expired there is no head and
17     * <tt>poll</tt> will return <tt>null</tt>.
18     * This queue does not permit <tt>null</tt> elements.
19 dl 1.4 * @since 1.5
20     * @author Doug Lea
21     */
22 tim 1.1
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    /** Guards every access to {@link #q}. */
    private transient final ReentrantLock lock = new ReentrantLock();

    /**
     * Signalled when a new head is inserted, and after a removal that
     * leaves further elements behind, so that waiting takers re-examine
     * the head.
     */
    private transient final Condition available = lock.newCondition();

    /** Backing store; its head is the least element per <tt>compareTo</tt>. */
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * Creates a new <tt>DelayQueue</tt> that is initially empty.
     */
    public DelayQueue() {}

    /**
     * Creates a <tt>DelayQueue</tt> initially containing the elements of the
     * given collection of {@link Delayed} instances.
     *
     * <p>Each element is inserted through {@link #offer(Delayed)}, which
     * acquires the lock itself, so the constructor body takes no lock of
     * its own.
     *
     * @param c the collection whose elements are added
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

    /**
     * Adds the specified element to this delay queue.
     *
     * @param o the element to add
     * @return <tt>true</tt>
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E o) {
        lock.lock();
        try {
            E first = q.peek();
            q.offer(o);
            // Only a new head can change what waiting takers should do.
            if (first == null || o.compareTo(first) < 0)
                available.signalAll();
            return true;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Adds the specified element to this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param o the element to add
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E o) {
        offer(o);
    }

    /**
     * Adds the specified element to this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param o {@inheritDoc}
     * @param time This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @throws NullPointerException {@inheritDoc}
     * @return <tt>true</tt>
     */
    public boolean offer(E o, long time, TimeUnit unit) {
        return offer(o);
    }

    /**
     * Adds the specified element to this queue.
     *
     * @param o the element to add
     * @return <tt>true</tt> (as per the general contract of
     * <tt>Collection.add</tt>).
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean add(E o) {
        return offer(o);
    }

    /**
     * Retrieves and removes the head of this queue, waiting until the
     * queue is non-empty and the delay of its head has expired.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    available.await();
                    continue;
                }
                long delay = first.getDelay(TimeUnit.NANOSECONDS);
                if (delay > 0) {
                    // Sleep no longer than the head's remaining delay; an
                    // earlier head inserted meanwhile signals us awake.
                    available.awaitNanos(delay);
                    continue;
                }
                E x = q.poll();
                assert x != null;
                if (q.size() != 0)
                    available.signalAll(); // wake up other takers
                return x;
            }
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * given time for an element with an expired delay to become available.
     *
     * @param time how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit the unit in which <tt>time</tt> is expressed
     * @return the head of this queue, or <tt>null</tt> if the wait time
     * elapsed before an element with an expired delay was available
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if <tt>unit</tt> is <tt>null</tt>
     */
    public E poll(long time, TimeUnit unit) throws InterruptedException {
        // Convert before locking: a null unit must not throw while the
        // lock is held outside the try/finally.
        long nanos = unit.toNanos(time);
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0)
                        return null;
                    nanos = available.awaitNanos(nanos);
                    continue;
                }
                long delay = first.getDelay(TimeUnit.NANOSECONDS);
                if (delay <= 0) {
                    E x = q.poll();
                    assert x != null;
                    if (q.size() != 0)
                        available.signalAll();
                    return x;
                }
                // The head is still delayed. Without this check an
                // exhausted budget would lead to awaitNanos(<= 0) in a
                // tight loop until the head finally expired.
                if (nanos <= 0)
                    return null;
                if (delay > nanos)
                    delay = nanos;
                long timeLeft = available.awaitNanos(delay);
                nanos -= delay - timeLeft;
            }
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, or returns
     * <tt>null</tt> if the queue is empty or the delay of its head has not
     * yet expired.
     *
     * @return the head of this queue, or <tt>null</tt> if no element with
     * an expired delay is present
     */
    public E poll() {
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                return null;
            E x = q.poll();
            assert x != null;
            if (q.size() != 0)
                available.signalAll();
            return x;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the least element of this queue.
     * Unlike <tt>poll</tt>, the element is returned whether or not its
     * delay has expired.
     *
     * @return the least element, or <tt>null</tt> if this queue is empty
     */
    public E peek() {
        lock.lock();
        try {
            return q.peek();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue, counting both expired
     * and unexpired elements.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        lock.lock();
        try {
            return q.size();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Removes all elements whose delay has expired from this queue and
     * adds them to the given collection.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most the given number of elements whose delay has expired
     * from this queue and adds them to the given collection.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        lock.lock();
        try {
            int n = 0;
            while (n < maxElements) {
                E first = q.peek();
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                    break;
                // Add before removing so that a failing add loses nothing.
                c.add(first);
                q.poll();
                ++n;
            }
            if (n > 0)
                available.signalAll();
            return n;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this delay queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        lock.lock();
        try {
            q.clear();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * a <tt>DelayQueue</tt> is not capacity constrained.
     *
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * expired or not, as produced by the backing priority queue.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        lock.lock();
        try {
            return q.toArray();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * expired or not; the runtime type of the returned array is that of
     * the specified array.
     *
     * @param array the array into which the elements are stored if it is
     * big enough; otherwise a new array of the same runtime type is used
     * @return an array containing all of the elements in this queue
     */
    public <T> T[] toArray(T[] array) {
        lock.lock();
        try {
            return q.toArray(array);
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present, whether or not its delay has expired.
     * Returns <tt>true</tt> if the queue contained the specified element
     * (or equivalently, if the queue changed as a result of the call).
     *
     * <p>This implementation delegates to the backing priority queue
     * while holding the lock.
     *
     * @param o the element to remove
     * @return <tt>true</tt> if an element was removed
     */
    public boolean remove(Object o) {
        lock.lock();
        try {
            return q.remove(o);
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over all the elements (both expired and
     * unexpired) in this queue. The iterator does not return the elements
     * in any particular order.
     *
     * <p>The iterator wraps the backing priority queue's own iterator.
     * Each call on it is made under the queue lock, but the traversal as a
     * whole is not atomic: if the queue is modified by other means between
     * calls, the backing iterator may throw
     * <tt>ConcurrentModificationException</tt>.
     *
     * @return an iterator over the elements in this queue.
     */
    public Iterator<E> iterator() {
        lock.lock();
        try {
            return new Itr(q.iterator());
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Iterator adapter that performs every operation of the wrapped
     * iterator while holding the enclosing queue's lock.
     */
    private class Itr implements Iterator<E> {
        private final Iterator<E> iter;

        Itr(Iterator<E> i) {
            iter = i;
        }

        public boolean hasNext() {
            // Locked like next()/remove() so that the read of the backing
            // queue's state is ordered after other threads' updates.
            lock.lock();
            try {
                return iter.hasNext();
            }
            finally {
                lock.unlock();
            }
        }

        public E next() {
            lock.lock();
            try {
                return iter.next();
            }
            finally {
                lock.unlock();
            }
        }

        public void remove() {
            lock.lock();
            try {
                iter.remove();
            }
            finally {
                lock.unlock();
            }
        }
    }

}