ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.5
Committed: Tue Jul 8 00:46:33 2003 UTC (20 years, 11 months ago) by dl
Branch: MAIN
CVS Tags: JSR166_PRELIMINARY_TEST_RELEASE_2
Changes since 1.4: +1 -0 lines
Log Message:
Locks in subpackage; fairness params added

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7
8 package java.util.concurrent;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13 * An unbounded queue of <tt>Delayed</tt> elements, in which
14 * elements can only be taken when their delay has expired.
15 * @since 1.5
16 * @author Doug Lea
17 */
18
19 public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
20 implements BlockingQueue<E> {
21
22 private transient final ReentrantLock lock = new ReentrantLock();
23 private transient final Condition available = lock.newCondition();
24 private final PriorityQueue<E> q = new PriorityQueue<E>();
25
26 /**
27 * Creates a new DelayQeueu
28 */
29 public DelayQueue() {}
30
31 public boolean offer(E x) {
32 lock.lock();
33 try {
34 E first = q.peek();
35 q.offer(x);
36 if (first == null || x.compareTo(first) < 0)
37 available.signalAll();
38 return true;
39 }
40 finally {
41 lock.unlock();
42 }
43 }
44
45 public void put(E x) {
46 offer(x);
47 }
48
49 public boolean offer(E x, long time, TimeUnit unit) {
50 return offer(x);
51 }
52
53 public boolean add(E x) {
54 return offer(x);
55 }
56
57 public E take() throws InterruptedException {
58 lock.lockInterruptibly();
59 try {
60 for (;;) {
61 E first = q.peek();
62 if (first == null)
63 available.await();
64 else {
65 long delay = first.getDelay(TimeUnit.NANOSECONDS);
66 if (delay > 0)
67 available.awaitNanos(delay);
68 else {
69 E x = q.poll();
70 assert x != null;
71 if (q.size() != 0)
72 available.signalAll(); // wake up other takers
73 return x;
74
75 }
76 }
77 }
78 }
79 finally {
80 lock.unlock();
81 }
82 }
83
84 public E poll(long time, TimeUnit unit) throws InterruptedException {
85 lock.lockInterruptibly();
86 long nanos = unit.toNanos(time);
87 try {
88 for (;;) {
89 E first = q.peek();
90 if (first == null) {
91 if (nanos <= 0)
92 return null;
93 else
94 nanos = available.awaitNanos(nanos);
95 }
96 else {
97 long delay = first.getDelay(TimeUnit.NANOSECONDS);
98 if (delay > 0) {
99 if (delay > nanos)
100 delay = nanos;
101 long timeLeft = available.awaitNanos(delay);
102 nanos -= delay - timeLeft;
103 }
104 else {
105 E x = q.poll();
106 assert x != null;
107 if (q.size() != 0)
108 available.signalAll();
109 return x;
110 }
111 }
112 }
113 }
114 finally {
115 lock.unlock();
116 }
117 }
118
119
120 public E poll() {
121 lock.lock();
122 try {
123 E first = q.peek();
124 if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
125 return null;
126 else {
127 E x = q.poll();
128 assert x != null;
129 if (q.size() != 0)
130 available.signalAll();
131 return x;
132 }
133 }
134 finally {
135 lock.unlock();
136 }
137 }
138
139 public E peek() {
140 lock.lock();
141 try {
142 return q.peek();
143 }
144 finally {
145 lock.unlock();
146 }
147 }
148
149 public int size() {
150 lock.lock();
151 try {
152 return q.size();
153 }
154 finally {
155 lock.unlock();
156 }
157 }
158
159 public void clear() {
160 lock.lock();
161 try {
162 q.clear();
163 }
164 finally {
165 lock.unlock();
166 }
167 }
168
169 public int remainingCapacity() {
170 return Integer.MAX_VALUE;
171 }
172
173 public Object[] toArray() {
174 lock.lock();
175 try {
176 return q.toArray();
177 }
178 finally {
179 lock.unlock();
180 }
181 }
182
183 public <T> T[] toArray(T[] array) {
184 lock.lock();
185 try {
186 return q.toArray(array);
187 }
188 finally {
189 lock.unlock();
190 }
191 }
192
193 public boolean remove(Object x) {
194 lock.lock();
195 try {
196 return q.remove(x);
197 }
198 finally {
199 lock.unlock();
200 }
201 }
202
203 public Iterator<E> iterator() {
204 lock.lock();
205 try {
206 return new Itr(q.iterator());
207 }
208 finally {
209 lock.unlock();
210 }
211 }
212
213 private class Itr<E> implements Iterator<E> {
214 private final Iterator<E> iter;
215 Itr(Iterator<E> i) {
216 iter = i;
217 }
218
219 public boolean hasNext() {
220 return iter.hasNext();
221 }
222
223 public E next() {
224 lock.lock();
225 try {
226 return iter.next();
227 }
228 finally {
229 lock.unlock();
230 }
231 }
232
233 public void remove() {
234 lock.lock();
235 try {
236 iter.remove();
237 }
238 finally {
239 lock.unlock();
240 }
241 }
242 }
243
244 }