ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.13
Committed: Fri Aug 8 20:05:07 2003 UTC (20 years, 9 months ago) by tim
Branch: MAIN
Changes since 1.12: +17 -34 lines
Log Message:
Scrunched catch, finally, else clauses.

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7
8 package java.util.concurrent;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13 * An unbounded {@linkplain BlockingQueue blocking queue} of <tt>Delayed</tt>
14 * elements, in which an element can only be taken when its delay has expired.
15 * The <em>head</em> of the queue is that <tt>Delayed</tt> element whose delay
16 * expired furthest in the past - if no delay has expired there is no head and
17 * <tt>poll</tt> will return <tt>null</tt>.
18 * This queue does not permit <tt>null</tt> elements.
19 * @since 1.5
20 * @author Doug Lea
21 */
22
23 public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
24 implements BlockingQueue<E> {
25
26 private transient final ReentrantLock lock = new ReentrantLock();
27 private transient final Condition available = lock.newCondition();
28 private final PriorityQueue<E> q = new PriorityQueue<E>();
29
30 /**
31 * Creates a new <tt>DelayQueue</tt> that is initially empty.
32 */
33 public DelayQueue() {}
34
35 /**
36 * Creates a <tt>DelayQueue</tt> initially containing the elements of the
37 * given collection of {@link Delayed} instances.
38 *
39 * @throws NullPointerException if <tt>c</tt> or any element within it
40 * is <tt>null</tt>
41 *
42 */
43 public DelayQueue(Collection<? extends E> c) {
44 this.addAll(c);
45 }
46
47 /**
48 * Add the specified element to this delay queue.
49 *
50 * @return <tt>true</tt>
51 * @throws NullPointerException {@inheritDoc}
52 */
53 public boolean offer(E o) {
54 lock.lock();
55 try {
56 E first = q.peek();
57 q.offer(o);
58 if (first == null || o.compareTo(first) < 0)
59 available.signalAll();
60 return true;
61 } finally {
62 lock.unlock();
63 }
64 }
65
66
67 /**
68 * Adds the specified element to this delay queue. As the queue is
69 * unbounded this method will never block.
70 * @throws NullPointerException {@inheritDoc}
71 */
72 public void put(E o) {
73 offer(o);
74 }
75
76 /**
77 * Adds the specified element to this priority queue. As the queue is
78 * unbounded this method will never block.
79 * @param o {@inheritDoc}
80 * @param time This parameter is ignored as the method never blocks
81 * @param unit This parameter is ignored as the method never blocks
82 * @throws NullPointerException {@inheritDoc}
83 * @return <tt>true</tt>
84 */
85 public boolean offer(E o, long time, TimeUnit unit) {
86 return offer(o);
87 }
88
89 /**
90 * Adds the specified element to this queue.
91 * @return <tt>true</tt> (as per the general contract of
92 * <tt>Collection.add</tt>).
93 *
94 * @throws NullPointerException {@inheritDoc}
95 */
96 public boolean add(E o) {
97 return offer(o);
98 }
99
100 public E take() throws InterruptedException {
101 lock.lockInterruptibly();
102 try {
103 for (;;) {
104 E first = q.peek();
105 if (first == null) {
106 available.await();
107 } else {
108 long delay = first.getDelay(TimeUnit.NANOSECONDS);
109 if (delay > 0) {
110 long tl = available.awaitNanos(delay);
111 } else {
112 E x = q.poll();
113 assert x != null;
114 if (q.size() != 0)
115 available.signalAll(); // wake up other takers
116 return x;
117
118 }
119 }
120 }
121 } finally {
122 lock.unlock();
123 }
124 }
125
126 public E poll(long time, TimeUnit unit) throws InterruptedException {
127 lock.lockInterruptibly();
128 long nanos = unit.toNanos(time);
129 try {
130 for (;;) {
131 E first = q.peek();
132 if (first == null) {
133 if (nanos <= 0)
134 return null;
135 else
136 nanos = available.awaitNanos(nanos);
137 } else {
138 long delay = first.getDelay(TimeUnit.NANOSECONDS);
139 if (delay > 0) {
140 if (delay > nanos)
141 delay = nanos;
142 long timeLeft = available.awaitNanos(delay);
143 nanos -= delay - timeLeft;
144 } else {
145 E x = q.poll();
146 assert x != null;
147 if (q.size() != 0)
148 available.signalAll();
149 return x;
150 }
151 }
152 }
153 } finally {
154 lock.unlock();
155 }
156 }
157
158
159 public E poll() {
160 lock.lock();
161 try {
162 E first = q.peek();
163 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
164 return null;
165 else {
166 E x = q.poll();
167 assert x != null;
168 if (q.size() != 0)
169 available.signalAll();
170 return x;
171 }
172 } finally {
173 lock.unlock();
174 }
175 }
176
177 public E peek() {
178 lock.lock();
179 try {
180 return q.peek();
181 } finally {
182 lock.unlock();
183 }
184 }
185
186 public int size() {
187 lock.lock();
188 try {
189 return q.size();
190 } finally {
191 lock.unlock();
192 }
193 }
194
195 /**
196 * Atomically removes all of the elements from this delay queue.
197 * The queue will be empty after this call returns.
198 */
199 public void clear() {
200 lock.lock();
201 try {
202 q.clear();
203 } finally {
204 lock.unlock();
205 }
206 }
207
208 /**
209 * Always returns <tt>Integer.MAX_VALUE</tt> because
210 * a <tt>DelayQueue</tt> is not capacity constrained.
211 * @return <tt>Integer.MAX_VALUE</tt>
212 */
213 public int remainingCapacity() {
214 return Integer.MAX_VALUE;
215 }
216
217 public Object[] toArray() {
218 lock.lock();
219 try {
220 return q.toArray();
221 } finally {
222 lock.unlock();
223 }
224 }
225
226 public <T> T[] toArray(T[] array) {
227 lock.lock();
228 try {
229 return q.toArray(array);
230 } finally {
231 lock.unlock();
232 }
233 }
234
235 /**
236 * Removes a single instance of the specified element from this
237 * queue, if it is present. More formally,
238 * removes an element <tt>e</tt> such that <tt>(o==null ? e==null :
239 * o.equals(e))</tt>, if the queue contains one or more such
240 * elements. Returns <tt>true</tt> if the queue contained the
241 * specified element (or equivalently, if the queue changed as a
242 * result of the call).
243 *
244 * <p>This implementation iterates over the queue looking for the
245 * specified element. If it finds the element, it removes the element
246 * from the queue using the iterator's remove method.<p>
247 *
248 */
249 public boolean remove(Object o) {
250 lock.lock();
251 try {
252 return q.remove(o);
253 } finally {
254 lock.unlock();
255 }
256 }
257
258 /**
259 * Returns an iterator over the elements in this queue. The iterator
260 * does not return the elements in any particular order. The
261 * returned iterator is a "fast-fail" iterator that will
262 * throw {@link java.util.ConcurrentModificationException}
263 * upon detected interference.
264 *
265 * @return an iterator over the elements in this queue.
266 */
267 public Iterator<E> iterator() {
268 lock.lock();
269 try {
270 return new Itr(q.iterator());
271 } finally {
272 lock.unlock();
273 }
274 }
275
276 private class Itr<E> implements Iterator<E> {
277 private final Iterator<E> iter;
278 Itr(Iterator<E> i) {
279 iter = i;
280 }
281
282 public boolean hasNext() {
283 return iter.hasNext();
284 }
285
286 public E next() {
287 lock.lock();
288 try {
289 return iter.next();
290 } finally {
291 lock.unlock();
292 }
293 }
294
295 public void remove() {
296 lock.lock();
297 try {
298 iter.remove();
299 } finally {
300 lock.unlock();
301 }
302 }
303 }
304
305 }