ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.14
Committed: Fri Sep 12 15:40:10 2003 UTC (20 years, 8 months ago) by dl
Branch: MAIN
Changes since 1.13: +12 -23 lines
Log Message:
Adapt AbstractQueue changes; Conditionalize CancellableTask.reset; new TimeUnit methods

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7    
8 tim 1.1 package java.util.concurrent;
9 dl 1.5 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
12     /**
13 tim 1.9 * An unbounded {@linkplain BlockingQueue blocking queue} of <tt>Delayed</tt>
14 dholmes 1.7 * elements, in which an element can only be taken when its delay has expired.
15     * The <em>head</em> of the queue is that <tt>Delayed</tt> element whose delay
16     * expired furthest in the past - if no delay has expired there is no head and
17     * <tt>poll</tt> will return <tt>null</tt>.
18     * This queue does not permit <tt>null</tt> elements.
19 dl 1.4 * @since 1.5
20     * @author Doug Lea
21     */
22 tim 1.1
23 dl 1.3 public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
24     implements BlockingQueue<E> {
25 tim 1.6
26 dl 1.2 private transient final ReentrantLock lock = new ReentrantLock();
27 dl 1.4 private transient final Condition available = lock.newCondition();
28 dl 1.3 private final PriorityQueue<E> q = new PriorityQueue<E>();
29 tim 1.1
30 dl 1.4 /**
31 dholmes 1.7 * Creates a new <tt>DelayQueue</tt> that is initially empty.
32 dl 1.4 */
33 tim 1.1 public DelayQueue() {}
34    
35 tim 1.6 /**
36 tim 1.9 * Creates a <tt>DelayQueue</tt> initially containing the elements of the
37 dholmes 1.7 * given collection of {@link Delayed} instances.
38     *
39     * @throws NullPointerException if <tt>c</tt> or any element within it
40     * is <tt>null</tt>
41 tim 1.6 *
42     */
43     public DelayQueue(Collection<? extends E> c) {
44     this.addAll(c);
45     }
46    
47 dholmes 1.7 /**
48 dl 1.14 * Inserts the specified element to this delay queue.
49 dholmes 1.7 *
50 dl 1.14 * @param o the element to add
51 dholmes 1.7 * @return <tt>true</tt>
52 dl 1.14 * @throws NullPointerException if the specified element is <tt>null</tt>.
53 dholmes 1.7 */
54     public boolean offer(E o) {
55 dl 1.2 lock.lock();
56     try {
57 dl 1.3 E first = q.peek();
58 dholmes 1.7 q.offer(o);
59     if (first == null || o.compareTo(first) < 0)
60 dl 1.4 available.signalAll();
61 dl 1.2 return true;
62 tim 1.13 } finally {
63 dl 1.2 lock.unlock();
64     }
65     }
66    
67 dholmes 1.7
68     /**
69     * Adds the specified element to this delay queue. As the queue is
70     * unbounded this method will never block.
71 dl 1.14 * @param o the element to add
72     * @throws NullPointerException if the specified element is <tt>null</tt>.
73 dholmes 1.7 */
74     public void put(E o) {
75     offer(o);
76 dl 1.2 }
77    
78 dholmes 1.7 /**
79 dl 1.14 * Adds the specified element to this delay queue. As the queue is
80 dholmes 1.7 * unbounded this method will never block.
81 dl 1.14 * @param o the element to add
82     * @param timeout This parameter is ignored as the method never blocks
83 dholmes 1.7 * @param unit This parameter is ignored as the method never blocks
84     * @return <tt>true</tt>
85 dl 1.14 * @throws NullPointerException if the specified element is <tt>null</tt>.
86 dholmes 1.7 */
87 dl 1.14 public boolean offer(E o, long timeout, TimeUnit unit) {
88 dholmes 1.7 return offer(o);
89 dl 1.2 }
90    
91 dholmes 1.7 /**
92     * Adds the specified element to this queue.
93 dl 1.14 * @param o the element to add
94 dholmes 1.7 * @return <tt>true</tt> (as per the general contract of
95     * <tt>Collection.add</tt>).
96     *
97 dl 1.14 * @throws NullPointerException if the specified element is <tt>null</tt>.
98 dholmes 1.7 */
99     public boolean add(E o) {
100     return offer(o);
101 dl 1.2 }
102    
103 dl 1.3 public E take() throws InterruptedException {
104 dl 1.2 lock.lockInterruptibly();
105     try {
106     for (;;) {
107 dl 1.3 E first = q.peek();
108 dl 1.12 if (first == null) {
109 dl 1.4 available.await();
110 tim 1.13 } else {
111 dl 1.2 long delay = first.getDelay(TimeUnit.NANOSECONDS);
112 dl 1.12 if (delay > 0) {
113     long tl = available.awaitNanos(delay);
114 tim 1.13 } else {
115 dl 1.3 E x = q.poll();
116 dl 1.2 assert x != null;
117     if (q.size() != 0)
118 dl 1.4 available.signalAll(); // wake up other takers
119 dl 1.2 return x;
120 tim 1.6
121 dl 1.2 }
122     }
123     }
124 tim 1.13 } finally {
125 dl 1.2 lock.unlock();
126     }
127     }
128    
129 dl 1.3 public E poll(long time, TimeUnit unit) throws InterruptedException {
130 dl 1.2 lock.lockInterruptibly();
131     long nanos = unit.toNanos(time);
132     try {
133     for (;;) {
134 dl 1.3 E first = q.peek();
135 dl 1.2 if (first == null) {
136     if (nanos <= 0)
137     return null;
138     else
139 dl 1.4 nanos = available.awaitNanos(nanos);
140 tim 1.13 } else {
141 dl 1.2 long delay = first.getDelay(TimeUnit.NANOSECONDS);
142     if (delay > 0) {
143     if (delay > nanos)
144     delay = nanos;
145 dl 1.4 long timeLeft = available.awaitNanos(delay);
146 dl 1.2 nanos -= delay - timeLeft;
147 tim 1.13 } else {
148 dl 1.3 E x = q.poll();
149 dl 1.2 assert x != null;
150     if (q.size() != 0)
151 tim 1.6 available.signalAll();
152 dl 1.2 return x;
153     }
154     }
155     }
156 tim 1.13 } finally {
157 dl 1.2 lock.unlock();
158     }
159     }
160    
161    
162 dl 1.3 public E poll() {
163 dl 1.2 lock.lock();
164     try {
165 dl 1.3 E first = q.peek();
166 dl 1.2 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
167     return null;
168     else {
169 dl 1.3 E x = q.poll();
170 dl 1.2 assert x != null;
171     if (q.size() != 0)
172 tim 1.6 available.signalAll();
173 dl 1.2 return x;
174     }
175 tim 1.13 } finally {
176 dl 1.2 lock.unlock();
177     }
178     }
179    
180 dl 1.3 public E peek() {
181 dl 1.2 lock.lock();
182     try {
183     return q.peek();
184 tim 1.13 } finally {
185 dl 1.2 lock.unlock();
186     }
187 tim 1.1 }
188    
189 dl 1.2 public int size() {
190     lock.lock();
191     try {
192     return q.size();
193 tim 1.13 } finally {
194 dl 1.2 lock.unlock();
195     }
196     }
197    
198 dholmes 1.7 /**
199     * Atomically removes all of the elements from this delay queue.
200     * The queue will be empty after this call returns.
201     */
202 dl 1.2 public void clear() {
203     lock.lock();
204     try {
205     q.clear();
206 tim 1.13 } finally {
207 dl 1.2 lock.unlock();
208     }
209     }
210 tim 1.1
211 dholmes 1.7 /**
212     * Always returns <tt>Integer.MAX_VALUE</tt> because
213     * a <tt>DelayQueue</tt> is not capacity constrained.
214     * @return <tt>Integer.MAX_VALUE</tt>
215     */
216 dl 1.2 public int remainingCapacity() {
217     return Integer.MAX_VALUE;
218 tim 1.1 }
219 dl 1.2
220     public Object[] toArray() {
221     lock.lock();
222     try {
223     return q.toArray();
224 tim 1.13 } finally {
225 dl 1.2 lock.unlock();
226     }
227 tim 1.1 }
228 dl 1.2
229     public <T> T[] toArray(T[] array) {
230     lock.lock();
231     try {
232     return q.toArray(array);
233 tim 1.13 } finally {
234 dl 1.2 lock.unlock();
235     }
236 tim 1.1 }
237    
238 dholmes 1.7 public boolean remove(Object o) {
239 dl 1.2 lock.lock();
240     try {
241 dholmes 1.7 return q.remove(o);
242 tim 1.13 } finally {
243 dl 1.2 lock.unlock();
244     }
245     }
246    
247 dholmes 1.7 /**
248     * Returns an iterator over the elements in this queue. The iterator
249 dl 1.8 * does not return the elements in any particular order. The
250     * returned iterator is a "fast-fail" iterator that will
251     * throw {@link java.util.ConcurrentModificationException}
252     * upon detected interference.
253 dholmes 1.7 *
254     * @return an iterator over the elements in this queue.
255     */
256 dl 1.3 public Iterator<E> iterator() {
257 dl 1.2 lock.lock();
258     try {
259     return new Itr(q.iterator());
260 tim 1.13 } finally {
261 dl 1.2 lock.unlock();
262     }
263     }
264    
265 dl 1.3 private class Itr<E> implements Iterator<E> {
266     private final Iterator<E> iter;
267 tim 1.6 Itr(Iterator<E> i) {
268     iter = i;
269 dl 1.2 }
270    
271 tim 1.6 public boolean hasNext() {
272 dl 1.2 return iter.hasNext();
273 tim 1.6 }
274    
275     public E next() {
276 dl 1.2 lock.lock();
277     try {
278     return iter.next();
279 tim 1.13 } finally {
280 dl 1.2 lock.unlock();
281     }
282 tim 1.6 }
283    
284     public void remove() {
285 dl 1.2 lock.lock();
286     try {
287     iter.remove();
288 tim 1.13 } finally {
289 dl 1.2 lock.unlock();
290     }
291 tim 1.6 }
292 tim 1.1 }
293    
294     }