ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.37
Committed: Thu May 27 11:06:11 2004 UTC (20 years ago) by dl
Branch: MAIN
Changes since 1.36: +9 -1 lines
Log Message:
Override javadoc specs when overriding AbstractQueue implementations
Clarify atomicity in BlockingQueue

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.33 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 tim 1.13
9 dl 1.8 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
12     /**
13 dl 1.25 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
14     * the same ordering rules as class {@link PriorityQueue} and supplies
15     * blocking retrieval operations. While this queue is logically
16 dl 1.24 * unbounded, attempted additions may fail due to resource exhaustion
17 dl 1.25 * (causing <tt>OutOfMemoryError</tt>). This class does not permit
18     * <tt>null</tt> elements. A priority queue relying on natural
19 dl 1.24 * ordering also does not permit insertion of non-comparable objects
20     * (doing so results in <tt>ClassCastException</tt>).
21 dl 1.20 *
22 dl 1.23 * <p>This class implements all of the <em>optional</em> methods
23     * of the {@link Collection} and {@link Iterator} interfaces.
24 dl 1.20 * <p>The Iterator provided in method {@link #iterator()} is
25     * <em>not</em> guaranteed to traverse the elements of the
26     * PriorityBlockingQueue in any particular order. If you need ordered
27     * traversal, consider using <tt>Arrays.sort(pq.toArray())</tt>.
28     *
29 dl 1.37 * <p>This class and its iterator implement all of the
30     * <em>optional</em> methods of the {@link Collection} and {@link
31     * Iterator} interfaces.
32     *
33 dl 1.35 * <p>This class is a member of the
34     * <a href="{@docRoot}/../guide/collections/index.html">
35     * Java Collections Framework</a>.
36     *
37 dl 1.6 * @since 1.5
38     * @author Doug Lea
39 dl 1.29 * @param <E> the type of elements held in this collection
40 dl 1.28 */
41 dl 1.5 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
42 dl 1.15 implements BlockingQueue<E>, java.io.Serializable {
43 dl 1.21 private static final long serialVersionUID = 5595510919245408276L;
44 tim 1.1
45 dl 1.5 private final PriorityQueue<E> q;
46 dl 1.9 private final ReentrantLock lock = new ReentrantLock(true);
47 dl 1.32 private final Condition notEmpty = lock.newCondition();
48 dl 1.5
49 dl 1.2 /**
50 dholmes 1.16 * Creates a <tt>PriorityBlockingQueue</tt> with the default initial
51 dholmes 1.14 * capacity
52 dholmes 1.10 * (11) that orders its elements according to their natural
53 tim 1.18 * ordering (using <tt>Comparable</tt>).
54 dl 1.2 */
55     public PriorityBlockingQueue() {
56 dl 1.5 q = new PriorityQueue<E>();
57 dl 1.2 }
58    
59     /**
60 dholmes 1.16 * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
61 dholmes 1.10 * capacity
62     * that orders its elements according to their natural ordering
63 tim 1.18 * (using <tt>Comparable</tt>).
64 dl 1.2 *
65     * @param initialCapacity the initial capacity for this priority queue.
66 dholmes 1.16 * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
67     * than 1
68 dl 1.2 */
69     public PriorityBlockingQueue(int initialCapacity) {
70 dl 1.5 q = new PriorityQueue<E>(initialCapacity, null);
71 dl 1.2 }
72    
73     /**
74 dholmes 1.16 * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
75 dholmes 1.10 * capacity
76 dl 1.2 * that orders its elements according to the specified comparator.
77     *
78     * @param initialCapacity the initial capacity for this priority queue.
79     * @param comparator the comparator used to order this priority queue.
80 dholmes 1.10 * If <tt>null</tt> then the order depends on the elements' natural
81     * ordering.
82 dholmes 1.16 * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
83     * than 1
84 dl 1.2 */
85 tim 1.13 public PriorityBlockingQueue(int initialCapacity,
86 dholmes 1.14 Comparator<? super E> comparator) {
87 dl 1.5 q = new PriorityQueue<E>(initialCapacity, comparator);
88 dl 1.2 }
89    
90     /**
91 dholmes 1.16 * Creates a <tt>PriorityBlockingQueue</tt> containing the elements
92 dl 1.15 * in the specified collection. The priority queue has an initial
93     * capacity of 110% of the size of the specified collection. If
94     * the specified collection is a {@link SortedSet} or a {@link
95     * PriorityQueue}, this priority queue will be sorted according to
96     * the same comparator, or according to its elements' natural
97     * order if the collection is sorted according to its elements'
98     * natural order. Otherwise, this priority queue is ordered
99     * according to its elements' natural order.
100 dl 1.2 *
101 dholmes 1.14 * @param c the collection whose elements are to be placed
102 dl 1.2 * into this priority queue.
103     * @throws ClassCastException if elements of the specified collection
104     * cannot be compared to one another according to the priority
105     * queue's ordering.
106 dholmes 1.14 * @throws NullPointerException if <tt>c</tt> or any element within it
107     * is <tt>null</tt>
108 dl 1.2 */
109 dholmes 1.14 public PriorityBlockingQueue(Collection<? extends E> c) {
110     q = new PriorityQueue<E>(c);
111 dl 1.7 }
112    
113 dholmes 1.10
114 dholmes 1.16 // these first few override just to update doc comments
115 dholmes 1.10
116     /**
117 dholmes 1.16 * Adds the specified element to this queue.
118 dl 1.34 * @param o the element to add
119 dholmes 1.16 * @return <tt>true</tt> (as per the general contract of
120     * <tt>Collection.add</tt>).
121     *
122 dl 1.22 * @throws NullPointerException if the specified element is <tt>null</tt>.
123 dholmes 1.16 * @throws ClassCastException if the specified element cannot be compared
124     * with elements currently in the priority queue according
125     * to the priority queue's ordering.
126 dholmes 1.10 */
127 dholmes 1.16 public boolean add(E o) {
128     return super.add(o);
129 dholmes 1.10 }
130    
131 tim 1.13 /**
132 dholmes 1.16 * Returns the comparator used to order this collection, or <tt>null</tt>
133     * if this collection is sorted according to its elements natural ordering
134 tim 1.18 * (using <tt>Comparable</tt>).
135 dholmes 1.16 *
136     * @return the comparator used to order this collection, or <tt>null</tt>
137     * if this collection is sorted according to its elements natural ordering.
138     */
139 dl 1.36 public Comparator<? super E> comparator() {
140 dl 1.7 return q.comparator();
141 dl 1.5 }
142    
143 dholmes 1.16 /**
144 dl 1.24 * Inserts the specified element into this priority queue.
145 dholmes 1.16 *
146 dl 1.22 * @param o the element to add
147 dholmes 1.16 * @return <tt>true</tt>
148     * @throws ClassCastException if the specified element cannot be compared
149     * with elements currently in the priority queue according
150     * to the priority queue's ordering.
151 dl 1.22 * @throws NullPointerException if the specified element is <tt>null</tt>.
152 dholmes 1.16 */
153 dholmes 1.14 public boolean offer(E o) {
154     if (o == null) throw new NullPointerException();
155 dl 1.31 final ReentrantLock lock = this.lock;
156 dl 1.5 lock.lock();
157     try {
158 dholmes 1.14 boolean ok = q.offer(o);
159 dl 1.5 assert ok;
160     notEmpty.signal();
161     return true;
162 tim 1.19 } finally {
163 tim 1.13 lock.unlock();
164 dl 1.5 }
165     }
166    
167 dholmes 1.16 /**
168     * Adds the specified element to this priority queue. As the queue is
169     * unbounded this method will never block.
170 dl 1.22 * @param o the element to add
171 dholmes 1.16 * @throws ClassCastException if the element cannot be compared
172     * with elements currently in the priority queue according
173     * to the priority queue's ordering.
174 dl 1.22 * @throws NullPointerException if the specified element is <tt>null</tt>.
175 dholmes 1.16 */
176     public void put(E o) {
177 dholmes 1.14 offer(o); // never need to block
178 dl 1.5 }
179    
180 dholmes 1.16 /**
181 dl 1.24 * Inserts the specified element into this priority queue. As the queue is
182 dholmes 1.16 * unbounded this method will never block.
183 dl 1.22 * @param o the element to add
184 dholmes 1.16 * @param timeout This parameter is ignored as the method never blocks
185     * @param unit This parameter is ignored as the method never blocks
186 dl 1.22 * @return <tt>true</tt>
187 dholmes 1.16 * @throws ClassCastException if the element cannot be compared
188     * with elements currently in the priority queue according
189     * to the priority queue's ordering.
190 dl 1.22 * @throws NullPointerException if the specified element is <tt>null</tt>.
191 dholmes 1.16 */
192     public boolean offer(E o, long timeout, TimeUnit unit) {
193 dholmes 1.14 return offer(o); // never need to block
194 dl 1.5 }
195    
196     public E take() throws InterruptedException {
197 dl 1.31 final ReentrantLock lock = this.lock;
198 dl 1.5 lock.lockInterruptibly();
199     try {
200     try {
201     while (q.size() == 0)
202     notEmpty.await();
203 tim 1.19 } catch (InterruptedException ie) {
204 dl 1.5 notEmpty.signal(); // propagate to non-interrupted thread
205     throw ie;
206     }
207     E x = q.poll();
208     assert x != null;
209     return x;
210 tim 1.19 } finally {
211 dl 1.5 lock.unlock();
212     }
213     }
214    
215    
216     public E poll() {
217 dl 1.31 final ReentrantLock lock = this.lock;
218 dl 1.5 lock.lock();
219     try {
220     return q.poll();
221 tim 1.19 } finally {
222 tim 1.13 lock.unlock();
223 dl 1.5 }
224     }
225    
226     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
227 dholmes 1.10 long nanos = unit.toNanos(timeout);
228 dl 1.31 final ReentrantLock lock = this.lock;
229 dl 1.5 lock.lockInterruptibly();
230     try {
231     for (;;) {
232     E x = q.poll();
233 tim 1.13 if (x != null)
234 dl 1.5 return x;
235     if (nanos <= 0)
236     return null;
237     try {
238     nanos = notEmpty.awaitNanos(nanos);
239 tim 1.19 } catch (InterruptedException ie) {
240 dl 1.5 notEmpty.signal(); // propagate to non-interrupted thread
241     throw ie;
242     }
243     }
244 tim 1.19 } finally {
245 dl 1.5 lock.unlock();
246     }
247     }
248    
249     public E peek() {
250 dl 1.31 final ReentrantLock lock = this.lock;
251 dl 1.5 lock.lock();
252     try {
253     return q.peek();
254 tim 1.19 } finally {
255 tim 1.13 lock.unlock();
256 dl 1.5 }
257     }
258    
259     public int size() {
260 dl 1.31 final ReentrantLock lock = this.lock;
261 dl 1.5 lock.lock();
262     try {
263     return q.size();
264 tim 1.19 } finally {
265 dl 1.5 lock.unlock();
266     }
267     }
268    
269     /**
270     * Always returns <tt>Integer.MAX_VALUE</tt> because
271 dholmes 1.16 * a <tt>PriorityBlockingQueue</tt> is not capacity constrained.
272 dl 1.5 * @return <tt>Integer.MAX_VALUE</tt>
273     */
274     public int remainingCapacity() {
275     return Integer.MAX_VALUE;
276     }
277    
278 dl 1.37 /**
279     * Removes a single instance of the specified element from this
280     * collection, if it is present.
281     */
282 dholmes 1.14 public boolean remove(Object o) {
283 dl 1.31 final ReentrantLock lock = this.lock;
284 dl 1.5 lock.lock();
285     try {
286 dholmes 1.14 return q.remove(o);
287 tim 1.19 } finally {
288 dl 1.5 lock.unlock();
289     }
290     }
291    
292 dholmes 1.14 public boolean contains(Object o) {
293 dl 1.31 final ReentrantLock lock = this.lock;
294 dl 1.5 lock.lock();
295     try {
296 dholmes 1.14 return q.contains(o);
297 tim 1.19 } finally {
298 dl 1.5 lock.unlock();
299     }
300     }
301    
302     public Object[] toArray() {
303 dl 1.31 final ReentrantLock lock = this.lock;
304 dl 1.5 lock.lock();
305     try {
306     return q.toArray();
307 tim 1.19 } finally {
308 dl 1.5 lock.unlock();
309     }
310     }
311    
312    
313     public String toString() {
314 dl 1.31 final ReentrantLock lock = this.lock;
315 dl 1.5 lock.lock();
316     try {
317     return q.toString();
318 tim 1.19 } finally {
319 dl 1.5 lock.unlock();
320     }
321     }
322    
323 dl 1.26 public int drainTo(Collection<? super E> c) {
324     if (c == null)
325     throw new NullPointerException();
326     if (c == this)
327     throw new IllegalArgumentException();
328 dl 1.31 final ReentrantLock lock = this.lock;
329 dl 1.26 lock.lock();
330     try {
331     int n = 0;
332     E e;
333     while ( (e = q.poll()) != null) {
334     c.add(e);
335     ++n;
336     }
337     return n;
338     } finally {
339     lock.unlock();
340     }
341     }
342    
343     public int drainTo(Collection<? super E> c, int maxElements) {
344     if (c == null)
345     throw new NullPointerException();
346     if (c == this)
347     throw new IllegalArgumentException();
348     if (maxElements <= 0)
349     return 0;
350 dl 1.31 final ReentrantLock lock = this.lock;
351 dl 1.26 lock.lock();
352     try {
353     int n = 0;
354     E e;
355     while (n < maxElements && (e = q.poll()) != null) {
356     c.add(e);
357     ++n;
358     }
359     return n;
360     } finally {
361     lock.unlock();
362     }
363     }
364    
365 dl 1.17 /**
366 dl 1.37 * Atomically removes all of the elements from this queue.
367 dl 1.17 * The queue will be empty after this call returns.
368     */
369     public void clear() {
370 dl 1.31 final ReentrantLock lock = this.lock;
371 dl 1.17 lock.lock();
372     try {
373     q.clear();
374 tim 1.19 } finally {
375 dl 1.17 lock.unlock();
376     }
377     }
378    
379 dl 1.5 public <T> T[] toArray(T[] a) {
380 dl 1.31 final ReentrantLock lock = this.lock;
381 dl 1.5 lock.lock();
382     try {
383     return q.toArray(a);
384 tim 1.19 } finally {
385 dl 1.5 lock.unlock();
386     }
387     }
388    
389 dholmes 1.16 /**
390 dl 1.23 * Returns an iterator over the elements in this queue. The
391     * iterator does not return the elements in any particular order.
392     * The returned iterator is a thread-safe "fast-fail" iterator
393     * that will throw {@link
394     * java.util.ConcurrentModificationException} upon detected
395     * interference.
396 dholmes 1.16 *
397     * @return an iterator over the elements in this queue.
398     */
399 dl 1.5 public Iterator<E> iterator() {
400 dl 1.31 final ReentrantLock lock = this.lock;
401 dl 1.5 lock.lock();
402     try {
403     return new Itr(q.iterator());
404 tim 1.19 } finally {
405 dl 1.5 lock.unlock();
406     }
407     }
408    
409     private class Itr<E> implements Iterator<E> {
410     private final Iterator<E> iter;
411 tim 1.13 Itr(Iterator<E> i) {
412     iter = i;
413 dl 1.5 }
414    
415 tim 1.13 public boolean hasNext() {
416 dl 1.5 /*
417     * No sync -- we rely on underlying hasNext to be
418     * stateless, in which case we can return true by mistake
419 dl 1.30 * only when next() will subsequently throw
420 dl 1.5 * ConcurrentModificationException.
421     */
422     return iter.hasNext();
423 tim 1.13 }
424    
425     public E next() {
426 dl 1.31 ReentrantLock lock = PriorityBlockingQueue.this.lock;
427 dl 1.5 lock.lock();
428     try {
429     return iter.next();
430 tim 1.19 } finally {
431 dl 1.5 lock.unlock();
432     }
433 tim 1.13 }
434    
435     public void remove() {
436 dl 1.31 ReentrantLock lock = PriorityBlockingQueue.this.lock;
437 dl 1.5 lock.lock();
438     try {
439     iter.remove();
440 tim 1.19 } finally {
441 dl 1.5 lock.unlock();
442     }
443 tim 1.13 }
444 dl 1.5 }
445    
446     /**
447     * Save the state to a stream (that is, serialize it). This
448     * merely wraps default serialization within lock. The
449     * serialization strategy for items is left to underlying
450     * Queue. Note that locking is not needed on deserialization, so
451     * readObject is not defined, just relying on default.
452     */
453     private void writeObject(java.io.ObjectOutputStream s)
454     throws java.io.IOException {
455     lock.lock();
456     try {
457     s.defaultWriteObject();
458 tim 1.19 } finally {
459 dl 1.5 lock.unlock();
460     }
461 tim 1.1 }
462    
463     }