ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.4
Committed: Tue Jun 24 14:34:47 2003 UTC (20 years, 11 months ago) by dl
Branch: MAIN
Changes since 1.3: +15 -10 lines
Log Message:
Added missing javadoc tags; minor reformatting

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7    
8 tim 1.1 package java.util.concurrent;
9     import java.util.*;
10    
11     /**
12 dl 1.3 * An unbounded queue of <tt>Delayed</tt> elements, in which
13 dl 1.2 * elements can only be taken when their delay has expired.
14 dl 1.4 * @since 1.5
15     * @author Doug Lea
16     */
17 tim 1.1
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    /** Lock guarding every access to the backing queue <tt>q</tt>. */
    private transient final ReentrantLock lock = new ReentrantLock();

    /**
     * Condition signalled when a new head element is inserted, or when
     * elements remain after a removal, so that waiting takers re-check
     * the head.
     */
    private transient final Condition available = lock.newCondition();

    /** Backing store; its head is the least element per <tt>compareTo</tt>. */
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * Creates a new, initially empty <tt>DelayQueue</tt>.
     */
    public DelayQueue() {}

    /**
     * Inserts the given element. The queue is unbounded, so this never
     * blocks and never fails for lack of space.
     *
     * @param x the element to add
     * @return <tt>true</tt>
     */
    public boolean offer(E x) {
        lock.lock();
        try {
            E first = q.peek();
            q.offer(x);
            // Waiters are timing against the old head (or waiting for any
            // element); wake them only if the head has actually changed.
            if (first == null || x.compareTo(first) < 0)
                available.signalAll();
            return true;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the given element; equivalent to {@link #offer(Delayed)}.
     *
     * @param x the element to add
     */
    public void put(E x) {
        offer(x);
    }

    /**
     * Inserts the given element; the timeout arguments are ignored
     * because insertion never blocks.
     *
     * @param x the element to add
     * @param time ignored
     * @param unit ignored
     * @return <tt>true</tt>
     */
    public boolean offer(E x, long time, TimeUnit unit) {
        return offer(x);
    }

    /**
     * Inserts the given element; equivalent to {@link #offer(Delayed)}.
     *
     * @param x the element to add
     * @return <tt>true</tt>
     */
    public boolean add(E x) {
        return offer(x);
    }

    /**
     * Removes and returns the head of the queue, waiting until there is
     * a head element whose delay has expired.
     *
     * @return the expired head element
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    available.await();
                    continue;
                }
                long delay = first.getDelay(TimeUnit.NANOSECONDS);
                if (delay <= 0)
                    return dequeueHead();
                // Sleep at most until the current head is due; an earlier
                // insertion signals the condition and restarts the loop.
                available.awaitNanos(delay);
            }
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the head of the queue, waiting up to the given
     * time for a head element whose delay has expired.
     *
     * @param time how long to wait before giving up, in units of
     *        <tt>unit</tt>
     * @param unit the unit of <tt>time</tt>
     * @return the expired head element, or <tt>null</tt> if the timeout
     *         elapsed before one became available
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if <tt>unit</tt> is null
     */
    public E poll(long time, TimeUnit unit) throws InterruptedException {
        // Convert before acquiring the lock: a null unit must fail here,
        // not after the lock is held outside the try/finally.
        long nanos = unit.toNanos(time);
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0)
                        return null;
                    nanos = available.awaitNanos(nanos);
                    continue;
                }
                long delay = first.getDelay(TimeUnit.NANOSECONDS);
                if (delay <= 0)
                    return dequeueHead();
                // Head exists but is not due yet: give up once the
                // caller's timeout is spent instead of re-waiting with a
                // non-positive duration.
                if (nanos <= 0)
                    return null;
                if (delay > nanos)
                    delay = nanos;
                long timeLeft = available.awaitNanos(delay);
                // Charge only the time actually spent waiting.
                nanos -= delay - timeLeft;
            }
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the head of the queue if its delay has
     * expired, without waiting.
     *
     * @return the expired head element, or <tt>null</tt> if the queue is
     *         empty or its head has not yet expired
     */
    public E poll() {
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                return null;
            return dequeueHead();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Removes the head of the backing queue and, if elements remain,
     * wakes the other waiters so they re-examine the new head.
     * The caller must hold <tt>lock</tt> and must have verified that the
     * queue is non-empty.
     */
    private E dequeueHead() {
        E x = q.poll();
        assert x != null;
        if (q.size() != 0)
            available.signalAll(); // wake up other takers
        return x;
    }

    /**
     * Returns, without removing, the head of the queue regardless of
     * whether its delay has expired.
     *
     * @return the head element, or <tt>null</tt> if the queue is empty
     */
    public E peek() {
        lock.lock();
        try {
            return q.peek();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements, expired or not.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        lock.lock();
        try {
            return q.size();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Removes all elements, expired or not.
     */
    public void clear() {
        lock.lock();
        try {
            q.clear();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because the queue is
     * unbounded.
     *
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Removes every element whose delay has expired and adds it to the
     * given collection.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is null
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most <tt>maxElements</tt> elements whose delay has
     * expired and adds them to the given collection.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is null
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        lock.lock();
        try {
            int n = 0;
            while (n < maxElements) {
                E first = q.peek();
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                    break;
                // Add before removing so that a failing add leaves the
                // element in this queue.
                c.add(first);
                q.poll();
                ++n;
            }
            if (n > 0 && q.size() != 0)
                available.signalAll();
            return n;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all elements, expired or not.
     *
     * @return an array of the elements in this queue
     */
    public Object[] toArray() {
        lock.lock();
        try {
            return q.toArray();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all elements, expired or not, using
     * the given array if it is large enough.
     *
     * @param array the array to fill, or whose runtime type to use
     * @return an array of the elements in this queue
     */
    public <T> T[] toArray(T[] array) {
        lock.lock();
        try {
            return q.toArray(array);
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the given element, expired or not.
     *
     * @param x the element to remove
     * @return <tt>true</tt> if an element was removed
     */
    public boolean remove(Object x) {
        lock.lock();
        try {
            return q.remove(x);
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over all elements, expired or not. The
     * iterator wraps the backing <tt>PriorityQueue</tt>'s own iterator
     * (it is not a snapshot) and takes the queue lock around each step.
     *
     * @return an iterator over the elements in this queue
     */
    public Iterator<E> iterator() {
        lock.lock();
        try {
            return new Itr(q.iterator());
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Iterator that delegates to the backing queue's iterator, holding
     * the enclosing queue's lock for the duration of every call.
     */
    private class Itr implements Iterator<E> {
        private final Iterator<E> iter;

        Itr(Iterator<E> i) {
            iter = i;
        }

        public boolean hasNext() {
            // Locked like next()/remove(): the delegate reads state of
            // the shared backing queue.
            lock.lock();
            try {
                return iter.hasNext();
            }
            finally {
                lock.unlock();
            }
        }

        public E next() {
            lock.lock();
            try {
                return iter.next();
            }
            finally {
                lock.unlock();
            }
        }

        public void remove() {
            lock.lock();
            try {
                iter.remove();
            }
            finally {
                lock.unlock();
            }
        }
    }

}