ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.4
Committed: Tue Jun 24 14:34:47 2003 UTC (20 years, 11 months ago) by dl
Branch: MAIN
Changes since 1.3: +15 -10 lines
Log Message:
Added missing javadoc tags; minor reformatting

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7
8 package java.util.concurrent;
9 import java.util.*;
10
11 /**
12 * An unbounded queue of <tt>Delayed</tt> elements, in which
13 * elements can only be taken when their delay has expired.
14 * @since 1.5
15 * @author Doug Lea
16 */
17
18 public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
19 implements BlockingQueue<E> {
20
21 private transient final ReentrantLock lock = new ReentrantLock();
22 private transient final Condition available = lock.newCondition();
23 private final PriorityQueue<E> q = new PriorityQueue<E>();
24
25 /**
26 * Creates a new DelayQeueu
27 */
28 public DelayQueue() {}
29
30 public boolean offer(E x) {
31 lock.lock();
32 try {
33 E first = q.peek();
34 q.offer(x);
35 if (first == null || x.compareTo(first) < 0)
36 available.signalAll();
37 return true;
38 }
39 finally {
40 lock.unlock();
41 }
42 }
43
44 public void put(E x) {
45 offer(x);
46 }
47
48 public boolean offer(E x, long time, TimeUnit unit) {
49 return offer(x);
50 }
51
52 public boolean add(E x) {
53 return offer(x);
54 }
55
56 public E take() throws InterruptedException {
57 lock.lockInterruptibly();
58 try {
59 for (;;) {
60 E first = q.peek();
61 if (first == null)
62 available.await();
63 else {
64 long delay = first.getDelay(TimeUnit.NANOSECONDS);
65 if (delay > 0)
66 available.awaitNanos(delay);
67 else {
68 E x = q.poll();
69 assert x != null;
70 if (q.size() != 0)
71 available.signalAll(); // wake up other takers
72 return x;
73
74 }
75 }
76 }
77 }
78 finally {
79 lock.unlock();
80 }
81 }
82
83 public E poll(long time, TimeUnit unit) throws InterruptedException {
84 lock.lockInterruptibly();
85 long nanos = unit.toNanos(time);
86 try {
87 for (;;) {
88 E first = q.peek();
89 if (first == null) {
90 if (nanos <= 0)
91 return null;
92 else
93 nanos = available.awaitNanos(nanos);
94 }
95 else {
96 long delay = first.getDelay(TimeUnit.NANOSECONDS);
97 if (delay > 0) {
98 if (delay > nanos)
99 delay = nanos;
100 long timeLeft = available.awaitNanos(delay);
101 nanos -= delay - timeLeft;
102 }
103 else {
104 E x = q.poll();
105 assert x != null;
106 if (q.size() != 0)
107 available.signalAll();
108 return x;
109 }
110 }
111 }
112 }
113 finally {
114 lock.unlock();
115 }
116 }
117
118
119 public E poll() {
120 lock.lock();
121 try {
122 E first = q.peek();
123 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
124 return null;
125 else {
126 E x = q.poll();
127 assert x != null;
128 if (q.size() != 0)
129 available.signalAll();
130 return x;
131 }
132 }
133 finally {
134 lock.unlock();
135 }
136 }
137
138 public E peek() {
139 lock.lock();
140 try {
141 return q.peek();
142 }
143 finally {
144 lock.unlock();
145 }
146 }
147
148 public int size() {
149 lock.lock();
150 try {
151 return q.size();
152 }
153 finally {
154 lock.unlock();
155 }
156 }
157
158 public void clear() {
159 lock.lock();
160 try {
161 q.clear();
162 }
163 finally {
164 lock.unlock();
165 }
166 }
167
168 public int remainingCapacity() {
169 return Integer.MAX_VALUE;
170 }
171
172 public Object[] toArray() {
173 lock.lock();
174 try {
175 return q.toArray();
176 }
177 finally {
178 lock.unlock();
179 }
180 }
181
182 public <T> T[] toArray(T[] array) {
183 lock.lock();
184 try {
185 return q.toArray(array);
186 }
187 finally {
188 lock.unlock();
189 }
190 }
191
192 public boolean remove(Object x) {
193 lock.lock();
194 try {
195 return q.remove(x);
196 }
197 finally {
198 lock.unlock();
199 }
200 }
201
202 public Iterator<E> iterator() {
203 lock.lock();
204 try {
205 return new Itr(q.iterator());
206 }
207 finally {
208 lock.unlock();
209 }
210 }
211
212 private class Itr<E> implements Iterator<E> {
213 private final Iterator<E> iter;
214 Itr(Iterator<E> i) {
215 iter = i;
216 }
217
218 public boolean hasNext() {
219 return iter.hasNext();
220 }
221
222 public E next() {
223 lock.lock();
224 try {
225 return iter.next();
226 }
227 finally {
228 lock.unlock();
229 }
230 }
231
232 public void remove() {
233 lock.lock();
234 try {
235 iter.remove();
236 }
237 finally {
238 lock.unlock();
239 }
240 }
241 }
242
243 }