ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.35
Committed: Tue Jan 27 11:36:31 2004 UTC (20 years, 4 months ago) by dl
Branch: MAIN
CVS Tags: JSR166_PFD
Changes since 1.34: +4 -0 lines
Log Message:
Add Collection framework membership doc

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.33 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 tim 1.13
9 dl 1.8 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
12     /**
13 dl 1.25 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
14     * the same ordering rules as class {@link PriorityQueue} and supplies
15     * blocking retrieval operations. While this queue is logically
16 dl 1.24 * unbounded, attempted additions may fail due to resource exhaustion
17 dl 1.25 * (causing <tt>OutOfMemoryError</tt>). This class does not permit
18     * <tt>null</tt> elements. A priority queue relying on natural
19 dl 1.24 * ordering also does not permit insertion of non-comparable objects
20     * (doing so results in <tt>ClassCastException</tt>).
21 dl 1.20 *
22 dl 1.23 * <p>This class implements all of the <em>optional</em> methods
23     * of the {@link Collection} and {@link Iterator} interfaces.
24 dl 1.20 * <p>The Iterator provided in method {@link #iterator()} is
25     * <em>not</em> guaranteed to traverse the elements of the
26     * PriorityBlockingQueue in any particular order. If you need ordered
27     * traversal, consider using <tt>Arrays.sort(pq.toArray())</tt>.
28     *
29 dl 1.35 * <p>This class is a member of the
30     * <a href="{@docRoot}/../guide/collections/index.html">
31     * Java Collections Framework</a>.
32     *
33 dl 1.6 * @since 1.5
34     * @author Doug Lea
35 dl 1.29 * @param <E> the type of elements held in this collection
36 dl 1.28 */
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = 5595510919245408276L;

    /** Backing priority heap; accessed only while holding {@link #lock}. */
    private final PriorityQueue<E> q;
    /** Fair lock guarding every access to {@link #q}. */
    private final ReentrantLock lock = new ReentrantLock(true);
    /** Signalled on each insertion so that a blocked consumer can proceed. */
    private final Condition notEmpty = lock.newCondition();

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the default initial
     * capacity
     * (11) that orders its elements according to their natural
     * ordering (using <tt>Comparable</tt>).
     */
    public PriorityBlockingQueue() {
        q = new PriorityQueue<E>();
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
     * capacity
     * that orders its elements according to their natural ordering
     * (using <tt>Comparable</tt>).
     *
     * @param initialCapacity the initial capacity for this priority queue.
     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
     * than 1
     */
    public PriorityBlockingQueue(int initialCapacity) {
        q = new PriorityQueue<E>(initialCapacity, null);
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
     * capacity
     * that orders its elements according to the specified comparator.
     *
     * @param initialCapacity the initial capacity for this priority queue.
     * @param comparator the comparator used to order this priority queue.
     * If <tt>null</tt> then the order depends on the elements' natural
     * ordering.
     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
     * than 1
     */
    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        q = new PriorityQueue<E>(initialCapacity, comparator);
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> containing the elements
     * in the specified collection.  The priority queue has an initial
     * capacity of 110% of the size of the specified collection. If
     * the specified collection is a {@link SortedSet} or a {@link
     * PriorityQueue}, this priority queue will be sorted according to
     * the same comparator, or according to its elements' natural
     * order if the collection is sorted according to its elements'
     * natural order.  Otherwise, this priority queue is ordered
     * according to its elements' natural order.
     *
     * @param c the collection whose elements are to be placed
     *        into this priority queue.
     * @throws ClassCastException if elements of the specified collection
     *         cannot be compared to one another according to the priority
     *         queue's ordering.
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     */
    public PriorityBlockingQueue(Collection<? extends E> c) {
        q = new PriorityQueue<E>(c);
    }


    // these first few override just to update doc comments

    /**
     * Adds the specified element to this queue.
     * @param o the element to add
     * @return <tt>true</tt> (as per the general contract of
     * <tt>Collection.add</tt>).
     *
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     * @throws ClassCastException if the specified element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     */
    public boolean add(E o) {
        return super.add(o);
    }

    /**
     * Returns the comparator used to order this collection, or <tt>null</tt>
     * if this collection is sorted according to its elements' natural ordering
     * (using <tt>Comparable</tt>).
     *
     * @return the comparator used to order this collection, or <tt>null</tt>
     * if this collection is sorted according to its elements' natural ordering.
     */
    public Comparator<? super E> comparator() {
        // Parameterized (not raw) so callers get the same type the
        // constructor accepted; the erasure is unchanged.
        return q.comparator();
    }

    /**
     * Inserts the specified element into this priority queue.
     *
     * @param o the element to add
     * @return <tt>true</tt>
     * @throws ClassCastException if the specified element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o) {
        if (o == null) throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            boolean ok = q.offer(o);
            assert ok;
            // Wake one waiting consumer, if any.
            notEmpty.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds the specified element to this priority queue. As the queue is
     * unbounded this method will never block.
     * @param o the element to add
     * @throws ClassCastException if the element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public void put(E o) {
        offer(o); // never need to block
    }

    /**
     * Inserts the specified element into this priority queue. As the queue is
     * unbounded this method will never block.
     * @param o the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return <tt>true</tt>
     * @throws ClassCastException if the element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o, long timeout, TimeUnit unit) {
        return offer(o); // never need to block
    }

    /**
     * Retrieves and removes the head of this queue, waiting while the
     * queue is empty.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while acquiring the
     * lock or while waiting for an element
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (q.size() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = q.poll();
            assert x != null;
            return x;
        } finally {
            lock.unlock();
        }
    }


    /**
     * Retrieves and removes the head of this queue without waiting.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue
     * is empty
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.poll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * given time for an element to become available.
     *
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit the time unit of the <tt>timeout</tt> argument
     * @return the head of this queue, or <tt>null</tt> if the wait time
     * elapses before an element is available
     * @throws InterruptedException if interrupted while acquiring the
     * lock or while waiting for an element
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E x = q.poll();
                if (x != null)
                    return x;
                if (nanos <= 0)
                    return null;
                try {
                    // awaitNanos returns the remaining wait time.
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue
     * is empty
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * a <tt>PriorityBlockingQueue</tt> is not capacity constrained.
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present.
     *
     * @param o element to be removed from this queue, if present
     * @return <tt>true</tt> if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns <tt>true</tt> if this queue contains the specified element.
     *
     * @param o object to be checked for containment in this queue
     * @return <tt>true</tt> if this queue contains the specified element
     */
    public boolean contains(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.contains(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * in no particular order.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }


    /**
     * Returns the string representation of the underlying priority
     * queue, computed while holding the lock.
     *
     * @return a string representation of this queue
     */
    public String toString() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toString();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes all available elements from this queue, in priority
     * order, and adds them to the given collection.  If adding an
     * element to <tt>c</tt> throws, that element and all elements
     * not yet transferred remain in this queue.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            E e;
            while ( (e = q.peek()) != null) {
                // Add before removing, so that a throwing add() does
                // not lose the element.
                c.add(e);
                q.poll();
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes at most the given number of available elements from
     * this queue, in priority order, and adds them to the given
     * collection.  If adding an element to <tt>c</tt> throws, that
     * element and all elements not yet transferred remain in this
     * queue.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer;
     * if not positive, nothing is transferred
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            E e;
            while (n < maxElements && (e = q.peek()) != null) {
                // Add before removing, so that a throwing add() does
                // not lose the element.
                c.add(e);
                q.poll();
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this priority queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * in no particular order; the runtime type of the returned array
     * is that of the specified array.
     *
     * @param a the array into which the elements of the queue are to
     * be stored, if it is big enough; otherwise, a new array of the
     * same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     */
    public <T> T[] toArray(T[] a) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue. The
     * iterator does not return the elements in any particular order.
     * The returned iterator is a thread-safe "fast-fail" iterator
     * that will throw {@link
     * java.util.ConcurrentModificationException} upon detected
     * interference.
     *
     * @return an iterator over the elements in this queue.
     */
    public Iterator<E> iterator() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return new Itr(q.iterator());
        } finally {
            lock.unlock();
        }
    }

    /**
     * Wraps the underlying queue's iterator so that <tt>next</tt> and
     * <tt>remove</tt> run while holding the enclosing queue's lock.
     * Deliberately not generic: it iterates over the enclosing
     * queue's element type <tt>E</tt>.
     */
    private class Itr implements Iterator<E> {
        private final Iterator<E> iter;
        Itr(Iterator<E> i) {
            iter = i;
        }

        public boolean hasNext() {
            /*
             * No sync -- we rely on underlying hasNext to be
             * stateless, in which case we can return true by mistake
             * only when next() will subsequently throw
             * ConcurrentModificationException.
             */
            return iter.hasNext();
        }

        public E next() {
            ReentrantLock lock = PriorityBlockingQueue.this.lock;
            lock.lock();
            try {
                return iter.next();
            } finally {
                lock.unlock();
            }
        }

        public void remove() {
            ReentrantLock lock = PriorityBlockingQueue.this.lock;
            lock.lock();
            try {
                iter.remove();
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).  This
     * merely wraps default serialization within lock.  The
     * serialization strategy for items is left to underlying
     * Queue. Note that locking is not needed on deserialization, so
     * readObject is not defined, just relying on default.
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {
        lock.lock();
        try {
            s.defaultWriteObject();
        } finally {
            lock.unlock();
        }
    }

}