ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.16
Committed: Mon Sep 15 12:02:46 2003 UTC (20 years, 8 months ago) by dl
Branch: MAIN
Changes since 1.15: +2 -2 lines
Log Message:
Fix some javadoc inconsistencies

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7
8 package java.util.concurrent;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13 * An unbounded {@linkplain BlockingQueue blocking queue} of <tt>Delayed</tt>
14 * elements, in which an element can only be taken when its delay has expired.
15 * The <em>head</em> of the queue is that <tt>Delayed</tt> element whose delay
16 * expired furthest in the past - if no delay has expired there is no head and
17 * <tt>poll</tt> will return <tt>null</tt>.
18 * This queue does not permit <tt>null</tt> elements.
19 * <p>This class implements all of the <em>optional</em> methods
20 * of the {@link Collection} and {@link Iterator} interfaces.
21 * @since 1.5
22 * @author Doug Lea
23 */
24
25 public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
26 implements BlockingQueue<E> {
27
28 private transient final ReentrantLock lock = new ReentrantLock();
29 private transient final Condition available = lock.newCondition();
30 private final PriorityQueue<E> q = new PriorityQueue<E>();
31
32 /**
33 * Creates a new <tt>DelayQueue</tt> that is initially empty.
34 */
35 public DelayQueue() {}
36
37 /**
38 * Creates a <tt>DelayQueue</tt> initially containing the elements of the
39 * given collection of {@link Delayed} instances.
40 *
41 * @throws NullPointerException if <tt>c</tt> or any element within it
42 * is <tt>null</tt>
43 *
44 */
45 public DelayQueue(Collection<? extends E> c) {
46 this.addAll(c);
47 }
48
49 /**
50 * Inserts the specified element into this delay queue.
51 *
52 * @param o the element to add
53 * @return <tt>true</tt>
54 * @throws NullPointerException if the specified element is <tt>null</tt>.
55 */
56 public boolean offer(E o) {
57 lock.lock();
58 try {
59 E first = q.peek();
60 q.offer(o);
61 if (first == null || o.compareTo(first) < 0)
62 available.signalAll();
63 return true;
64 } finally {
65 lock.unlock();
66 }
67 }
68
69
70 /**
71 * Adds the specified element to this delay queue. As the queue is
72 * unbounded this method will never block.
73 * @param o the element to add
74 * @throws NullPointerException if the specified element is <tt>null</tt>.
75 */
76 public void put(E o) {
77 offer(o);
78 }
79
80 /**
81 * Inserts the specified element into this delay queue. As the queue is
82 * unbounded this method will never block.
83 * @param o the element to add
84 * @param timeout This parameter is ignored as the method never blocks
85 * @param unit This parameter is ignored as the method never blocks
86 * @return <tt>true</tt>
87 * @throws NullPointerException if the specified element is <tt>null</tt>.
88 */
89 public boolean offer(E o, long timeout, TimeUnit unit) {
90 return offer(o);
91 }
92
93 /**
94 * Adds the specified element to this queue.
95 * @param o the element to add
96 * @return <tt>true</tt> (as per the general contract of
97 * <tt>Collection.add</tt>).
98 *
99 * @throws NullPointerException if the specified element is <tt>null</tt>.
100 */
101 public boolean add(E o) {
102 return offer(o);
103 }
104
105 public E take() throws InterruptedException {
106 lock.lockInterruptibly();
107 try {
108 for (;;) {
109 E first = q.peek();
110 if (first == null) {
111 available.await();
112 } else {
113 long delay = first.getDelay(TimeUnit.NANOSECONDS);
114 if (delay > 0) {
115 long tl = available.awaitNanos(delay);
116 } else {
117 E x = q.poll();
118 assert x != null;
119 if (q.size() != 0)
120 available.signalAll(); // wake up other takers
121 return x;
122
123 }
124 }
125 }
126 } finally {
127 lock.unlock();
128 }
129 }
130
131 public E poll(long time, TimeUnit unit) throws InterruptedException {
132 lock.lockInterruptibly();
133 long nanos = unit.toNanos(time);
134 try {
135 for (;;) {
136 E first = q.peek();
137 if (first == null) {
138 if (nanos <= 0)
139 return null;
140 else
141 nanos = available.awaitNanos(nanos);
142 } else {
143 long delay = first.getDelay(TimeUnit.NANOSECONDS);
144 if (delay > 0) {
145 if (delay > nanos)
146 delay = nanos;
147 long timeLeft = available.awaitNanos(delay);
148 nanos -= delay - timeLeft;
149 } else {
150 E x = q.poll();
151 assert x != null;
152 if (q.size() != 0)
153 available.signalAll();
154 return x;
155 }
156 }
157 }
158 } finally {
159 lock.unlock();
160 }
161 }
162
163
164 public E poll() {
165 lock.lock();
166 try {
167 E first = q.peek();
168 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
169 return null;
170 else {
171 E x = q.poll();
172 assert x != null;
173 if (q.size() != 0)
174 available.signalAll();
175 return x;
176 }
177 } finally {
178 lock.unlock();
179 }
180 }
181
182 public E peek() {
183 lock.lock();
184 try {
185 return q.peek();
186 } finally {
187 lock.unlock();
188 }
189 }
190
191 public int size() {
192 lock.lock();
193 try {
194 return q.size();
195 } finally {
196 lock.unlock();
197 }
198 }
199
200 /**
201 * Atomically removes all of the elements from this delay queue.
202 * The queue will be empty after this call returns.
203 */
204 public void clear() {
205 lock.lock();
206 try {
207 q.clear();
208 } finally {
209 lock.unlock();
210 }
211 }
212
213 /**
214 * Always returns <tt>Integer.MAX_VALUE</tt> because
215 * a <tt>DelayQueue</tt> is not capacity constrained.
216 * @return <tt>Integer.MAX_VALUE</tt>
217 */
218 public int remainingCapacity() {
219 return Integer.MAX_VALUE;
220 }
221
222 public Object[] toArray() {
223 lock.lock();
224 try {
225 return q.toArray();
226 } finally {
227 lock.unlock();
228 }
229 }
230
231 public <T> T[] toArray(T[] array) {
232 lock.lock();
233 try {
234 return q.toArray(array);
235 } finally {
236 lock.unlock();
237 }
238 }
239
240 public boolean remove(Object o) {
241 lock.lock();
242 try {
243 return q.remove(o);
244 } finally {
245 lock.unlock();
246 }
247 }
248
249 /**
250 * Returns an iterator over the elements in this queue. The iterator
251 * does not return the elements in any particular order. The
252 * returned iterator is a thread-safe "fast-fail" iterator that will
253 * throw {@link java.util.ConcurrentModificationException}
254 * upon detected interference.
255 *
256 * @return an iterator over the elements in this queue.
257 */
258 public Iterator<E> iterator() {
259 lock.lock();
260 try {
261 return new Itr(q.iterator());
262 } finally {
263 lock.unlock();
264 }
265 }
266
267 private class Itr<E> implements Iterator<E> {
268 private final Iterator<E> iter;
269 Itr(Iterator<E> i) {
270 iter = i;
271 }
272
273 public boolean hasNext() {
274 return iter.hasNext();
275 }
276
277 public E next() {
278 lock.lock();
279 try {
280 return iter.next();
281 } finally {
282 lock.unlock();
283 }
284 }
285
286 public void remove() {
287 lock.lock();
288 try {
289 iter.remove();
290 } finally {
291 lock.unlock();
292 }
293 }
294 }
295
296 }