11 |
|
/** |
12 |
|
* An unbounded queue of <tt>Delayed</tt> elements, in which |
13 |
|
* elements can only be taken when their delay has expired. |
14 |
< |
**/ |
14 |
> |
* @since 1.5 |
15 |
> |
* @author Doug Lea |
16 |
> |
*/ |
17 |
|
|
18 |
|
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> |
19 |
|
implements BlockingQueue<E> { |
20 |
|
|
21 |
|
private transient final ReentrantLock lock = new ReentrantLock(); |
22 |
< |
private transient final Condition canTake = lock.newCondition(); |
22 |
> |
private transient final Condition available = lock.newCondition(); |
23 |
|
private final PriorityQueue<E> q = new PriorityQueue<E>(); |
24 |
|
|
25 |
+ |
/** |
26 |
+ |
* Creates a new DelayQeueu |
27 |
+ |
*/ |
28 |
|
public DelayQueue() {} |
29 |
|
|
30 |
|
public boolean offer(E x) { |
33 |
|
E first = q.peek(); |
34 |
|
q.offer(x); |
35 |
|
if (first == null || x.compareTo(first) < 0) |
36 |
< |
canTake.signalAll(); |
36 |
> |
available.signalAll(); |
37 |
|
return true; |
38 |
|
} |
39 |
|
finally { |
59 |
|
for (;;) { |
60 |
|
E first = q.peek(); |
61 |
|
if (first == null) |
62 |
< |
canTake.await(); |
62 |
> |
available.await(); |
63 |
|
else { |
64 |
|
long delay = first.getDelay(TimeUnit.NANOSECONDS); |
65 |
|
if (delay > 0) |
66 |
< |
canTake.awaitNanos(delay); |
66 |
> |
available.awaitNanos(delay); |
67 |
|
else { |
68 |
|
E x = q.poll(); |
69 |
|
assert x != null; |
70 |
|
if (q.size() != 0) |
71 |
< |
canTake.signalAll(); // wake up other takers |
71 |
> |
available.signalAll(); // wake up other takers |
72 |
|
return x; |
73 |
|
|
74 |
|
} |
90 |
|
if (nanos <= 0) |
91 |
|
return null; |
92 |
|
else |
93 |
< |
nanos = canTake.awaitNanos(nanos); |
93 |
> |
nanos = available.awaitNanos(nanos); |
94 |
|
} |
95 |
|
else { |
96 |
|
long delay = first.getDelay(TimeUnit.NANOSECONDS); |
97 |
|
if (delay > 0) { |
98 |
|
if (delay > nanos) |
99 |
|
delay = nanos; |
100 |
< |
long timeLeft = canTake.awaitNanos(delay); |
100 |
> |
long timeLeft = available.awaitNanos(delay); |
101 |
|
nanos -= delay - timeLeft; |
102 |
|
} |
103 |
|
else { |
104 |
|
E x = q.poll(); |
105 |
|
assert x != null; |
106 |
|
if (q.size() != 0) |
107 |
< |
canTake.signalAll(); |
107 |
> |
available.signalAll(); |
108 |
|
return x; |
109 |
|
} |
110 |
|
} |
126 |
|
E x = q.poll(); |
127 |
|
assert x != null; |
128 |
|
if (q.size() != 0) |
129 |
< |
canTake.signalAll(); |
129 |
> |
available.signalAll(); |
130 |
|
return x; |
131 |
|
} |
132 |
|
} |