ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.6
Committed: Thu Jul 31 19:50:24 2003 UTC (20 years, 10 months ago) by tim
Branch: MAIN
Changes since 1.5: +25 -15 lines
Log Message:
Add Collection constructor, fix typo

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7
8 package java.util.concurrent;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13 * An unbounded queue of <tt>Delayed</tt> elements, in which
14 * elements can only be taken when their delay has expired.
15 * @since 1.5
16 * @author Doug Lea
17 */
18
19 public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
20 implements BlockingQueue<E> {
21
22 private transient final ReentrantLock lock = new ReentrantLock();
23 private transient final Condition available = lock.newCondition();
24 private final PriorityQueue<E> q = new PriorityQueue<E>();
25
26 /**
27 * Creates a new DelayQueue with no elements
28 */
29 public DelayQueue() {}
30
31 /**
32 * Create a new DelayQueue with elements taken from the given
33 * collection of {@link Delayed} instances.
34 *
35 * @fixme Should the body be wrapped with try-lock-finally-unlock?
36 */
37 public DelayQueue(Collection<? extends E> c) {
38 this.addAll(c);
39 }
40
41 public boolean offer(E x) {
42 lock.lock();
43 try {
44 E first = q.peek();
45 q.offer(x);
46 if (first == null || x.compareTo(first) < 0)
47 available.signalAll();
48 return true;
49 }
50 finally {
51 lock.unlock();
52 }
53 }
54
55 public void put(E x) {
56 offer(x);
57 }
58
59 public boolean offer(E x, long time, TimeUnit unit) {
60 return offer(x);
61 }
62
63 public boolean add(E x) {
64 return offer(x);
65 }
66
67 public E take() throws InterruptedException {
68 lock.lockInterruptibly();
69 try {
70 for (;;) {
71 E first = q.peek();
72 if (first == null)
73 available.await();
74 else {
75 long delay = first.getDelay(TimeUnit.NANOSECONDS);
76 if (delay > 0)
77 available.awaitNanos(delay);
78 else {
79 E x = q.poll();
80 assert x != null;
81 if (q.size() != 0)
82 available.signalAll(); // wake up other takers
83 return x;
84
85 }
86 }
87 }
88 }
89 finally {
90 lock.unlock();
91 }
92 }
93
94 public E poll(long time, TimeUnit unit) throws InterruptedException {
95 lock.lockInterruptibly();
96 long nanos = unit.toNanos(time);
97 try {
98 for (;;) {
99 E first = q.peek();
100 if (first == null) {
101 if (nanos <= 0)
102 return null;
103 else
104 nanos = available.awaitNanos(nanos);
105 }
106 else {
107 long delay = first.getDelay(TimeUnit.NANOSECONDS);
108 if (delay > 0) {
109 if (delay > nanos)
110 delay = nanos;
111 long timeLeft = available.awaitNanos(delay);
112 nanos -= delay - timeLeft;
113 }
114 else {
115 E x = q.poll();
116 assert x != null;
117 if (q.size() != 0)
118 available.signalAll();
119 return x;
120 }
121 }
122 }
123 }
124 finally {
125 lock.unlock();
126 }
127 }
128
129
130 public E poll() {
131 lock.lock();
132 try {
133 E first = q.peek();
134 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
135 return null;
136 else {
137 E x = q.poll();
138 assert x != null;
139 if (q.size() != 0)
140 available.signalAll();
141 return x;
142 }
143 }
144 finally {
145 lock.unlock();
146 }
147 }
148
149 public E peek() {
150 lock.lock();
151 try {
152 return q.peek();
153 }
154 finally {
155 lock.unlock();
156 }
157 }
158
159 public int size() {
160 lock.lock();
161 try {
162 return q.size();
163 }
164 finally {
165 lock.unlock();
166 }
167 }
168
169 public void clear() {
170 lock.lock();
171 try {
172 q.clear();
173 }
174 finally {
175 lock.unlock();
176 }
177 }
178
179 public int remainingCapacity() {
180 return Integer.MAX_VALUE;
181 }
182
183 public Object[] toArray() {
184 lock.lock();
185 try {
186 return q.toArray();
187 }
188 finally {
189 lock.unlock();
190 }
191 }
192
193 public <T> T[] toArray(T[] array) {
194 lock.lock();
195 try {
196 return q.toArray(array);
197 }
198 finally {
199 lock.unlock();
200 }
201 }
202
203 public boolean remove(Object x) {
204 lock.lock();
205 try {
206 return q.remove(x);
207 }
208 finally {
209 lock.unlock();
210 }
211 }
212
213 public Iterator<E> iterator() {
214 lock.lock();
215 try {
216 return new Itr(q.iterator());
217 }
218 finally {
219 lock.unlock();
220 }
221 }
222
223 private class Itr<E> implements Iterator<E> {
224 private final Iterator<E> iter;
225 Itr(Iterator<E> i) {
226 iter = i;
227 }
228
229 public boolean hasNext() {
230 return iter.hasNext();
231 }
232
233 public E next() {
234 lock.lock();
235 try {
236 return iter.next();
237 }
238 finally {
239 lock.unlock();
240 }
241 }
242
243 public void remove() {
244 lock.lock();
245 try {
246 iter.remove();
247 }
248 finally {
249 lock.unlock();
250 }
251 }
252 }
253
254 }