ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.14
Committed: Fri Sep 12 15:40:10 2003 UTC (20 years, 9 months ago) by dl
Branch: MAIN
Changes since 1.13: +12 -23 lines
Log Message:
Adapt AbstractQueue changes; Conditionalize CancellableTask.reset; new TimeUnit methods

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7
8 package java.util.concurrent;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13 * An unbounded {@linkplain BlockingQueue blocking queue} of <tt>Delayed</tt>
14 * elements, in which an element can only be taken when its delay has expired.
15 * The <em>head</em> of the queue is that <tt>Delayed</tt> element whose delay
16 * expired furthest in the past - if no delay has expired there is no head and
17 * <tt>poll</tt> will return <tt>null</tt>.
18 * This queue does not permit <tt>null</tt> elements.
19 * @since 1.5
20 * @author Doug Lea
21 */
22
23 public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
24 implements BlockingQueue<E> {
25
26 private transient final ReentrantLock lock = new ReentrantLock();
27 private transient final Condition available = lock.newCondition();
28 private final PriorityQueue<E> q = new PriorityQueue<E>();
29
30 /**
31 * Creates a new <tt>DelayQueue</tt> that is initially empty.
32 */
33 public DelayQueue() {}
34
35 /**
36 * Creates a <tt>DelayQueue</tt> initially containing the elements of the
37 * given collection of {@link Delayed} instances.
38 *
39 * @throws NullPointerException if <tt>c</tt> or any element within it
40 * is <tt>null</tt>
41 *
42 */
43 public DelayQueue(Collection<? extends E> c) {
44 this.addAll(c);
45 }
46
47 /**
48 * Inserts the specified element to this delay queue.
49 *
50 * @param o the element to add
51 * @return <tt>true</tt>
52 * @throws NullPointerException if the specified element is <tt>null</tt>.
53 */
54 public boolean offer(E o) {
55 lock.lock();
56 try {
57 E first = q.peek();
58 q.offer(o);
59 if (first == null || o.compareTo(first) < 0)
60 available.signalAll();
61 return true;
62 } finally {
63 lock.unlock();
64 }
65 }
66
67
68 /**
69 * Adds the specified element to this delay queue. As the queue is
70 * unbounded this method will never block.
71 * @param o the element to add
72 * @throws NullPointerException if the specified element is <tt>null</tt>.
73 */
74 public void put(E o) {
75 offer(o);
76 }
77
78 /**
79 * Adds the specified element to this delay queue. As the queue is
80 * unbounded this method will never block.
81 * @param o the element to add
82 * @param timeout This parameter is ignored as the method never blocks
83 * @param unit This parameter is ignored as the method never blocks
84 * @return <tt>true</tt>
85 * @throws NullPointerException if the specified element is <tt>null</tt>.
86 */
87 public boolean offer(E o, long timeout, TimeUnit unit) {
88 return offer(o);
89 }
90
91 /**
92 * Adds the specified element to this queue.
93 * @param o the element to add
94 * @return <tt>true</tt> (as per the general contract of
95 * <tt>Collection.add</tt>).
96 *
97 * @throws NullPointerException if the specified element is <tt>null</tt>.
98 */
99 public boolean add(E o) {
100 return offer(o);
101 }
102
103 public E take() throws InterruptedException {
104 lock.lockInterruptibly();
105 try {
106 for (;;) {
107 E first = q.peek();
108 if (first == null) {
109 available.await();
110 } else {
111 long delay = first.getDelay(TimeUnit.NANOSECONDS);
112 if (delay > 0) {
113 long tl = available.awaitNanos(delay);
114 } else {
115 E x = q.poll();
116 assert x != null;
117 if (q.size() != 0)
118 available.signalAll(); // wake up other takers
119 return x;
120
121 }
122 }
123 }
124 } finally {
125 lock.unlock();
126 }
127 }
128
129 public E poll(long time, TimeUnit unit) throws InterruptedException {
130 lock.lockInterruptibly();
131 long nanos = unit.toNanos(time);
132 try {
133 for (;;) {
134 E first = q.peek();
135 if (first == null) {
136 if (nanos <= 0)
137 return null;
138 else
139 nanos = available.awaitNanos(nanos);
140 } else {
141 long delay = first.getDelay(TimeUnit.NANOSECONDS);
142 if (delay > 0) {
143 if (delay > nanos)
144 delay = nanos;
145 long timeLeft = available.awaitNanos(delay);
146 nanos -= delay - timeLeft;
147 } else {
148 E x = q.poll();
149 assert x != null;
150 if (q.size() != 0)
151 available.signalAll();
152 return x;
153 }
154 }
155 }
156 } finally {
157 lock.unlock();
158 }
159 }
160
161
162 public E poll() {
163 lock.lock();
164 try {
165 E first = q.peek();
166 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
167 return null;
168 else {
169 E x = q.poll();
170 assert x != null;
171 if (q.size() != 0)
172 available.signalAll();
173 return x;
174 }
175 } finally {
176 lock.unlock();
177 }
178 }
179
180 public E peek() {
181 lock.lock();
182 try {
183 return q.peek();
184 } finally {
185 lock.unlock();
186 }
187 }
188
189 public int size() {
190 lock.lock();
191 try {
192 return q.size();
193 } finally {
194 lock.unlock();
195 }
196 }
197
198 /**
199 * Atomically removes all of the elements from this delay queue.
200 * The queue will be empty after this call returns.
201 */
202 public void clear() {
203 lock.lock();
204 try {
205 q.clear();
206 } finally {
207 lock.unlock();
208 }
209 }
210
211 /**
212 * Always returns <tt>Integer.MAX_VALUE</tt> because
213 * a <tt>DelayQueue</tt> is not capacity constrained.
214 * @return <tt>Integer.MAX_VALUE</tt>
215 */
216 public int remainingCapacity() {
217 return Integer.MAX_VALUE;
218 }
219
220 public Object[] toArray() {
221 lock.lock();
222 try {
223 return q.toArray();
224 } finally {
225 lock.unlock();
226 }
227 }
228
229 public <T> T[] toArray(T[] array) {
230 lock.lock();
231 try {
232 return q.toArray(array);
233 } finally {
234 lock.unlock();
235 }
236 }
237
238 public boolean remove(Object o) {
239 lock.lock();
240 try {
241 return q.remove(o);
242 } finally {
243 lock.unlock();
244 }
245 }
246
247 /**
248 * Returns an iterator over the elements in this queue. The iterator
249 * does not return the elements in any particular order. The
250 * returned iterator is a "fast-fail" iterator that will
251 * throw {@link java.util.ConcurrentModificationException}
252 * upon detected interference.
253 *
254 * @return an iterator over the elements in this queue.
255 */
256 public Iterator<E> iterator() {
257 lock.lock();
258 try {
259 return new Itr(q.iterator());
260 } finally {
261 lock.unlock();
262 }
263 }
264
265 private class Itr<E> implements Iterator<E> {
266 private final Iterator<E> iter;
267 Itr(Iterator<E> i) {
268 iter = i;
269 }
270
271 public boolean hasNext() {
272 return iter.hasNext();
273 }
274
275 public E next() {
276 lock.lock();
277 try {
278 return iter.next();
279 } finally {
280 lock.unlock();
281 }
282 }
283
284 public void remove() {
285 lock.lock();
286 try {
287 iter.remove();
288 } finally {
289 lock.unlock();
290 }
291 }
292 }
293
294 }