ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.3
Committed: Sun Jun 1 18:38:01 2003 UTC (21 years ago) by dl
Branch: MAIN
CVS Tags: JSR166_PRELIMINARY_TEST_RELEASE_1
Changes since 1.2: +25 -25 lines
Log Message:
Replaced DelayEntry with Delayed interface

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7
8 package java.util.concurrent;
9 import java.util.*;
10
11 /**
12 * An unbounded queue of <tt>Delayed</tt> elements, in which
13 * elements can only be taken when their delay has expired.
14 **/
15
16 public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
17 implements BlockingQueue<E> {
18
19 private transient final ReentrantLock lock = new ReentrantLock();
20 private transient final Condition canTake = lock.newCondition();
21 private final PriorityQueue<E> q = new PriorityQueue<E>();
22
23 public DelayQueue() {}
24
25 public boolean offer(E x) {
26 lock.lock();
27 try {
28 E first = q.peek();
29 q.offer(x);
30 if (first == null || x.compareTo(first) < 0)
31 canTake.signalAll();
32 return true;
33 }
34 finally {
35 lock.unlock();
36 }
37 }
38
39 public void put(E x) {
40 offer(x);
41 }
42
43 public boolean offer(E x, long time, TimeUnit unit) {
44 return offer(x);
45 }
46
47 public boolean add(E x) {
48 return offer(x);
49 }
50
51 public E take() throws InterruptedException {
52 lock.lockInterruptibly();
53 try {
54 for (;;) {
55 E first = q.peek();
56 if (first == null)
57 canTake.await();
58 else {
59 long delay = first.getDelay(TimeUnit.NANOSECONDS);
60 if (delay > 0)
61 canTake.awaitNanos(delay);
62 else {
63 E x = q.poll();
64 assert x != null;
65 if (q.size() != 0)
66 canTake.signalAll(); // wake up other takers
67 return x;
68
69 }
70 }
71 }
72 }
73 finally {
74 lock.unlock();
75 }
76 }
77
78 public E poll(long time, TimeUnit unit) throws InterruptedException {
79 lock.lockInterruptibly();
80 long nanos = unit.toNanos(time);
81 try {
82 for (;;) {
83 E first = q.peek();
84 if (first == null) {
85 if (nanos <= 0)
86 return null;
87 else
88 nanos = canTake.awaitNanos(nanos);
89 }
90 else {
91 long delay = first.getDelay(TimeUnit.NANOSECONDS);
92 if (delay > 0) {
93 if (delay > nanos)
94 delay = nanos;
95 long timeLeft = canTake.awaitNanos(delay);
96 nanos -= delay - timeLeft;
97 }
98 else {
99 E x = q.poll();
100 assert x != null;
101 if (q.size() != 0)
102 canTake.signalAll();
103 return x;
104 }
105 }
106 }
107 }
108 finally {
109 lock.unlock();
110 }
111 }
112
113
114 public E poll() {
115 lock.lock();
116 try {
117 E first = q.peek();
118 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
119 return null;
120 else {
121 E x = q.poll();
122 assert x != null;
123 if (q.size() != 0)
124 canTake.signalAll();
125 return x;
126 }
127 }
128 finally {
129 lock.unlock();
130 }
131 }
132
133 public E peek() {
134 lock.lock();
135 try {
136 return q.peek();
137 }
138 finally {
139 lock.unlock();
140 }
141 }
142
143 public int size() {
144 lock.lock();
145 try {
146 return q.size();
147 }
148 finally {
149 lock.unlock();
150 }
151 }
152
153 public void clear() {
154 lock.lock();
155 try {
156 q.clear();
157 }
158 finally {
159 lock.unlock();
160 }
161 }
162
163 public int remainingCapacity() {
164 return Integer.MAX_VALUE;
165 }
166
167 public Object[] toArray() {
168 lock.lock();
169 try {
170 return q.toArray();
171 }
172 finally {
173 lock.unlock();
174 }
175 }
176
177 public <T> T[] toArray(T[] array) {
178 lock.lock();
179 try {
180 return q.toArray(array);
181 }
182 finally {
183 lock.unlock();
184 }
185 }
186
187 public boolean remove(Object x) {
188 lock.lock();
189 try {
190 return q.remove(x);
191 }
192 finally {
193 lock.unlock();
194 }
195 }
196
197 public Iterator<E> iterator() {
198 lock.lock();
199 try {
200 return new Itr(q.iterator());
201 }
202 finally {
203 lock.unlock();
204 }
205 }
206
207 private class Itr<E> implements Iterator<E> {
208 private final Iterator<E> iter;
209 Itr(Iterator<E> i) {
210 iter = i;
211 }
212
213 public boolean hasNext() {
214 return iter.hasNext();
215 }
216
217 public E next() {
218 lock.lock();
219 try {
220 return iter.next();
221 }
222 finally {
223 lock.unlock();
224 }
225 }
226
227 public void remove() {
228 lock.lock();
229 try {
230 iter.remove();
231 }
232 finally {
233 lock.unlock();
234 }
235 }
236 }
237
238 }