ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.11
Committed: Wed Aug 6 18:22:09 2003 UTC (20 years, 10 months ago) by tim
Branch: MAIN
CVS Tags: JSR166_CR1
Changes since 1.10: +1 -1 lines
Log Message:
Fixes to minor errors found by DocCheck

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7    
8 tim 1.1 package java.util.concurrent;
9 dl 1.5 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
12     /**
13 tim 1.9 * An unbounded {@linkplain BlockingQueue blocking queue} of <tt>Delayed</tt>
14 dholmes 1.7 * elements, in which an element can only be taken when its delay has expired.
15     * The <em>head</em> of the queue is that <tt>Delayed</tt> element whose delay
16     * expired furthest in the past - if no delay has expired there is no head and
17     * <tt>poll</tt> will return <tt>null</tt>.
18     * This queue does not permit <tt>null</tt> elements.
19 dl 1.4 * @since 1.5
20     * @author Doug Lea
21     */
22 tim 1.1
23 dl 1.3 public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
24     implements BlockingQueue<E> {
25 tim 1.6
26 dl 1.2 private transient final ReentrantLock lock = new ReentrantLock();
27 dl 1.4 private transient final Condition available = lock.newCondition();
28 dl 1.3 private final PriorityQueue<E> q = new PriorityQueue<E>();
29 tim 1.1
30 dl 1.4 /**
31 dholmes 1.7 * Creates a new <tt>DelayQueue</tt> that is initially empty.
32 dl 1.4 */
33 tim 1.1 public DelayQueue() {}
34    
35 tim 1.6 /**
36 tim 1.9 * Creates a <tt>DelayQueue</tt> initially containing the elements of the
37 dholmes 1.7 * given collection of {@link Delayed} instances.
38     *
39     * @throws NullPointerException if <tt>c</tt> or any element within it
40     * is <tt>null</tt>
41 tim 1.6 *
42     */
43     public DelayQueue(Collection<? extends E> c) {
44     this.addAll(c);
45     }
46    
47 dholmes 1.7 /**
48     * Add the specified element to this delay queue.
49     *
50     * @return <tt>true</tt>
51     * @throws NullPointerException {@inheritDoc}
52     */
53     public boolean offer(E o) {
54 dl 1.2 lock.lock();
55     try {
56 dl 1.3 E first = q.peek();
57 dholmes 1.7 q.offer(o);
58     if (first == null || o.compareTo(first) < 0)
59 dl 1.4 available.signalAll();
60 dl 1.2 return true;
61     }
62     finally {
63     lock.unlock();
64     }
65     }
66    
67 dholmes 1.7
68     /**
69     * Adds the specified element to this delay queue. As the queue is
70     * unbounded this method will never block.
71     * @throws NullPointerException {@inheritDoc}
72     */
73     public void put(E o) {
74     offer(o);
75 dl 1.2 }
76    
77 dholmes 1.7 /**
78     * Adds the specified element to this priority queue. As the queue is
79     * unbounded this method will never block.
80     * @param o {@inheritDoc}
81 tim 1.11 * @param time This parameter is ignored as the method never blocks
82 dholmes 1.7 * @param unit This parameter is ignored as the method never blocks
83     * @throws NullPointerException {@inheritDoc}
84     * @return <tt>true</tt>
85     */
86     public boolean offer(E o, long time, TimeUnit unit) {
87     return offer(o);
88 dl 1.2 }
89    
90 dholmes 1.7 /**
91     * Adds the specified element to this queue.
92     * @return <tt>true</tt> (as per the general contract of
93     * <tt>Collection.add</tt>).
94     *
95     * @throws NullPointerException {@inheritDoc}
96     */
97     public boolean add(E o) {
98     return offer(o);
99 dl 1.2 }
100    
101 dl 1.3 public E take() throws InterruptedException {
102 dl 1.2 lock.lockInterruptibly();
103     try {
104     for (;;) {
105 dl 1.3 E first = q.peek();
106 dl 1.2 if (first == null)
107 dl 1.4 available.await();
108 dl 1.2 else {
109     long delay = first.getDelay(TimeUnit.NANOSECONDS);
110     if (delay > 0)
111 dl 1.4 available.awaitNanos(delay);
112 dl 1.2 else {
113 dl 1.3 E x = q.poll();
114 dl 1.2 assert x != null;
115     if (q.size() != 0)
116 dl 1.4 available.signalAll(); // wake up other takers
117 dl 1.2 return x;
118 tim 1.6
119 dl 1.2 }
120     }
121     }
122     }
123     finally {
124     lock.unlock();
125     }
126     }
127    
128 dl 1.3 public E poll(long time, TimeUnit unit) throws InterruptedException {
129 dl 1.2 lock.lockInterruptibly();
130     long nanos = unit.toNanos(time);
131     try {
132     for (;;) {
133 dl 1.3 E first = q.peek();
134 dl 1.2 if (first == null) {
135     if (nanos <= 0)
136     return null;
137     else
138 dl 1.4 nanos = available.awaitNanos(nanos);
139 dl 1.2 }
140     else {
141     long delay = first.getDelay(TimeUnit.NANOSECONDS);
142     if (delay > 0) {
143     if (delay > nanos)
144     delay = nanos;
145 dl 1.4 long timeLeft = available.awaitNanos(delay);
146 dl 1.2 nanos -= delay - timeLeft;
147     }
148     else {
149 dl 1.3 E x = q.poll();
150 dl 1.2 assert x != null;
151     if (q.size() != 0)
152 tim 1.6 available.signalAll();
153 dl 1.2 return x;
154     }
155     }
156     }
157     }
158     finally {
159     lock.unlock();
160     }
161     }
162    
163    
164 dl 1.3 public E poll() {
165 dl 1.2 lock.lock();
166     try {
167 dl 1.3 E first = q.peek();
168 dl 1.2 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
169     return null;
170     else {
171 dl 1.3 E x = q.poll();
172 dl 1.2 assert x != null;
173     if (q.size() != 0)
174 tim 1.6 available.signalAll();
175 dl 1.2 return x;
176     }
177     }
178     finally {
179     lock.unlock();
180     }
181     }
182    
183 dl 1.3 public E peek() {
184 dl 1.2 lock.lock();
185     try {
186     return q.peek();
187     }
188     finally {
189     lock.unlock();
190     }
191 tim 1.1 }
192    
193 dl 1.2 public int size() {
194     lock.lock();
195     try {
196     return q.size();
197     }
198     finally {
199     lock.unlock();
200     }
201     }
202    
203 dholmes 1.7 /**
204     * Atomically removes all of the elements from this delay queue.
205     * The queue will be empty after this call returns.
206     */
207 dl 1.2 public void clear() {
208     lock.lock();
209     try {
210     q.clear();
211     }
212     finally {
213     lock.unlock();
214     }
215     }
216 tim 1.1
217 dholmes 1.7 /**
218     * Always returns <tt>Integer.MAX_VALUE</tt> because
219     * a <tt>DelayQueue</tt> is not capacity constrained.
220     * @return <tt>Integer.MAX_VALUE</tt>
221     */
222 dl 1.2 public int remainingCapacity() {
223     return Integer.MAX_VALUE;
224 tim 1.1 }
225 dl 1.2
226     public Object[] toArray() {
227     lock.lock();
228     try {
229     return q.toArray();
230     }
231     finally {
232     lock.unlock();
233     }
234 tim 1.1 }
235 dl 1.2
236     public <T> T[] toArray(T[] array) {
237     lock.lock();
238     try {
239     return q.toArray(array);
240     }
241     finally {
242     lock.unlock();
243     }
244 tim 1.1 }
245    
246 dholmes 1.7 /**
247     * Removes a single instance of the specified element from this
248     * queue, if it is present. More formally,
249     * removes an element <tt>e</tt> such that <tt>(o==null ? e==null :
250     * o.equals(e))</tt>, if the queue contains one or more such
251     * elements. Returns <tt>true</tt> if the queue contained the
252     * specified element (or equivalently, if the queue changed as a
253     * result of the call).
254     *
255     * <p>This implementation iterates over the queue looking for the
256     * specified element. If it finds the element, it removes the element
257     * from the queue using the iterator's remove method.<p>
258     *
259     */
260     public boolean remove(Object o) {
261 dl 1.2 lock.lock();
262     try {
263 dholmes 1.7 return q.remove(o);
264 dl 1.2 }
265     finally {
266     lock.unlock();
267     }
268     }
269    
270 dholmes 1.7 /**
271     * Returns an iterator over the elements in this queue. The iterator
272 dl 1.8 * does not return the elements in any particular order. The
273     * returned iterator is a "fast-fail" iterator that will
274     * throw {@link java.util.ConcurrentModificationException}
275     * upon detected interference.
276 dholmes 1.7 *
277     * @return an iterator over the elements in this queue.
278     */
279 dl 1.3 public Iterator<E> iterator() {
280 dl 1.2 lock.lock();
281     try {
282     return new Itr(q.iterator());
283     }
284     finally {
285     lock.unlock();
286     }
287     }
288    
289 dl 1.3 private class Itr<E> implements Iterator<E> {
290     private final Iterator<E> iter;
291 tim 1.6 Itr(Iterator<E> i) {
292     iter = i;
293 dl 1.2 }
294    
295 tim 1.6 public boolean hasNext() {
296 dl 1.2 return iter.hasNext();
297 tim 1.6 }
298    
299     public E next() {
300 dl 1.2 lock.lock();
301     try {
302     return iter.next();
303     }
304     finally {
305     lock.unlock();
306     }
307 tim 1.6 }
308    
309     public void remove() {
310 dl 1.2 lock.lock();
311     try {
312     iter.remove();
313     }
314     finally {
315     lock.unlock();
316     }
317 tim 1.6 }
318 tim 1.1 }
319    
320     }