39 |
|
implements BlockingQueue<E> { |
40 |
|
|
41 |
|
private transient final ReentrantLock lock = new ReentrantLock(); |
42 |
– |
private transient final Condition available = lock.newCondition(); |
42 |
|
private final PriorityQueue<E> q = new PriorityQueue<E>(); |
43 |
|
|
44 |
|
/** |
45 |
+ |
* Thread designated to wait for the element at the head of |
46 |
+ |
* the queue. This variant of the Leader-Follower pattern |
47 |
+ |
* (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to |
48 |
+ |
* minimize unnecessary timed waiting. When a thread becomes |
49 |
+ |
* the leader, it waits only for the next delay to elapse, but |
50 |
+ |
* other threads await indefinitely. The leader thread must |
51 |
+ |
* signal some other thread before returning from take() or |
52 |
+ |
* poll(...), unless some other thread becomes leader in the |
53 |
+ |
* interim. Whenever the head of the queue is replaced with |
54 |
+ |
* an element with an earlier expiration time, the leader |
55 |
+ |
* field is invalidated by being reset to null, and some |
56 |
+ |
* waiting thread, but not necessarily the current leader, is |
57 |
+ |
* signalled. So waiting threads must be prepared to acquire |
58 |
+ |
* and lose leadership while waiting. |
59 |
+ |
*/ |
60 |
+ |
private Thread leader = null; |
61 |
+ |
|
62 |
+ |
/** |
63 |
+ |
* Condition signalled when a newer element becomes available |
64 |
+ |
* at the head of the queue or a new thread may need to |
65 |
+ |
* become leader. |
66 |
+ |
*/ |
67 |
+ |
private final Condition available = lock.newCondition(); |
68 |
+ |
|
69 |
+ |
/** |
70 |
|
* Creates a new <tt>DelayQueue</tt> that is initially empty. |
71 |
|
*/ |
72 |
|
public DelayQueue() {} |
105 |
|
final ReentrantLock lock = this.lock; |
106 |
|
lock.lock(); |
107 |
|
try { |
84 |
– |
E first = q.peek(); |
108 |
|
q.offer(e); |
109 |
< |
if (first == null || e.compareTo(first) < 0) |
110 |
< |
available.signalAll(); |
109 |
> |
if (q.peek() == e) { |
110 |
> |
leader = null; |
111 |
> |
available.signal(); |
112 |
> |
} |
113 |
|
return true; |
114 |
|
} finally { |
115 |
|
lock.unlock(); |
155 |
|
E first = q.peek(); |
156 |
|
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) |
157 |
|
return null; |
158 |
< |
else { |
159 |
< |
E x = q.poll(); |
135 |
< |
assert x != null; |
136 |
< |
if (q.size() != 0) |
137 |
< |
available.signalAll(); |
138 |
< |
return x; |
139 |
< |
} |
158 |
> |
else |
159 |
> |
return q.poll(); |
160 |
|
} finally { |
161 |
|
lock.unlock(); |
162 |
|
} |
175 |
|
try { |
176 |
|
for (;;) { |
177 |
|
E first = q.peek(); |
178 |
< |
if (first == null) { |
178 |
> |
if (first == null) |
179 |
|
available.await(); |
180 |
< |
} else { |
181 |
< |
long delay = first.getDelay(TimeUnit.NANOSECONDS); |
182 |
< |
if (delay > 0) { |
183 |
< |
long tl = available.awaitNanos(delay); |
184 |
< |
} else { |
185 |
< |
E x = q.poll(); |
186 |
< |
assert x != null; |
187 |
< |
if (q.size() != 0) |
188 |
< |
available.signalAll(); // wake up other takers |
189 |
< |
return x; |
190 |
< |
|
180 |
> |
else { |
181 |
> |
long delay = first.getDelay(TimeUnit.NANOSECONDS); |
182 |
> |
if (delay <= 0) |
183 |
> |
return q.poll(); |
184 |
> |
else if (leader != null) |
185 |
> |
available.await(); |
186 |
> |
else { |
187 |
> |
Thread thisThread = Thread.currentThread(); |
188 |
> |
leader = thisThread; |
189 |
> |
try { |
190 |
> |
available.awaitNanos(delay); |
191 |
> |
} finally { |
192 |
> |
if (leader == thisThread) |
193 |
> |
leader = null; |
194 |
> |
} |
195 |
|
} |
196 |
|
} |
197 |
|
} |
198 |
|
} finally { |
199 |
+ |
if (leader == null && q.peek() != null) |
200 |
+ |
available.signal(); |
201 |
|
lock.unlock(); |
202 |
|
} |
203 |
|
} |
226 |
|
nanos = available.awaitNanos(nanos); |
227 |
|
} else { |
228 |
|
long delay = first.getDelay(TimeUnit.NANOSECONDS); |
229 |
< |
if (delay > 0) { |
230 |
< |
if (nanos <= 0) |
231 |
< |
return null; |
232 |
< |
if (delay > nanos) |
233 |
< |
delay = nanos; |
234 |
< |
long timeLeft = available.awaitNanos(delay); |
235 |
< |
nanos -= delay - timeLeft; |
236 |
< |
} else { |
237 |
< |
E x = q.poll(); |
238 |
< |
assert x != null; |
239 |
< |
if (q.size() != 0) |
240 |
< |
available.signalAll(); |
241 |
< |
return x; |
242 |
< |
} |
229 |
> |
if (delay <= 0) |
230 |
> |
return q.poll(); |
231 |
> |
if (nanos <= 0) |
232 |
> |
return null; |
233 |
> |
if (nanos < delay || leader != null) |
234 |
> |
nanos = available.awaitNanos(nanos); |
235 |
> |
else { |
236 |
> |
Thread thisThread = Thread.currentThread(); |
237 |
> |
leader = thisThread; |
238 |
> |
try { |
239 |
> |
long timeLeft = available.awaitNanos(delay); |
240 |
> |
nanos -= delay - timeLeft; |
241 |
> |
} finally { |
242 |
> |
if (leader == thisThread) |
243 |
> |
leader = null; |
244 |
> |
} |
245 |
> |
} |
246 |
|
} |
247 |
|
} |
248 |
|
} finally { |
249 |
+ |
if (leader == null && q.peek() != null) |
250 |
+ |
available.signal(); |
251 |
|
lock.unlock(); |
252 |
|
} |
253 |
|
} |
304 |
|
c.add(q.poll()); |
305 |
|
++n; |
306 |
|
} |
276 |
– |
if (n > 0) |
277 |
– |
available.signalAll(); |
307 |
|
return n; |
308 |
|
} finally { |
309 |
|
lock.unlock(); |
334 |
|
c.add(q.poll()); |
335 |
|
++n; |
336 |
|
} |
308 |
– |
if (n > 0) |
309 |
– |
available.signalAll(); |
337 |
|
return n; |
338 |
|
} finally { |
339 |
|
lock.unlock(); |
482 |
|
return cursor < array.length; |
483 |
|
} |
484 |
|
|
485 |
+ |
@SuppressWarnings("unchecked") |
486 |
|
public E next() { |
487 |
|
if (cursor >= array.length) |
488 |
|
throw new NoSuchElementException(); |