ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.12
Committed: Thu Aug 7 16:00:28 2003 UTC (20 years, 10 months ago) by dl
Branch: MAIN
Changes since 1.11: +5 -3 lines
Log Message:
ScheduledExecutor must prestart core threads

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7    
8 tim 1.1 package java.util.concurrent;
9 dl 1.5 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
/**
 * An unbounded {@linkplain BlockingQueue blocking queue} of <tt>Delayed</tt>
 * elements, in which an element can only be taken when its delay has expired.
 * The <em>head</em> of the queue is that <tt>Delayed</tt> element whose delay
 * expired furthest in the past. If no delay has expired there is no head and
 * <tt>poll</tt> will return <tt>null</tt>.
 * This queue does not permit <tt>null</tt> elements.
 *
 * <p>Elements are held in a {@link PriorityQueue}; every access to it is
 * performed while holding a single internal lock. Note that <tt>peek</tt>,
 * <tt>size</tt>, <tt>toArray</tt> and <tt>iterator</tt> see all queued
 * elements, including those whose delay has not yet expired.
 *
 * @since 1.5
 * @author Doug Lea
 */
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    /** Guards every access to <tt>q</tt>. */
    private transient final ReentrantLock lock = new ReentrantLock();

    /**
     * Signalled when a new earliest element is inserted, and after a
     * removal that leaves further elements behind.
     */
    private transient final Condition available = lock.newCondition();

    /** Backing store; its head is the element with the smallest delay. */
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * Creates a new <tt>DelayQueue</tt> that is initially empty.
     */
    public DelayQueue() {}

    /**
     * Creates a <tt>DelayQueue</tt> initially containing the elements of the
     * given collection of {@link Delayed} instances.
     *
     * @param c the collection whose elements are added to the new queue
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

    /**
     * Adds the specified element to this delay queue.
     *
     * @param o the element to add
     * @return <tt>true</tt>
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E o) {
        lock.lock();
        try {
            E first = q.peek();
            q.offer(o);
            // Waiters only need to re-evaluate when the head changed:
            // the queue was empty, or the new element sorts before the
            // old head.
            if (first == null || o.compareTo(first) < 0)
                available.signalAll();
            return true;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Adds the specified element to this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param o the element to add
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E o) {
        offer(o);
    }

    /**
     * Adds the specified element to this delay queue. As the queue is
     * unbounded this method will never block.
     *
     * @param o {@inheritDoc}
     * @param time This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @throws NullPointerException {@inheritDoc}
     * @return <tt>true</tt>
     */
    public boolean offer(E o, long time, TimeUnit unit) {
        return offer(o);
    }

    /**
     * Adds the specified element to this queue.
     *
     * @param o the element to add
     * @return <tt>true</tt> (as per the general contract of
     * <tt>Collection.add</tt>).
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean add(E o) {
        return offer(o);
    }

    /**
     * Removes and returns the head of the backing queue, which the caller
     * has already found to be expired. Must be called with <tt>lock</tt>
     * held.
     *
     * @return the removed head
     */
    private E removeExpiredHead() {
        E x = q.poll();
        assert x != null;
        if (q.size() != 0)
            available.signalAll(); // wake up other takers
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element with an expired delay is available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    available.await();
                }
                else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay > 0)
                        // Sleep until the head is due, or until a signal
                        // announces a change; then re-examine the head.
                        available.awaitNanos(delay);
                    else
                        return removeExpiredHead();
                }
            }
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * up to the specified wait time for an element with an expired delay
     * to become available.
     *
     * @param time how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>time</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the specified
     * waiting time elapses before an element with an expired delay
     * becomes available
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if <tt>unit</tt> is <tt>null</tt>
     */
    public E poll(long time, TimeUnit unit) throws InterruptedException {
        // Convert before locking: a null unit must not throw while the
        // lock is held outside the try/finally.
        long nanos = unit.toNanos(time);
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0)
                        return null;
                    nanos = available.awaitNanos(nanos);
                }
                else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay <= 0)
                        return removeExpiredHead();
                    // Head exists but is not due: give up once the
                    // caller's timeout is used up instead of re-waiting
                    // with a non-positive timeout.
                    if (nanos <= 0)
                        return null;
                    if (delay > nanos)
                        delay = nanos;
                    long timeLeft = available.awaitNanos(delay);
                    nanos -= delay - timeLeft;
                }
            }
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, or returns
     * <tt>null</tt> if this queue has no element with an expired delay.
     *
     * @return the head of this queue, or <tt>null</tt> if no element's
     * delay has expired
     */
    public E poll() {
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                return null;
            return removeExpiredHead();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the element that would expire
     * first, whether or not its delay has already expired.
     *
     * @return the element with the smallest delay, or <tt>null</tt> if
     * this queue is empty
     */
    public E peek() {
        lock.lock();
        try {
            return q.peek();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue, counting both
     * expired and unexpired elements.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        lock.lock();
        try {
            return q.size();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Removes all elements whose delay has expired from this queue and
     * adds them to the given collection.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most the given number of elements whose delay has
     * expired from this queue and adds them to the given collection.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        lock.lock();
        try {
            int n = 0;
            while (n < maxElements) {
                E first = q.peek();
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                    break;
                // Add before removing so that a failing add leaves the
                // element in this queue.
                c.add(first);
                q.poll();
                ++n;
            }
            if (n > 0 && q.size() != 0)
                available.signalAll(); // head changed; wake up takers
            return n;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this delay queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        lock.lock();
        try {
            q.clear();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * a <tt>DelayQueue</tt> is not capacity constrained.
     *
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * expired or not, as reported by the backing priority queue.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        lock.lock();
        try {
            return q.toArray();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * expired or not; the runtime type of the returned array is that of
     * the specified array.
     *
     * @param array the array into which the elements are to be stored,
     * if it is big enough
     * @return an array containing all of the elements in this queue
     */
    public <T> T[] toArray(T[] array) {
        lock.lock();
        try {
            return q.toArray(array);
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present, whether or not its delay has expired.
     * The removal is delegated to the backing priority queue while
     * holding the lock.
     *
     * @param o the element to remove
     * @return <tt>true</tt> if the queue contained the specified element
     * (or equivalently, if the queue changed as a result of the call)
     */
    public boolean remove(Object o) {
        lock.lock();
        try {
            return q.remove(o);
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over all the elements (both expired and
     * unexpired) in this queue. The iterator does not return the
     * elements in any particular order. The returned iterator is a
     * "fail-fast" iterator that will throw
     * {@link java.util.ConcurrentModificationException}
     * upon detected interference.
     *
     * @return an iterator over the elements in this queue.
     */
    public Iterator<E> iterator() {
        lock.lock();
        try {
            return new Itr(q.iterator());
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Wraps an iterator of the backing priority queue so that each
     * operation on it runs while holding the queue's lock.
     */
    private class Itr implements Iterator<E> {
        private final Iterator<E> iter;

        Itr(Iterator<E> i) {
            iter = i;
        }

        public boolean hasNext() {
            lock.lock();
            try {
                return iter.hasNext();
            }
            finally {
                lock.unlock();
            }
        }

        public E next() {
            lock.lock();
            try {
                return iter.next();
            }
            finally {
                lock.unlock();
            }
        }

        public void remove() {
            lock.lock();
            try {
                iter.remove();
            }
            finally {
                lock.unlock();
            }
        }
    }

}