ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.34
Committed: Tue Jan 20 04:35:02 2004 UTC (20 years, 4 months ago) by dl
Branch: MAIN
Changes since 1.33: +1 -0 lines
Log Message:
javadoc lint; Thread.interrupt shouldn't throw exception if thread dead

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.33 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 tim 1.13
9 dl 1.8 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
12     /**
13 dl 1.25 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
14     * the same ordering rules as class {@link PriorityQueue} and supplies
15     * blocking retrieval operations. While this queue is logically
16 dl 1.24 * unbounded, attempted additions may fail due to resource exhaustion
17 dl 1.25 * (causing <tt>OutOfMemoryError</tt>). This class does not permit
18     * <tt>null</tt> elements. A priority queue relying on natural
19 dl 1.24 * ordering also does not permit insertion of non-comparable objects
20     * (doing so results in <tt>ClassCastException</tt>).
21 dl 1.20 *
22 dl 1.23 * <p>This class implements all of the <em>optional</em> methods
23     * of the {@link Collection} and {@link Iterator} interfaces.
24 dl 1.20 * <p>The Iterator provided in method {@link #iterator()} is
25     * <em>not</em> guaranteed to traverse the elements of the
26     * PriorityBlockingQueue in any particular order. If you need ordered
27     * traversal, consider using <tt>Arrays.sort(pq.toArray())</tt>.
28     *
29 dl 1.6 * @since 1.5
30     * @author Doug Lea
31 dl 1.29 * @param <E> the type of elements held in this collection
32 dl 1.28 */
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = 5595510919245408276L;

    /** Backing priority heap; guarded by {@link #lock}. */
    private final PriorityQueue<E> q;
    /** Fair lock serializing access to {@link #q}. */
    private final ReentrantLock lock = new ReentrantLock(true);
    /** Signalled on every insertion; awaited by blocked retrievals. */
    private final Condition notEmpty = lock.newCondition();

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the default initial
     * capacity (11) that orders its elements according to their natural
     * ordering (using <tt>Comparable</tt>).
     */
    public PriorityBlockingQueue() {
        q = new PriorityQueue<E>();
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
     * capacity that orders its elements according to their natural ordering
     * (using <tt>Comparable</tt>).
     *
     * @param initialCapacity the initial capacity for this priority queue.
     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
     * than 1
     */
    public PriorityBlockingQueue(int initialCapacity) {
        q = new PriorityQueue<E>(initialCapacity, null);
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
     * capacity that orders its elements according to the specified
     * comparator.
     *
     * @param initialCapacity the initial capacity for this priority queue.
     * @param comparator the comparator used to order this priority queue.
     * If <tt>null</tt> then the order depends on the elements' natural
     * ordering.
     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
     * than 1
     */
    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        q = new PriorityQueue<E>(initialCapacity, comparator);
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> containing the elements
     * in the specified collection.  The priority queue has an initial
     * capacity of 110% of the size of the specified collection. If
     * the specified collection is a {@link SortedSet} or a {@link
     * PriorityQueue}, this priority queue will be sorted according to
     * the same comparator, or according to its elements' natural
     * order if the collection is sorted according to its elements'
     * natural order.  Otherwise, this priority queue is ordered
     * according to its elements' natural order.
     *
     * @param c the collection whose elements are to be placed
     *        into this priority queue.
     * @throws ClassCastException if elements of the specified collection
     *         cannot be compared to one another according to the priority
     *         queue's ordering.
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     */
    public PriorityBlockingQueue(Collection<? extends E> c) {
        q = new PriorityQueue<E>(c);
    }


    // these first few override just to update doc comments

    /**
     * Adds the specified element to this queue.
     *
     * @param o the element to add
     * @return <tt>true</tt> (as per the general contract of
     * <tt>Collection.add</tt>).
     *
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     * @throws ClassCastException if the specified element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     */
    public boolean add(E o) {
        return super.add(o);
    }

    /**
     * Returns the comparator used to order this collection, or <tt>null</tt>
     * if this collection is sorted according to its elements' natural
     * ordering (using <tt>Comparable</tt>).
     *
     * @return the comparator used to order this collection, or <tt>null</tt>
     * if this collection is sorted according to its elements' natural
     * ordering.
     */
    public Comparator<? super E> comparator() {
        return q.comparator();
    }

    /**
     * Inserts the specified element into this priority queue and wakes
     * one thread waiting for an element, if any.
     *
     * @param o the element to add
     * @return <tt>true</tt>
     * @throws ClassCastException if the specified element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o) {
        if (o == null) throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            boolean ok = q.offer(o);
            assert ok;
            notEmpty.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds the specified element to this priority queue. As the queue is
     * unbounded this method will never block.
     *
     * @param o the element to add
     * @throws ClassCastException if the element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public void put(E o) {
        offer(o); // never need to block
    }

    /**
     * Inserts the specified element into this priority queue. As the queue is
     * unbounded this method will never block.
     *
     * @param o the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return <tt>true</tt>
     * @throws ClassCastException if the element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o, long timeout, TimeUnit unit) {
        return offer(o); // never need to block
    }

    /**
     * Retrieves and removes the head of this queue, waiting for an
     * element to be inserted if the queue is empty.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while acquiring the
     * lock or while waiting for an element
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (q.size() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = q.poll();
            assert x != null;
            return x;
        } finally {
            lock.unlock();
        }
    }


    /**
     * Retrieves and removes the head of this queue without waiting.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue
     * is empty
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.poll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * given time for an element to be inserted if the queue is empty.
     *
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit the time unit of the <tt>timeout</tt> argument
     * @return the head of this queue, or <tt>null</tt> if the wait time
     * elapsed before an element became available
     * @throws InterruptedException if interrupted while acquiring the
     * lock or while waiting for an element
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E x = q.poll();
                if (x != null)
                    return x;
                if (nanos <= 0)
                    return null;
                try {
                    // awaitNanos reports the time still left to wait
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue
     * is empty
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements currently in this queue.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * a <tt>PriorityBlockingQueue</tt> is not capacity constrained.
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present.
     *
     * @param o the element to remove
     * @return <tt>true</tt> if the underlying priority queue reported
     * that an element was removed
     */
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns <tt>true</tt> if this queue contains the specified element.
     *
     * @param o the element whose presence is to be tested
     * @return <tt>true</tt> if this queue contains the specified element
     */
    public boolean contains(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.contains(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * copied while holding the queue's lock. The elements are in no
     * particular order.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }


    /**
     * Returns the string representation of the underlying priority
     * queue, computed while holding the queue's lock.
     *
     * @return a string representation of this queue
     */
    public String toString() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toString();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes all available elements from this queue, in priority order,
     * and adds them to the given collection.
     *
     * <p>An element is removed from this queue only after it has been
     * added to <tt>c</tt>, so if <tt>c.add</tt> throws, the element
     * being transferred and all later ones remain in this queue.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        // Same argument checks, same loop; an unbounded drain is simply
        // a bounded one with the largest possible bound.
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most the given number of available elements from this
     * queue, in priority order, and adds them to the given collection.
     *
     * <p>An element is removed from this queue only after it has been
     * added to <tt>c</tt>, so if <tt>c.add</tt> throws, the element
     * being transferred and all later ones remain in this queue.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer;
     * if not positive, nothing is transferred
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            E e;
            while (n < maxElements && (e = q.peek()) != null) {
                c.add(e);  // may throw; e has not been removed yet
                q.poll();  // lock is held, so this removes the same head
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this priority queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * copied while holding the queue's lock; the handling of the
     * supplied array is delegated to the underlying priority queue's
     * <tt>toArray(T[])</tt>. The elements are in no particular order.
     *
     * @param a the array passed through to the underlying queue
     * @return an array containing all of the elements in this queue
     */
    public <T> T[] toArray(T[] a) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue. The
     * iterator does not return the elements in any particular order.
     * The returned iterator is a thread-safe "fail-fast" iterator
     * that will throw {@link
     * java.util.ConcurrentModificationException} upon detected
     * interference.
     *
     * @return an iterator over the elements in this queue.
     */
    public Iterator<E> iterator() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return new Itr(q.iterator());
        } finally {
            lock.unlock();
        }
    }

    /**
     * Wraps the underlying queue's iterator so that <tt>next</tt> and
     * <tt>remove</tt> run while holding the enclosing queue's lock.
     * Uses the enclosing class's element type rather than declaring
     * its own type parameter.
     */
    private class Itr implements Iterator<E> {
        private final Iterator<E> iter;

        Itr(Iterator<E> i) {
            iter = i;
        }

        public boolean hasNext() {
            /*
             * No sync -- we rely on underlying hasNext to be
             * stateless, in which case we can return true by mistake
             * only when next() will subsequently throw
             * ConcurrentModificationException.
             */
            return iter.hasNext();
        }

        public E next() {
            ReentrantLock lock = PriorityBlockingQueue.this.lock;
            lock.lock();
            try {
                return iter.next();
            } finally {
                lock.unlock();
            }
        }

        public void remove() {
            ReentrantLock lock = PriorityBlockingQueue.this.lock;
            lock.lock();
            try {
                iter.remove();
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).  This
     * merely wraps default serialization within lock.  The
     * serialization strategy for items is left to underlying
     * Queue. Note that locking is not needed on deserialization, so
     * readObject is not defined, just relying on default.
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {
        lock.lock();
        try {
            s.defaultWriteObject();
        } finally {
            lock.unlock();
        }
    }

}