ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.19
Committed: Fri Aug 8 20:05:07 2003 UTC (20 years, 10 months ago) by tim
Branch: MAIN
Changes since 1.18: +18 -36 lines
Log Message:
Scrunched catch, finally, else clauses.

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
/**
 * An unbounded {@linkplain BlockingQueue blocking queue} based on a
 * {@link PriorityQueue}, obeying its ordering rules and implementation
 * characteristics.  While this queue is logically unbounded,
 * attempted additions may fail due to resource exhaustion (causing
 * <tt>OutOfMemoryError</tt>).
 *
 * <p>Insertions, removals and content queries are carried out on the
 * backing <tt>PriorityQueue</tt> while holding a single fair
 * {@link ReentrantLock}.  Consumers that find the queue empty wait on a
 * condition of that lock until a producer inserts an element.
 * <tt>null</tt> elements are rejected.
 *
 * @since 1.5
 * @author Doug Lea
 */
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** Backing heap; element storage and ordering are delegated to it. */
    private final PriorityQueue<E> q;

    /** Fair lock held around every access to <tt>q</tt> made by this class. */
    private final ReentrantLock lock = new ReentrantLock(true);

    /** Signalled by <tt>offer</tt>; awaited by <tt>take</tt> and timed <tt>poll</tt>. */
    private final Condition notEmpty = lock.newCondition();

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the default initial
     * capacity (11) that orders its elements according to their natural
     * ordering (using <tt>Comparable</tt>).
     */
    public PriorityBlockingQueue() {
        q = new PriorityQueue<E>();
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
     * capacity that orders its elements according to their natural
     * ordering (using <tt>Comparable</tt>).
     *
     * @param initialCapacity the initial capacity for this priority queue.
     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
     * than 1
     */
    public PriorityBlockingQueue(int initialCapacity) {
        // A null comparator selects natural ordering in the backing queue.
        q = new PriorityQueue<E>(initialCapacity, null);
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
     * capacity that orders its elements according to the specified
     * comparator.
     *
     * @param initialCapacity the initial capacity for this priority queue.
     * @param comparator the comparator used to order this priority queue.
     * If <tt>null</tt> then the order depends on the elements' natural
     * ordering.
     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
     * than 1
     */
    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        q = new PriorityQueue<E>(initialCapacity, comparator);
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> containing the elements
     * in the specified collection.  The priority queue has an initial
     * capacity of 110% of the size of the specified collection. If
     * the specified collection is a {@link SortedSet} or a {@link
     * PriorityQueue}, this priority queue will be sorted according to
     * the same comparator, or according to its elements' natural
     * order if the collection is sorted according to its elements'
     * natural order.  Otherwise, this priority queue is ordered
     * according to its elements' natural order.
     *
     * @param c the collection whose elements are to be placed
     *        into this priority queue.
     * @throws ClassCastException if elements of the specified collection
     *         cannot be compared to one another according to the priority
     *         queue's ordering.
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     */
    public PriorityBlockingQueue(Collection<? extends E> c) {
        q = new PriorityQueue<E>(c);
    }


    // these first few override just to update doc comments

    /**
     * Adds the specified element to this queue.
     *
     * @param o the element to add
     * @return <tt>true</tt> (as per the general contract of
     * <tt>Collection.add</tt>).
     *
     * @throws NullPointerException {@inheritDoc}
     * @throws ClassCastException if the specified element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     */
    public boolean add(E o) {
        return super.add(o);
    }

    /**
     * Adds all of the elements in the specified collection to this queue.
     * The behavior of this operation is undefined if
     * the specified collection is modified while the operation is in
     * progress.  (This implies that the behavior of this call is undefined if
     * the specified collection is this queue, and this queue is nonempty.)
     * <p>
     * This implementation iterates over the specified collection, and adds
     * each object returned by the iterator to this collection, in turn.
     *
     * @param c the collection whose elements are to be added
     * @return the result of the inherited <tt>addAll</tt>
     * @throws NullPointerException {@inheritDoc}
     * @throws ClassCastException if any element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     */
    public boolean addAll(Collection<? extends E> c) {
        return super.addAll(c);
    }

    /**
     * Returns the comparator used to order this collection, or <tt>null</tt>
     * if this collection is sorted according to its elements' natural
     * ordering (using <tt>Comparable</tt>).
     *
     * @return the comparator used to order this collection, or <tt>null</tt>
     * if this collection is sorted according to its elements' natural
     * ordering.
     */
    public Comparator<? super E> comparator() {
        return q.comparator();
    }

    /**
     * Adds the specified element to this priority queue and wakes one
     * thread waiting for an element, if any.
     *
     * @param o the element to add
     * @return <tt>true</tt>
     * @throws ClassCastException if the specified element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E o) {
        // Reject null before taking the lock: null is the "empty" result of poll.
        if (o == null) throw new NullPointerException();
        lock.lock();
        try {
            boolean ok = q.offer(o);
            assert ok;
            notEmpty.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds the specified element to this priority queue. As the queue is
     * unbounded this method will never block.
     *
     * @param o the element to add
     * @throws ClassCastException if the element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E o) {
        offer(o); // never need to block
    }

    /**
     * Adds the specified element to this priority queue. As the queue is
     * unbounded this method will never block.
     *
     * @param o {@inheritDoc}
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @throws ClassCastException if the element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     * @throws NullPointerException {@inheritDoc}
     * @return <tt>true</tt>
     */
    public boolean offer(E o, long timeout, TimeUnit unit) {
        return offer(o); // never need to block
    }

    /**
     * Retrieves and removes the head of this queue, waiting until an
     * element is available if the queue is empty.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while acquiring the lock
     * or while waiting for an element
     */
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            try {
                while (q.size() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = q.poll();
            assert x != null;
            return x;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue without waiting.
     *
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    public E poll() {
        lock.lock();
        try {
            return q.poll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * given time for an element to become available.
     *
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit the time unit of <tt>timeout</tt>
     * @return the head of this queue, or <tt>null</tt> if the wait time
     * elapsed (or was not positive) while the queue was empty
     * @throws InterruptedException if interrupted while acquiring the lock
     * or while waiting for an element
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            for (;;) {
                E x = q.poll();
                if (x != null)
                    return x;
                if (nanos <= 0)
                    return null;
                try {
                    // awaitNanos returns the time still remaining, so the
                    // overall wait is bounded across repeated wake-ups.
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue.
     *
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    public E peek() {
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements currently in this queue.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * a <tt>PriorityBlockingQueue</tt> is not capacity constrained.
     *
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Removes all available elements from this queue and adds them to the
     * given collection, in the order in which <tt>poll</tt> would have
     * returned them.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most the given number of available elements from this
     * queue and adds them to the given collection, in the order in which
     * <tt>poll</tt> would have returned them.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) throw new NullPointerException();
        if (c == this) throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        lock.lock();
        try {
            int n = 0;
            while (n < maxElements) {
                E head = q.peek();
                if (head == null)
                    break;
                // Add before removing so that a failing add leaves the
                // element in this queue instead of dropping it.
                c.add(head);
                q.poll();
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present.  More formally,
     * removes an element <tt>e</tt> such that <tt>(o==null ? e==null :
     * o.equals(e))</tt>, if the queue contains one or more such
     * elements.  Returns <tt>true</tt> if the queue contained the
     * specified element (or equivalently, if the queue changed as a
     * result of the call).
     *
     * <p>This implementation delegates to the backing priority queue's
     * <tt>remove</tt> method while holding this queue's lock.
     *
     * @param o the element to remove, if present
     * @return <tt>true</tt> if an element was removed
     */
    public boolean remove(Object o) {
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns <tt>true</tt> if this queue contains the specified element.
     *
     * @param o the element whose presence is to be tested
     * @return <tt>true</tt> if the backing queue reports that it
     * contains <tt>o</tt>
     */
    public boolean contains(Object o) {
        lock.lock();
        try {
            return q.contains(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, as
     * produced by the backing queue while this queue's lock is held.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the string representation of the backing queue, computed
     * while this queue's lock is held.
     *
     * @return a string representation of this queue
     */
    public String toString() {
        lock.lock();
        try {
            return q.toString();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this priority queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, using
     * the supplied array as described by the backing queue's
     * <tt>toArray(T[])</tt>; the copy is made while this queue's lock is
     * held.
     *
     * @param a the array offered as destination and as the source of the
     * runtime component type
     * @return an array containing all of the elements in this queue
     */
    public <T> T[] toArray(T[] a) {
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue. The iterator
     * does not return the elements in any particular order.
     * The returned iterator is a "fail-fast" iterator that will
     * throw {@link java.util.ConcurrentModificationException}
     * upon detected interference.
     *
     * @return an iterator over the elements in this queue.
     */
    public Iterator<E> iterator() {
        lock.lock();
        try {
            return new Itr(q.iterator());
        } finally {
            lock.unlock();
        }
    }

    /**
     * Iterator wrapper that performs <tt>next</tt> and <tt>remove</tt> on
     * the backing queue's iterator while holding the enclosing queue's
     * lock.  It deliberately has no type parameter of its own so that its
     * element type is the enclosing queue's <tt>E</tt>.
     */
    private class Itr implements Iterator<E> {
        /** The backing queue's own iterator. */
        private final Iterator<E> iter;

        Itr(Iterator<E> i) {
            iter = i;
        }

        /**
         * Reports whether the wrapped iterator has more elements.
         *
         * @return the wrapped iterator's <tt>hasNext</tt> result
         */
        public boolean hasNext() {
            /*
             * No sync -- we rely on underlying hasNext to be
             * stateless, in which case we can return true by mistake
             * only when next() will subsequently throw
             * ConcurrentModificationException.
             */
            return iter.hasNext();
        }

        /**
         * Returns the wrapped iterator's next element, obtained under the
         * enclosing queue's lock.
         *
         * @return the next element
         */
        public E next() {
            lock.lock();
            try {
                return iter.next();
            } finally {
                lock.unlock();
            }
        }

        /**
         * Removes the last returned element through the wrapped iterator,
         * under the enclosing queue's lock.
         */
        public void remove() {
            lock.lock();
            try {
                iter.remove();
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).  This
     * merely wraps default serialization within lock.  The
     * serialization strategy for items is left to underlying
     * Queue. Note that locking is not needed on deserialization, so
     * readObject is not defined, just relying on default.
     *
     * @param s the stream to write to
     * @throws java.io.IOException if default serialization fails
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {
        lock.lock();
        try {
            s.defaultWriteObject();
        } finally {
            lock.unlock();
        }
    }

}