ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java (file contents):
Revision 1.4 by dl, Fri Jun 6 16:53:05 2003 UTC vs.
Revision 1.5 by dl, Sun Jun 22 21:47:17 2003 UTC

# Line 8 | Line 8 | package java.util.concurrent;
8   import java.util.*;
9  
10   /**
11 < * A blocking queue based on a {@link PriorityQueue}, obeying its
12 < * ordering rules and implementation characteristics.  The queue is
13 < * essentially unbounded; it blocks only on attempts to insert more
14 < * than <tt>Integer.MAX_VALUE</tt> untaken elements.
11 > * An unbounded blocking queue based on a {@link PriorityQueue},
12 > * obeying its ordering rules and implementation characteristics.
13   **/
14 < public class PriorityBlockingQueue<E> extends AbstractBlockingQueueFromQueue<E>
14 > public class PriorityBlockingQueue<E> extends AbstractQueue<E>
15          implements BlockingQueue<E>, java.io.Serializable {
16  
17 +    private final PriorityQueue<E> q;
18 +    private final FairReentrantLock lock = new FairReentrantLock();
19 +    private final Condition notEmpty = lock.newCondition();
20 +
21      /**
22       * Create a new priority queue with the default initial capacity (11)
23       * that orders its elements according to their natural ordering.
24       */
25      public PriorityBlockingQueue() {
26 <        super(new PriorityQueue<E>(), Integer.MAX_VALUE);
26 >        q = new PriorityQueue<E>();
27      }
28  
29      /**
# Line 31 | Line 33 | public class PriorityBlockingQueue<E> ex
33       * @param initialCapacity the initial capacity for this priority queue.
34       */
35      public PriorityBlockingQueue(int initialCapacity) {
36 <        super(new PriorityQueue<E>(initialCapacity, null), Integer.MAX_VALUE);
36 >        q = new PriorityQueue<E>(initialCapacity, null);
37      }
38  
39      /**
# Line 42 | Line 44 | public class PriorityBlockingQueue<E> ex
44       * @param comparator the comparator used to order this priority queue.
45       */
46      public PriorityBlockingQueue(int initialCapacity, Comparator<E> comparator) {
47 <        super(new PriorityQueue<E>(initialCapacity, comparator), Integer.MAX_VALUE);
47 >        q = new PriorityQueue<E>(initialCapacity, comparator);
48      }
49  
50      /**
# Line 65 | Line 67 | public class PriorityBlockingQueue<E> ex
67       *         element of the specified collection is <tt>null</tt>.
68       */
69      public PriorityBlockingQueue(Collection<E> initialElements) {
70 <        super(new PriorityQueue<E>(initialElements), Integer.MAX_VALUE);
70 >        q = new PriorityQueue<E>(initialElements);
71 >    }
72 >
73 >    public boolean offer(E x) {
74 >        if (x == null) throw new IllegalArgumentException();
75 >        lock.lock();
76 >        try {
77 >            boolean ok = q.offer(x);
78 >            assert ok;
79 >            notEmpty.signal();
80 >            return true;
81 >        }
82 >        finally {
83 >            lock.unlock();
84 >        }
85 >    }
86 >
87 >    public void put(E x) throws InterruptedException {
88 >        offer(x); // never need to block
89 >    }
90 >
91 >    public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
92 >        return offer(x); // never need to block
93 >    }
94 >
95 >    public E take() throws InterruptedException {
96 >        lock.lockInterruptibly();
97 >        try {
98 >            try {
99 >                while (q.size() == 0)
100 >                    notEmpty.await();
101 >            }
102 >            catch (InterruptedException ie) {
103 >                notEmpty.signal(); // propagate to non-interrupted thread
104 >                throw ie;
105 >            }
106 >            E x = q.poll();
107 >            assert x != null;
108 >            return x;
109 >        }
110 >        finally {
111 >            lock.unlock();
112 >        }
113 >    }
114 >
115 >
116 >    public E poll() {
117 >        lock.lock();
118 >        try {
119 >            return q.poll();
120 >        }
121 >        finally {
122 >            lock.unlock();
123 >        }
124 >    }
125 >
126 >    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
127 >        lock.lockInterruptibly();
128 >        long nanos = unit.toNanos(timeout);
129 >        try {
130 >            for (;;) {
131 >                E x = q.poll();
132 >                if (x != null)
133 >                    return x;
134 >                if (nanos <= 0)
135 >                    return null;
136 >                try {
137 >                    nanos = notEmpty.awaitNanos(nanos);
138 >                }
139 >                catch (InterruptedException ie) {
140 >                    notEmpty.signal(); // propagate to non-interrupted thread
141 >                    throw ie;
142 >                }
143 >
144 >            }
145 >        }
146 >        finally {
147 >            lock.unlock();
148 >        }
149 >    }
150 >
151 >    public E peek() {
152 >        lock.lock();
153 >        try {
154 >            return q.peek();
155 >        }
156 >        finally {
157 >            lock.unlock();
158 >        }
159 >    }
160 >
161 >    public int size() {
162 >        lock.lock();
163 >        try {
164 >            return q.size();
165 >        }
166 >        finally {
167 >            lock.unlock();
168 >        }
169 >    }
170 >
171 >    /**
172 >     * Always returns <tt>Integer.MAX_VALUE</tt> because
173 >     * PriorityBlockingQueues are not capacity constrained.
174 >     * @return <tt>Integer.MAX_VALUE</tt>
175 >     */
176 >    public int remainingCapacity() {
177 >        return Integer.MAX_VALUE;
178 >    }
179 >
180 >    public boolean remove(Object x) {
181 >        lock.lock();
182 >        try {
183 >            return q.remove(x);
184 >        }
185 >        finally {
186 >            lock.unlock();
187 >        }
188 >    }
189 >
190 >    public boolean contains(Object x) {
191 >        lock.lock();
192 >        try {
193 >            return q.contains(x);
194 >        }
195 >        finally {
196 >            lock.unlock();
197 >        }
198 >    }
199 >
200 >    public Object[] toArray() {
201 >        lock.lock();
202 >        try {
203 >            return q.toArray();
204 >        }
205 >        finally {
206 >            lock.unlock();
207 >        }
208 >    }
209 >
210 >
211 >    public String toString() {
212 >        lock.lock();
213 >        try {
214 >            return q.toString();
215 >        }
216 >        finally {
217 >            lock.unlock();
218 >        }
219 >    }
220 >
221 >    public <T> T[] toArray(T[] a) {
222 >        lock.lock();
223 >        try {
224 >            return q.toArray(a);
225 >        }
226 >        finally {
227 >            lock.unlock();
228 >        }
229 >    }
230 >
231 >    public Iterator<E> iterator() {
232 >        lock.lock();
233 >        try {
234 >            return new Itr(q.iterator());
235 >        }
236 >        finally {
237 >            lock.unlock();
238 >        }
239 >    }
240 >
241 >    private class Itr<E> implements Iterator<E> {
242 >        private final Iterator<E> iter;
243 >        Itr(Iterator<E> i) {
244 >            iter = i;
245 >        }
246 >
247 >        public boolean hasNext() {
248 >            /*
249 >             * No sync -- we rely on underlying hasNext to be
250 >             * stateless, in which case we can return true by mistake
251 >             * only when next() willl subsequently throw
252 >             * ConcurrentModificationException.
253 >             */
254 >            return iter.hasNext();
255 >        }
256 >        
257 >        public E next() {
258 >            lock.lock();
259 >            try {
260 >                return iter.next();
261 >            }
262 >            finally {
263 >                lock.unlock();
264 >            }
265 >        }
266 >        
267 >        public void remove() {
268 >            lock.lock();
269 >            try {
270 >                iter.remove();
271 >            }
272 >            finally {
273 >                lock.unlock();
274 >            }
275 >        }
276 >    }
277 >
278 >    /**
279 >     * Save the state to a stream (that is, serialize it).  This
280 >     * merely wraps default serialization within lock.  The
281 >     * serialization strategy for items is left to underlying
282 >     * Queue. Note that locking is not needed on deserialization, so
283 >     * readObject is not defined, just relying on default.
284 >     */
285 >    private void writeObject(java.io.ObjectOutputStream s)
286 >        throws java.io.IOException {
287 >        lock.lock();
288 >        try {
289 >            s.defaultWriteObject();
290 >        }
291 >        finally {
292 >            lock.unlock();
293 >        }
294      }
295  
296   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines