ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.36
Committed: Sat Apr 10 14:24:36 2004 UTC (20 years, 1 month ago) by dl
Branch: MAIN
Changes since 1.35: +1 -1 lines
Log Message:
Don't use raw type for Comparable

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
14 * the same ordering rules as class {@link PriorityQueue} and supplies
15 * blocking retrieval operations. While this queue is logically
16 * unbounded, attempted additions may fail due to resource exhaustion
17 * (causing <tt>OutOfMemoryError</tt>). This class does not permit
18 * <tt>null</tt> elements. A priority queue relying on natural
19 * ordering also does not permit insertion of non-comparable objects
20 * (doing so results in <tt>ClassCastException</tt>).
21 *
22 * <p>This class implements all of the <em>optional</em> methods
23 * of the {@link Collection} and {@link Iterator} interfaces.
24 * <p>The Iterator provided in method {@link #iterator()} is
25 * <em>not</em> guaranteed to traverse the elements of the
26 * PriorityBlockingQueue in any particular order. If you need ordered
27 * traversal, consider using <tt>Arrays.sort(pq.toArray())</tt>.
28 *
29 * <p>This class is a member of the
30 * <a href="{@docRoot}/../guide/collections/index.html">
31 * Java Collections Framework</a>.
32 *
33 * @since 1.5
34 * @author Doug Lea
35 * @param <E> the type of elements held in this collection
36 */
37 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
38 implements BlockingQueue<E>, java.io.Serializable {
39 private static final long serialVersionUID = 5595510919245408276L;
40
41 private final PriorityQueue<E> q;
42 private final ReentrantLock lock = new ReentrantLock(true);
43 private final Condition notEmpty = lock.newCondition();
44
45 /**
46 * Creates a <tt>PriorityBlockingQueue</tt> with the default initial
47 * capacity
48 * (11) that orders its elements according to their natural
49 * ordering (using <tt>Comparable</tt>).
50 */
51 public PriorityBlockingQueue() {
52 q = new PriorityQueue<E>();
53 }
54
55 /**
56 * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
57 * capacity
58 * that orders its elements according to their natural ordering
59 * (using <tt>Comparable</tt>).
60 *
61 * @param initialCapacity the initial capacity for this priority queue.
62 * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
63 * than 1
64 */
65 public PriorityBlockingQueue(int initialCapacity) {
66 q = new PriorityQueue<E>(initialCapacity, null);
67 }
68
69 /**
70 * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
71 * capacity
72 * that orders its elements according to the specified comparator.
73 *
74 * @param initialCapacity the initial capacity for this priority queue.
75 * @param comparator the comparator used to order this priority queue.
76 * If <tt>null</tt> then the order depends on the elements' natural
77 * ordering.
78 * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
79 * than 1
80 */
81 public PriorityBlockingQueue(int initialCapacity,
82 Comparator<? super E> comparator) {
83 q = new PriorityQueue<E>(initialCapacity, comparator);
84 }
85
86 /**
87 * Creates a <tt>PriorityBlockingQueue</tt> containing the elements
88 * in the specified collection. The priority queue has an initial
89 * capacity of 110% of the size of the specified collection. If
90 * the specified collection is a {@link SortedSet} or a {@link
91 * PriorityQueue}, this priority queue will be sorted according to
92 * the same comparator, or according to its elements' natural
93 * order if the collection is sorted according to its elements'
94 * natural order. Otherwise, this priority queue is ordered
95 * according to its elements' natural order.
96 *
97 * @param c the collection whose elements are to be placed
98 * into this priority queue.
99 * @throws ClassCastException if elements of the specified collection
100 * cannot be compared to one another according to the priority
101 * queue's ordering.
102 * @throws NullPointerException if <tt>c</tt> or any element within it
103 * is <tt>null</tt>
104 */
105 public PriorityBlockingQueue(Collection<? extends E> c) {
106 q = new PriorityQueue<E>(c);
107 }
108
109
110 // these first few override just to update doc comments
111
112 /**
113 * Adds the specified element to this queue.
114 * @param o the element to add
115 * @return <tt>true</tt> (as per the general contract of
116 * <tt>Collection.add</tt>).
117 *
118 * @throws NullPointerException if the specified element is <tt>null</tt>.
119 * @throws ClassCastException if the specified element cannot be compared
120 * with elements currently in the priority queue according
121 * to the priority queue's ordering.
122 */
123 public boolean add(E o) {
124 return super.add(o);
125 }
126
127 /**
128 * Returns the comparator used to order this collection, or <tt>null</tt>
129 * if this collection is sorted according to its elements natural ordering
130 * (using <tt>Comparable</tt>).
131 *
132 * @return the comparator used to order this collection, or <tt>null</tt>
133 * if this collection is sorted according to its elements natural ordering.
134 */
135 public Comparator<? super E> comparator() {
136 return q.comparator();
137 }
138
139 /**
140 * Inserts the specified element into this priority queue.
141 *
142 * @param o the element to add
143 * @return <tt>true</tt>
144 * @throws ClassCastException if the specified element cannot be compared
145 * with elements currently in the priority queue according
146 * to the priority queue's ordering.
147 * @throws NullPointerException if the specified element is <tt>null</tt>.
148 */
149 public boolean offer(E o) {
150 if (o == null) throw new NullPointerException();
151 final ReentrantLock lock = this.lock;
152 lock.lock();
153 try {
154 boolean ok = q.offer(o);
155 assert ok;
156 notEmpty.signal();
157 return true;
158 } finally {
159 lock.unlock();
160 }
161 }
162
163 /**
164 * Adds the specified element to this priority queue. As the queue is
165 * unbounded this method will never block.
166 * @param o the element to add
167 * @throws ClassCastException if the element cannot be compared
168 * with elements currently in the priority queue according
169 * to the priority queue's ordering.
170 * @throws NullPointerException if the specified element is <tt>null</tt>.
171 */
172 public void put(E o) {
173 offer(o); // never need to block
174 }
175
176 /**
177 * Inserts the specified element into this priority queue. As the queue is
178 * unbounded this method will never block.
179 * @param o the element to add
180 * @param timeout This parameter is ignored as the method never blocks
181 * @param unit This parameter is ignored as the method never blocks
182 * @return <tt>true</tt>
183 * @throws ClassCastException if the element cannot be compared
184 * with elements currently in the priority queue according
185 * to the priority queue's ordering.
186 * @throws NullPointerException if the specified element is <tt>null</tt>.
187 */
188 public boolean offer(E o, long timeout, TimeUnit unit) {
189 return offer(o); // never need to block
190 }
191
192 public E take() throws InterruptedException {
193 final ReentrantLock lock = this.lock;
194 lock.lockInterruptibly();
195 try {
196 try {
197 while (q.size() == 0)
198 notEmpty.await();
199 } catch (InterruptedException ie) {
200 notEmpty.signal(); // propagate to non-interrupted thread
201 throw ie;
202 }
203 E x = q.poll();
204 assert x != null;
205 return x;
206 } finally {
207 lock.unlock();
208 }
209 }
210
211
212 public E poll() {
213 final ReentrantLock lock = this.lock;
214 lock.lock();
215 try {
216 return q.poll();
217 } finally {
218 lock.unlock();
219 }
220 }
221
222 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
223 long nanos = unit.toNanos(timeout);
224 final ReentrantLock lock = this.lock;
225 lock.lockInterruptibly();
226 try {
227 for (;;) {
228 E x = q.poll();
229 if (x != null)
230 return x;
231 if (nanos <= 0)
232 return null;
233 try {
234 nanos = notEmpty.awaitNanos(nanos);
235 } catch (InterruptedException ie) {
236 notEmpty.signal(); // propagate to non-interrupted thread
237 throw ie;
238 }
239 }
240 } finally {
241 lock.unlock();
242 }
243 }
244
245 public E peek() {
246 final ReentrantLock lock = this.lock;
247 lock.lock();
248 try {
249 return q.peek();
250 } finally {
251 lock.unlock();
252 }
253 }
254
255 public int size() {
256 final ReentrantLock lock = this.lock;
257 lock.lock();
258 try {
259 return q.size();
260 } finally {
261 lock.unlock();
262 }
263 }
264
265 /**
266 * Always returns <tt>Integer.MAX_VALUE</tt> because
267 * a <tt>PriorityBlockingQueue</tt> is not capacity constrained.
268 * @return <tt>Integer.MAX_VALUE</tt>
269 */
270 public int remainingCapacity() {
271 return Integer.MAX_VALUE;
272 }
273
274 public boolean remove(Object o) {
275 final ReentrantLock lock = this.lock;
276 lock.lock();
277 try {
278 return q.remove(o);
279 } finally {
280 lock.unlock();
281 }
282 }
283
284 public boolean contains(Object o) {
285 final ReentrantLock lock = this.lock;
286 lock.lock();
287 try {
288 return q.contains(o);
289 } finally {
290 lock.unlock();
291 }
292 }
293
294 public Object[] toArray() {
295 final ReentrantLock lock = this.lock;
296 lock.lock();
297 try {
298 return q.toArray();
299 } finally {
300 lock.unlock();
301 }
302 }
303
304
305 public String toString() {
306 final ReentrantLock lock = this.lock;
307 lock.lock();
308 try {
309 return q.toString();
310 } finally {
311 lock.unlock();
312 }
313 }
314
315 public int drainTo(Collection<? super E> c) {
316 if (c == null)
317 throw new NullPointerException();
318 if (c == this)
319 throw new IllegalArgumentException();
320 final ReentrantLock lock = this.lock;
321 lock.lock();
322 try {
323 int n = 0;
324 E e;
325 while ( (e = q.poll()) != null) {
326 c.add(e);
327 ++n;
328 }
329 return n;
330 } finally {
331 lock.unlock();
332 }
333 }
334
335 public int drainTo(Collection<? super E> c, int maxElements) {
336 if (c == null)
337 throw new NullPointerException();
338 if (c == this)
339 throw new IllegalArgumentException();
340 if (maxElements <= 0)
341 return 0;
342 final ReentrantLock lock = this.lock;
343 lock.lock();
344 try {
345 int n = 0;
346 E e;
347 while (n < maxElements && (e = q.poll()) != null) {
348 c.add(e);
349 ++n;
350 }
351 return n;
352 } finally {
353 lock.unlock();
354 }
355 }
356
357 /**
358 * Atomically removes all of the elements from this delay queue.
359 * The queue will be empty after this call returns.
360 */
361 public void clear() {
362 final ReentrantLock lock = this.lock;
363 lock.lock();
364 try {
365 q.clear();
366 } finally {
367 lock.unlock();
368 }
369 }
370
371 public <T> T[] toArray(T[] a) {
372 final ReentrantLock lock = this.lock;
373 lock.lock();
374 try {
375 return q.toArray(a);
376 } finally {
377 lock.unlock();
378 }
379 }
380
381 /**
382 * Returns an iterator over the elements in this queue. The
383 * iterator does not return the elements in any particular order.
384 * The returned iterator is a thread-safe "fast-fail" iterator
385 * that will throw {@link
386 * java.util.ConcurrentModificationException} upon detected
387 * interference.
388 *
389 * @return an iterator over the elements in this queue.
390 */
391 public Iterator<E> iterator() {
392 final ReentrantLock lock = this.lock;
393 lock.lock();
394 try {
395 return new Itr(q.iterator());
396 } finally {
397 lock.unlock();
398 }
399 }
400
401 private class Itr<E> implements Iterator<E> {
402 private final Iterator<E> iter;
403 Itr(Iterator<E> i) {
404 iter = i;
405 }
406
407 public boolean hasNext() {
408 /*
409 * No sync -- we rely on underlying hasNext to be
410 * stateless, in which case we can return true by mistake
411 * only when next() will subsequently throw
412 * ConcurrentModificationException.
413 */
414 return iter.hasNext();
415 }
416
417 public E next() {
418 ReentrantLock lock = PriorityBlockingQueue.this.lock;
419 lock.lock();
420 try {
421 return iter.next();
422 } finally {
423 lock.unlock();
424 }
425 }
426
427 public void remove() {
428 ReentrantLock lock = PriorityBlockingQueue.this.lock;
429 lock.lock();
430 try {
431 iter.remove();
432 } finally {
433 lock.unlock();
434 }
435 }
436 }
437
438 /**
439 * Save the state to a stream (that is, serialize it). This
440 * merely wraps default serialization within lock. The
441 * serialization strategy for items is left to underlying
442 * Queue. Note that locking is not needed on deserialization, so
443 * readObject is not defined, just relying on default.
444 */
445 private void writeObject(java.io.ObjectOutputStream s)
446 throws java.io.IOException {
447 lock.lock();
448 try {
449 s.defaultWriteObject();
450 } finally {
451 lock.unlock();
452 }
453 }
454
455 }