ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/DelayQueue.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/DelayQueue.java (file contents):
Revision 1.1 by tim, Wed May 14 21:30:46 2003 UTC vs.
Revision 1.2 by dl, Tue May 27 18:14:40 2003 UTC

# Line 1 | Line 1
1 < package java.util.concurrent;
1 > /*
2 > * Written by Doug Lea with assistance from members of JCP JSR-166
3 > * Expert Group and released to the public domain. Use, modify, and
4 > * redistribute this code in any way without acknowledgement.
5 > */
6 >
7  
8 + package java.util.concurrent;
9   import java.util.*;
10  
11   /**
12 < * An unbounded queue in which elements cannot be taken until
13 < * their indicated delays have elapsed.
12 > * An unbounded queue of <tt>DelayEntry</tt> elements, in which
13 > * elements can only be taken when their delay has expired.
14   **/
15  
16 < public class DelayQueue<E> extends AbstractCollection<E>
17 <        implements BlockingQueue<E>, java.io.Serializable {
16 > public class DelayQueue<E> extends AbstractQueue<DelayEntry<E>>
17 >        implements BlockingQueue<DelayEntry<E>> {
18 >
19 >    private transient final ReentrantLock lock = new ReentrantLock();
20 >    private transient final Condition canTake = lock.newCondition();
21 >    private final PriorityQueue<DelayEntry<E>> q = new PriorityQueue<DelayEntry<E>>();
22  
23      public DelayQueue() {}
24  
25 <    /**
26 <     * Add the given element to the queue, to be taken after the given delay.
27 <     * @param unit the time to delay
28 <     * @param unit the granularity of the time unit
29 <     * @param x the element
30 <     */
31 <    public boolean add(long delay, TimeUnit granularity, E x) {
32 <        return false;
33 <    }
34 <
35 <    /**
36 <     * Return the time until the given element may be taken,
37 <     * in the requested time granularity.
38 <     * @param element the element
39 <     * @param granularity the time granularity
40 <     * @throws NoSuchElementException if element not present
41 <     */
42 <    public long getDelay(E element, TimeUnit granularity) {
43 <        return 0;
44 <    }
45 <
46 <    /**
47 <     * Return the time until the earliest element may be taken,
48 <     * in the requested time granularity.
49 <     * @param granularity the time granularity
50 <     * @throws NoSuchElementException if empty
51 <     */
52 <    public long getEarliestDelay(TimeUnit granularity) {
53 <        return 0;
54 <    }
55 <
56 <
57 <    /**
58 <     * Equivalent to add(0, [any time unit], x).
59 <     **/
60 <    public void put(E x) {
61 <    }
62 <    /**
63 <     * Equivalent to add(0, [any time unit], x).
64 <     **/
65 <    public boolean offer(E x, long time, TimeUnit granularity) {
66 <        return false;
67 <    }
68 <    /**
69 <     * Equivalent to add(0, [any time unit], x).
70 <     **/
71 <    public boolean add(E x) {
72 <        return false;
73 <    }
74 <    /**
75 <     * Equivalent to add(0, [any time unit], x).
76 <     **/
77 <    public boolean offer(E x) {
78 <        return false;
25 >    public boolean offer(DelayEntry<E> x) {
26 >        lock.lock();
27 >        try {
28 >            DelayEntry<E> first = q.peek();
29 >            q.offer(x);
30 >            if (first == null || x.compareTo(first) < 0)
31 >                canTake.signalAll();
32 >            return true;
33 >        }
34 >        finally {
35 >            lock.unlock();
36 >        }
37 >    }
38 >
39 >    public void put(DelayEntry<E> x) {
40 >        offer(x);
41 >    }
42 >
43 >    public boolean offer(DelayEntry<E> x, long time, TimeUnit unit) {
44 >        return offer(x);
45 >    }
46 >
47 >    public boolean add(DelayEntry<E> x) {
48 >        return offer(x);
49 >    }
50 >
51 >    public DelayEntry<E> take() throws InterruptedException {
52 >        lock.lockInterruptibly();
53 >        try {
54 >            for (;;) {
55 >                DelayEntry first = q.peek();
56 >                if (first == null)
57 >                    canTake.await();
58 >                else {
59 >                    long delay =  first.getDelay(TimeUnit.NANOSECONDS);
60 >                    if (delay > 0)
61 >                        canTake.awaitNanos(delay);
62 >                    else {
63 >                        DelayEntry<E> x = q.poll();
64 >                        assert x != null;
65 >                        if (q.size() != 0)
66 >                            canTake.signalAll(); // wake up other takers
67 >                        return x;
68 >                        
69 >                    }
70 >                }
71 >            }
72 >        }
73 >        finally {
74 >            lock.unlock();
75 >        }
76 >    }
77 >
78 >    public DelayEntry<E> poll(long time, TimeUnit unit) throws InterruptedException {
79 >        lock.lockInterruptibly();
80 >        long nanos = unit.toNanos(time);
81 >        try {
82 >            for (;;) {
83 >                DelayEntry first = q.peek();
84 >                if (first == null) {
85 >                    if (nanos <= 0)
86 >                        return null;
87 >                    else
88 >                        nanos = canTake.awaitNanos(nanos);
89 >                }
90 >                else {
91 >                    long delay =  first.getDelay(TimeUnit.NANOSECONDS);
92 >                    if (delay > 0) {
93 >                        if (delay > nanos)
94 >                            delay = nanos;
95 >                        long timeLeft = canTake.awaitNanos(delay);
96 >                        nanos -= delay - timeLeft;
97 >                    }
98 >                    else {
99 >                        DelayEntry<E> x = q.poll();
100 >                        assert x != null;
101 >                        if (q.size() != 0)
102 >                            canTake.signalAll();
103 >                        return x;
104 >                    }
105 >                }
106 >            }
107 >        }
108 >        finally {
109 >            lock.unlock();
110 >        }
111 >    }
112 >
113 >
114 >    public DelayEntry<E> poll() {
115 >        lock.lock();
116 >        try {
117 >            DelayEntry first = q.peek();
118 >            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
119 >                return null;
120 >            else {
121 >                DelayEntry<E> x = q.poll();
122 >                assert x != null;
123 >                if (q.size() != 0)
124 >                    canTake.signalAll();
125 >                return x;
126 >            }
127 >        }
128 >        finally {
129 >            lock.unlock();
130 >        }
131 >    }
132 >
133 >    public DelayEntry<E> peek() {
134 >        lock.lock();
135 >        try {
136 >            return q.peek();
137 >        }
138 >        finally {
139 >            lock.unlock();
140 >        }
141      }
142  
143 <
144 <    public E take() throws InterruptedException {
145 <        return null;
146 <    }
147 <    public E remove() {
148 <        return null;
149 <    }
150 <    public Iterator<E> iterator() {
151 <      return null;
143 >    public int size() {
144 >        lock.lock();
145 >        try {
146 >            return q.size();
147 >        }
148 >        finally {
149 >            lock.unlock();
150 >        }
151 >    }
152 >
153 >    public void clear() {
154 >        lock.lock();
155 >        try {
156 >            q.clear();
157 >        }
158 >        finally {
159 >            lock.unlock();
160 >        }
161      }
162  
163 <    public boolean remove(Object x) {
164 <        return false;
84 <    }
85 <    public E element() {
86 <        return null;
87 <    }
88 <    public E poll() {
89 <        return null;
90 <    }
91 <    public E poll(long time, TimeUnit granularity) throws InterruptedException {
92 <        return null;
93 <    }
94 <    public E peek() {
95 <        return null;
96 <    }
97 <    public boolean isEmpty() {
98 <        return false;
99 <    }
100 <    public int size() {
101 <        return 0;
163 >    public int remainingCapacity() {
164 >        return Integer.MAX_VALUE;
165      }
166 +
167      public Object[] toArray() {
168 <        return null;
168 >        lock.lock();
169 >        try {
170 >            return q.toArray();
171 >        }
172 >        finally {
173 >            lock.unlock();
174 >        }
175      }
176  
177      public <T> T[] toArray(T[] array) {
178 <        return null;
178 >        lock.lock();
179 >        try {
180 >            return q.toArray(array);
181 >        }
182 >        finally {
183 >            lock.unlock();
184 >        }
185      }
186 +
187 +    public boolean remove(Object x) {
188 +        lock.lock();
189 +        try {
190 +            return q.remove(x);
191 +        }
192 +        finally {
193 +            lock.unlock();
194 +        }
195 +    }
196 +
197 +    public Iterator<DelayEntry<E>> iterator() {
198 +        lock.lock();
199 +        try {
200 +            return new Itr(q.iterator());
201 +        }
202 +        finally {
203 +            lock.unlock();
204 +        }
205 +    }
206 +
207 +    private class Itr<E> implements Iterator<DelayEntry<E>> {
208 +        private final Iterator<DelayEntry<E>> iter;
209 +        Itr(Iterator<DelayEntry<E>> i) {
210 +            iter = i;
211 +        }
212 +
213 +        public boolean hasNext() {
214 +            return iter.hasNext();
215 +        }
216 +        
217 +        public DelayEntry<E> next() {
218 +            lock.lock();
219 +            try {
220 +                return iter.next();
221 +            }
222 +            finally {
223 +                lock.unlock();
224 +            }
225 +        }
226 +        
227 +        public void remove() {
228 +            lock.lock();
229 +            try {
230 +                iter.remove();
231 +            }
232 +            finally {
233 +                lock.unlock();
234 +            }
235 +        }
236 +    }
237 +
238   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines