ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.10
Committed: Wed Aug 6 16:08:49 2003 UTC (20 years, 10 months ago) by dl
Branch: MAIN
Changes since 1.9: +0 -1 lines
Log Message:
Misc doc touch-ups

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7
8 package java.util.concurrent;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13 * An unbounded {@linkplain BlockingQueue blocking queue} of <tt>Delayed</tt>
14 * elements, in which an element can only be taken when its delay has expired.
15 * The <em>head</em> of the queue is that <tt>Delayed</tt> element whose delay
16 * expired furthest in the past - if no delay has expired there is no head and
17 * <tt>poll</tt> will return <tt>null</tt>.
18 * This queue does not permit <tt>null</tt> elements.
19 * @since 1.5
20 * @author Doug Lea
21 */
22
23 public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
24 implements BlockingQueue<E> {
25
26 private transient final ReentrantLock lock = new ReentrantLock();
27 private transient final Condition available = lock.newCondition();
28 private final PriorityQueue<E> q = new PriorityQueue<E>();
29
30 /**
31 * Creates a new <tt>DelayQueue</tt> that is initially empty.
32 */
33 public DelayQueue() {}
34
35 /**
36 * Creates a <tt>DelayQueue</tt> initially containing the elements of the
37 * given collection of {@link Delayed} instances.
38 *
39 * @throws NullPointerException if <tt>c</tt> or any element within it
40 * is <tt>null</tt>
41 *
42 */
43 public DelayQueue(Collection<? extends E> c) {
44 this.addAll(c);
45 }
46
47 /**
48 * Add the specified element to this delay queue.
49 *
50 * @return <tt>true</tt>
51 * @throws NullPointerException {@inheritDoc}
52 */
53 public boolean offer(E o) {
54 lock.lock();
55 try {
56 E first = q.peek();
57 q.offer(o);
58 if (first == null || o.compareTo(first) < 0)
59 available.signalAll();
60 return true;
61 }
62 finally {
63 lock.unlock();
64 }
65 }
66
67
68 /**
69 * Adds the specified element to this delay queue. As the queue is
70 * unbounded this method will never block.
71 * @throws NullPointerException {@inheritDoc}
72 */
73 public void put(E o) {
74 offer(o);
75 }
76
77 /**
78 * Adds the specified element to this priority queue. As the queue is
79 * unbounded this method will never block.
80 * @param o {@inheritDoc}
81 * @param timeout This parameter is ignored as the method never blocks
82 * @param unit This parameter is ignored as the method never blocks
83 * @throws NullPointerException {@inheritDoc}
84 * @return <tt>true</tt>
85 */
86 public boolean offer(E o, long time, TimeUnit unit) {
87 return offer(o);
88 }
89
90 /**
91 * Adds the specified element to this queue.
92 * @return <tt>true</tt> (as per the general contract of
93 * <tt>Collection.add</tt>).
94 *
95 * @throws NullPointerException {@inheritDoc}
96 */
97 public boolean add(E o) {
98 return offer(o);
99 }
100
101 public E take() throws InterruptedException {
102 lock.lockInterruptibly();
103 try {
104 for (;;) {
105 E first = q.peek();
106 if (first == null)
107 available.await();
108 else {
109 long delay = first.getDelay(TimeUnit.NANOSECONDS);
110 if (delay > 0)
111 available.awaitNanos(delay);
112 else {
113 E x = q.poll();
114 assert x != null;
115 if (q.size() != 0)
116 available.signalAll(); // wake up other takers
117 return x;
118
119 }
120 }
121 }
122 }
123 finally {
124 lock.unlock();
125 }
126 }
127
128 public E poll(long time, TimeUnit unit) throws InterruptedException {
129 lock.lockInterruptibly();
130 long nanos = unit.toNanos(time);
131 try {
132 for (;;) {
133 E first = q.peek();
134 if (first == null) {
135 if (nanos <= 0)
136 return null;
137 else
138 nanos = available.awaitNanos(nanos);
139 }
140 else {
141 long delay = first.getDelay(TimeUnit.NANOSECONDS);
142 if (delay > 0) {
143 if (delay > nanos)
144 delay = nanos;
145 long timeLeft = available.awaitNanos(delay);
146 nanos -= delay - timeLeft;
147 }
148 else {
149 E x = q.poll();
150 assert x != null;
151 if (q.size() != 0)
152 available.signalAll();
153 return x;
154 }
155 }
156 }
157 }
158 finally {
159 lock.unlock();
160 }
161 }
162
163
164 public E poll() {
165 lock.lock();
166 try {
167 E first = q.peek();
168 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
169 return null;
170 else {
171 E x = q.poll();
172 assert x != null;
173 if (q.size() != 0)
174 available.signalAll();
175 return x;
176 }
177 }
178 finally {
179 lock.unlock();
180 }
181 }
182
183 public E peek() {
184 lock.lock();
185 try {
186 return q.peek();
187 }
188 finally {
189 lock.unlock();
190 }
191 }
192
193 public int size() {
194 lock.lock();
195 try {
196 return q.size();
197 }
198 finally {
199 lock.unlock();
200 }
201 }
202
203 /**
204 * Atomically removes all of the elements from this delay queue.
205 * The queue will be empty after this call returns.
206 */
207 public void clear() {
208 lock.lock();
209 try {
210 q.clear();
211 }
212 finally {
213 lock.unlock();
214 }
215 }
216
217 /**
218 * Always returns <tt>Integer.MAX_VALUE</tt> because
219 * a <tt>DelayQueue</tt> is not capacity constrained.
220 * @return <tt>Integer.MAX_VALUE</tt>
221 */
222 public int remainingCapacity() {
223 return Integer.MAX_VALUE;
224 }
225
226 public Object[] toArray() {
227 lock.lock();
228 try {
229 return q.toArray();
230 }
231 finally {
232 lock.unlock();
233 }
234 }
235
236 public <T> T[] toArray(T[] array) {
237 lock.lock();
238 try {
239 return q.toArray(array);
240 }
241 finally {
242 lock.unlock();
243 }
244 }
245
246 /**
247 * Removes a single instance of the specified element from this
248 * queue, if it is present. More formally,
249 * removes an element <tt>e</tt> such that <tt>(o==null ? e==null :
250 * o.equals(e))</tt>, if the queue contains one or more such
251 * elements. Returns <tt>true</tt> if the queue contained the
252 * specified element (or equivalently, if the queue changed as a
253 * result of the call).
254 *
255 * <p>This implementation iterates over the queue looking for the
256 * specified element. If it finds the element, it removes the element
257 * from the queue using the iterator's remove method.<p>
258 *
259 */
260 public boolean remove(Object o) {
261 lock.lock();
262 try {
263 return q.remove(o);
264 }
265 finally {
266 lock.unlock();
267 }
268 }
269
270 /**
271 * Returns an iterator over the elements in this queue. The iterator
272 * does not return the elements in any particular order. The
273 * returned iterator is a "fast-fail" iterator that will
274 * throw {@link java.util.ConcurrentModificationException}
275 * upon detected interference.
276 *
277 * @return an iterator over the elements in this queue.
278 */
279 public Iterator<E> iterator() {
280 lock.lock();
281 try {
282 return new Itr(q.iterator());
283 }
284 finally {
285 lock.unlock();
286 }
287 }
288
289 private class Itr<E> implements Iterator<E> {
290 private final Iterator<E> iter;
291 Itr(Iterator<E> i) {
292 iter = i;
293 }
294
295 public boolean hasNext() {
296 return iter.hasNext();
297 }
298
299 public E next() {
300 lock.lock();
301 try {
302 return iter.next();
303 }
304 finally {
305 lock.unlock();
306 }
307 }
308
309 public void remove() {
310 lock.lock();
311 try {
312 iter.remove();
313 }
314 finally {
315 lock.unlock();
316 }
317 }
318 }
319
320 }