ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
Revision: 1.2
Committed: Tue May 27 18:14:40 2003 UTC (21 years ago) by dl
Branch: MAIN
CVS Tags: JSR166_PRERELEASE_0_1
Changes since 1.1: +218 -90 lines
Log Message:
re-check-in initial implementations

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7    
8 tim 1.1 package java.util.concurrent;
9     import java.util.*;
10    
11     /**
12 dl 1.2 * An unbounded queue of <tt>DelayEntry</tt> elements, in which
13     * elements can only be taken when their delay has expired.
14 tim 1.1 **/
15    
16 dl 1.2 public class DelayQueue<E> extends AbstractQueue<DelayEntry<E>>
17     implements BlockingQueue<DelayEntry<E>> {
18    
19     private transient final ReentrantLock lock = new ReentrantLock();
20     private transient final Condition canTake = lock.newCondition();
21     private final PriorityQueue<DelayEntry<E>> q = new PriorityQueue<DelayEntry<E>>();
22 tim 1.1
23     public DelayQueue() {}
24    
25 dl 1.2 public boolean offer(DelayEntry<E> x) {
26     lock.lock();
27     try {
28     DelayEntry<E> first = q.peek();
29     q.offer(x);
30     if (first == null || x.compareTo(first) < 0)
31     canTake.signalAll();
32     return true;
33     }
34     finally {
35     lock.unlock();
36     }
37     }
38    
39     public void put(DelayEntry<E> x) {
40     offer(x);
41     }
42    
43     public boolean offer(DelayEntry<E> x, long time, TimeUnit unit) {
44     return offer(x);
45     }
46    
47     public boolean add(DelayEntry<E> x) {
48     return offer(x);
49     }
50    
51     public DelayEntry<E> take() throws InterruptedException {
52     lock.lockInterruptibly();
53     try {
54     for (;;) {
55     DelayEntry first = q.peek();
56     if (first == null)
57     canTake.await();
58     else {
59     long delay = first.getDelay(TimeUnit.NANOSECONDS);
60     if (delay > 0)
61     canTake.awaitNanos(delay);
62     else {
63     DelayEntry<E> x = q.poll();
64     assert x != null;
65     if (q.size() != 0)
66     canTake.signalAll(); // wake up other takers
67     return x;
68    
69     }
70     }
71     }
72     }
73     finally {
74     lock.unlock();
75     }
76     }
77    
78     public DelayEntry<E> poll(long time, TimeUnit unit) throws InterruptedException {
79     lock.lockInterruptibly();
80     long nanos = unit.toNanos(time);
81     try {
82     for (;;) {
83     DelayEntry first = q.peek();
84     if (first == null) {
85     if (nanos <= 0)
86     return null;
87     else
88     nanos = canTake.awaitNanos(nanos);
89     }
90     else {
91     long delay = first.getDelay(TimeUnit.NANOSECONDS);
92     if (delay > 0) {
93     if (delay > nanos)
94     delay = nanos;
95     long timeLeft = canTake.awaitNanos(delay);
96     nanos -= delay - timeLeft;
97     }
98     else {
99     DelayEntry<E> x = q.poll();
100     assert x != null;
101     if (q.size() != 0)
102     canTake.signalAll();
103     return x;
104     }
105     }
106     }
107     }
108     finally {
109     lock.unlock();
110     }
111     }
112    
113    
114     public DelayEntry<E> poll() {
115     lock.lock();
116     try {
117     DelayEntry first = q.peek();
118     if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
119     return null;
120     else {
121     DelayEntry<E> x = q.poll();
122     assert x != null;
123     if (q.size() != 0)
124     canTake.signalAll();
125     return x;
126     }
127     }
128     finally {
129     lock.unlock();
130     }
131     }
132    
133     public DelayEntry<E> peek() {
134     lock.lock();
135     try {
136     return q.peek();
137     }
138     finally {
139     lock.unlock();
140     }
141 tim 1.1 }
142    
143 dl 1.2 public int size() {
144     lock.lock();
145     try {
146     return q.size();
147     }
148     finally {
149     lock.unlock();
150     }
151     }
152    
153     public void clear() {
154     lock.lock();
155     try {
156     q.clear();
157     }
158     finally {
159     lock.unlock();
160     }
161     }
162 tim 1.1
163 dl 1.2 public int remainingCapacity() {
164     return Integer.MAX_VALUE;
165 tim 1.1 }
166 dl 1.2
167     public Object[] toArray() {
168     lock.lock();
169     try {
170     return q.toArray();
171     }
172     finally {
173     lock.unlock();
174     }
175 tim 1.1 }
176 dl 1.2
177     public <T> T[] toArray(T[] array) {
178     lock.lock();
179     try {
180     return q.toArray(array);
181     }
182     finally {
183     lock.unlock();
184     }
185 tim 1.1 }
186    
187     public boolean remove(Object x) {
188 dl 1.2 lock.lock();
189     try {
190     return q.remove(x);
191     }
192     finally {
193     lock.unlock();
194     }
195     }
196    
197     public Iterator<DelayEntry<E>> iterator() {
198     lock.lock();
199     try {
200     return new Itr(q.iterator());
201     }
202     finally {
203     lock.unlock();
204     }
205     }
206    
207     private class Itr<E> implements Iterator<DelayEntry<E>> {
208     private final Iterator<DelayEntry<E>> iter;
209     Itr(Iterator<DelayEntry<E>> i) {
210     iter = i;
211     }
212    
213     public boolean hasNext() {
214     return iter.hasNext();
215     }
216    
217     public DelayEntry<E> next() {
218     lock.lock();
219     try {
220     return iter.next();
221     }
222     finally {
223     lock.unlock();
224     }
225     }
226    
227     public void remove() {
228     lock.lock();
229     try {
230     iter.remove();
231     }
232     finally {
233     lock.unlock();
234     }
235     }
236 tim 1.1 }
237    
238     }