ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.39
Committed: Tue Apr 26 01:43:01 2005 UTC (19 years, 1 month ago) by jsr166
Branch: MAIN
Changes since 1.38: +19 -20 lines
Log Message:
doc fixes

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.33 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 tim 1.13
9 dl 1.8 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
12     /**
13 dl 1.25 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
14     * the same ordering rules as class {@link PriorityQueue} and supplies
15     * blocking retrieval operations. While this queue is logically
16 dl 1.24 * unbounded, attempted additions may fail due to resource exhaustion
17 dl 1.25 * (causing <tt>OutOfMemoryError</tt>). This class does not permit
18     * <tt>null</tt> elements. A priority queue relying on natural
19 dl 1.24 * ordering also does not permit insertion of non-comparable objects
20     * (doing so results in <tt>ClassCastException</tt>).
21 dl 1.20 *
22 dl 1.38 * <p>This class and its iterator implement all of the
23     * <em>optional</em> methods of the {@link Collection} and {@link
24 jsr166 1.39 * Iterator} interfaces.
25 dl 1.38 * The Iterator provided in method {@link #iterator()} is
26 dl 1.20 * <em>not</em> guaranteed to traverse the elements of the
27     * PriorityBlockingQueue in any particular order. If you need ordered
28     * traversal, consider using <tt>Arrays.sort(pq.toArray())</tt>.
29     *
30 dl 1.35 * <p>This class is a member of the
31     * <a href="{@docRoot}/../guide/collections/index.html">
32     * Java Collections Framework</a>.
33     *
34 dl 1.6 * @since 1.5
35     * @author Doug Lea
36 dl 1.29 * @param <E> the type of elements held in this collection
37 dl 1.28 */
38 dl 1.5 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
39 dl 1.15 implements BlockingQueue<E>, java.io.Serializable {
40 dl 1.21 private static final long serialVersionUID = 5595510919245408276L;
41 tim 1.1
42 dl 1.5 private final PriorityQueue<E> q;
43 dl 1.9 private final ReentrantLock lock = new ReentrantLock(true);
44 dl 1.32 private final Condition notEmpty = lock.newCondition();
45 dl 1.5
46 dl 1.2 /**
47 jsr166 1.39 * Creates a <tt>PriorityBlockingQueue</tt> with the default initial
48 dholmes 1.14 * capacity
49 dholmes 1.10 * (11) that orders its elements according to their natural
50 tim 1.18 * ordering (using <tt>Comparable</tt>).
51 dl 1.2 */
52     public PriorityBlockingQueue() {
53 dl 1.5 q = new PriorityQueue<E>();
54 dl 1.2 }
55    
56     /**
57 dholmes 1.16 * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
58 jsr166 1.39 * capacity that orders its elements according to their {@linkplain
59     * Comparable natural ordering}.
60 dl 1.2 *
61     * @param initialCapacity the initial capacity for this priority queue.
62 dholmes 1.16 * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
63     * than 1
64 dl 1.2 */
65     public PriorityBlockingQueue(int initialCapacity) {
66 dl 1.5 q = new PriorityQueue<E>(initialCapacity, null);
67 dl 1.2 }
68    
69     /**
70 dholmes 1.16 * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
71 jsr166 1.39 * capacity that orders its elements according to the specified
72     * comparator.
73 dl 1.2 *
74     * @param initialCapacity the initial capacity for this priority queue.
75     * @param comparator the comparator used to order this priority queue.
76 dholmes 1.10 * If <tt>null</tt> then the order depends on the elements' natural
77     * ordering.
78 dholmes 1.16 * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
79     * than 1
80 dl 1.2 */
81 tim 1.13 public PriorityBlockingQueue(int initialCapacity,
82 dholmes 1.14 Comparator<? super E> comparator) {
83 dl 1.5 q = new PriorityQueue<E>(initialCapacity, comparator);
84 dl 1.2 }
85    
86     /**
87 dholmes 1.16 * Creates a <tt>PriorityBlockingQueue</tt> containing the elements
88 dl 1.15 * in the specified collection. The priority queue has an initial
89 jsr166 1.39 * capacity of 110% of the size of the specified collection. If
90 dl 1.15 * the specified collection is a {@link SortedSet} or a {@link
91     * PriorityQueue}, this priority queue will be sorted according to
92 jsr166 1.39 * the same comparator, or according to the natural ordering of its
93     * elements if the collection is sorted according to the natural
94     * ordering of its elements. Otherwise, this priority queue is
95     * ordered according to the natural ordering of its elements.
96 dl 1.2 *
97 dholmes 1.14 * @param c the collection whose elements are to be placed
98 dl 1.2 * into this priority queue.
99     * @throws ClassCastException if elements of the specified collection
100     * cannot be compared to one another according to the priority
101     * queue's ordering.
102 dholmes 1.14 * @throws NullPointerException if <tt>c</tt> or any element within it
103     * is <tt>null</tt>
104 dl 1.2 */
105 dholmes 1.14 public PriorityBlockingQueue(Collection<? extends E> c) {
106     q = new PriorityQueue<E>(c);
107 dl 1.7 }
108    
109 dholmes 1.10
110 dholmes 1.16 // these first few override just to update doc comments
111 dholmes 1.10
112     /**
113 dholmes 1.16 * Adds the specified element to this queue.
114 dl 1.34 * @param o the element to add
115 dholmes 1.16 * @return <tt>true</tt> (as per the general contract of
116     * <tt>Collection.add</tt>).
117     *
118 dl 1.22 * @throws NullPointerException if the specified element is <tt>null</tt>.
119 dholmes 1.16 * @throws ClassCastException if the specified element cannot be compared
120     * with elements currently in the priority queue according
121     * to the priority queue's ordering.
122 dholmes 1.10 */
123 dholmes 1.16 public boolean add(E o) {
124     return super.add(o);
125 dholmes 1.10 }
126    
127 tim 1.13 /**
128 jsr166 1.39 * Returns the comparator used to order the elements in this queue,
129     * or <tt>null</tt> if this queue uses the {@linkplain Comparable
130     * natural ordering} of its elements.
131 dholmes 1.16 *
132 jsr166 1.39 * @return the comparator used to order the elements in this queue,
133     * or <tt>null</tt> if this queue uses the natural
134     * ordering of its elements.
135 dholmes 1.16 */
136 dl 1.36 public Comparator<? super E> comparator() {
137 dl 1.7 return q.comparator();
138 dl 1.5 }
139    
140 dholmes 1.16 /**
141 dl 1.24 * Inserts the specified element into this priority queue.
142 dholmes 1.16 *
143 dl 1.22 * @param o the element to add
144 dholmes 1.16 * @return <tt>true</tt>
145     * @throws ClassCastException if the specified element cannot be compared
146     * with elements currently in the priority queue according
147     * to the priority queue's ordering.
148 dl 1.22 * @throws NullPointerException if the specified element is <tt>null</tt>.
149 dholmes 1.16 */
150 dholmes 1.14 public boolean offer(E o) {
151     if (o == null) throw new NullPointerException();
152 dl 1.31 final ReentrantLock lock = this.lock;
153 dl 1.5 lock.lock();
154     try {
155 dholmes 1.14 boolean ok = q.offer(o);
156 dl 1.5 assert ok;
157     notEmpty.signal();
158     return true;
159 tim 1.19 } finally {
160 tim 1.13 lock.unlock();
161 dl 1.5 }
162     }
163    
164 dholmes 1.16 /**
165     * Adds the specified element to this priority queue. As the queue is
166     * unbounded this method will never block.
167 dl 1.22 * @param o the element to add
168 dholmes 1.16 * @throws ClassCastException if the element cannot be compared
169     * with elements currently in the priority queue according
170     * to the priority queue's ordering.
171 dl 1.22 * @throws NullPointerException if the specified element is <tt>null</tt>.
172 dholmes 1.16 */
173     public void put(E o) {
174 dholmes 1.14 offer(o); // never need to block
175 dl 1.5 }
176    
177 dholmes 1.16 /**
178 dl 1.24 * Inserts the specified element into this priority queue. As the queue is
179 dholmes 1.16 * unbounded this method will never block.
180 dl 1.22 * @param o the element to add
181 dholmes 1.16 * @param timeout This parameter is ignored as the method never blocks
182     * @param unit This parameter is ignored as the method never blocks
183 dl 1.22 * @return <tt>true</tt>
184 dholmes 1.16 * @throws ClassCastException if the element cannot be compared
185     * with elements currently in the priority queue according
186     * to the priority queue's ordering.
187 dl 1.22 * @throws NullPointerException if the specified element is <tt>null</tt>.
188 dholmes 1.16 */
189     public boolean offer(E o, long timeout, TimeUnit unit) {
190 dholmes 1.14 return offer(o); // never need to block
191 dl 1.5 }
192    
193     public E take() throws InterruptedException {
194 dl 1.31 final ReentrantLock lock = this.lock;
195 dl 1.5 lock.lockInterruptibly();
196     try {
197     try {
198     while (q.size() == 0)
199     notEmpty.await();
200 tim 1.19 } catch (InterruptedException ie) {
201 dl 1.5 notEmpty.signal(); // propagate to non-interrupted thread
202     throw ie;
203     }
204     E x = q.poll();
205     assert x != null;
206     return x;
207 tim 1.19 } finally {
208 dl 1.5 lock.unlock();
209     }
210     }
211    
212    
213     public E poll() {
214 dl 1.31 final ReentrantLock lock = this.lock;
215 dl 1.5 lock.lock();
216     try {
217     return q.poll();
218 tim 1.19 } finally {
219 tim 1.13 lock.unlock();
220 dl 1.5 }
221     }
222    
223     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
224 dholmes 1.10 long nanos = unit.toNanos(timeout);
225 dl 1.31 final ReentrantLock lock = this.lock;
226 dl 1.5 lock.lockInterruptibly();
227     try {
228     for (;;) {
229     E x = q.poll();
230 tim 1.13 if (x != null)
231 dl 1.5 return x;
232     if (nanos <= 0)
233     return null;
234     try {
235     nanos = notEmpty.awaitNanos(nanos);
236 tim 1.19 } catch (InterruptedException ie) {
237 dl 1.5 notEmpty.signal(); // propagate to non-interrupted thread
238     throw ie;
239     }
240     }
241 tim 1.19 } finally {
242 dl 1.5 lock.unlock();
243     }
244     }
245    
246     public E peek() {
247 dl 1.31 final ReentrantLock lock = this.lock;
248 dl 1.5 lock.lock();
249     try {
250     return q.peek();
251 tim 1.19 } finally {
252 tim 1.13 lock.unlock();
253 dl 1.5 }
254     }
255    
256     public int size() {
257 dl 1.31 final ReentrantLock lock = this.lock;
258 dl 1.5 lock.lock();
259     try {
260     return q.size();
261 tim 1.19 } finally {
262 dl 1.5 lock.unlock();
263     }
264     }
265    
266     /**
267     * Always returns <tt>Integer.MAX_VALUE</tt> because
268 dholmes 1.16 * a <tt>PriorityBlockingQueue</tt> is not capacity constrained.
269 dl 1.5 * @return <tt>Integer.MAX_VALUE</tt>
270     */
271     public int remainingCapacity() {
272     return Integer.MAX_VALUE;
273     }
274    
275 dl 1.37 /**
276     * Removes a single instance of the specified element from this
277 dl 1.38 * queue, if it is present.
278 dl 1.37 */
279 dholmes 1.14 public boolean remove(Object o) {
280 dl 1.31 final ReentrantLock lock = this.lock;
281 dl 1.5 lock.lock();
282     try {
283 dholmes 1.14 return q.remove(o);
284 tim 1.19 } finally {
285 dl 1.5 lock.unlock();
286     }
287     }
288    
289 dholmes 1.14 public boolean contains(Object o) {
290 dl 1.31 final ReentrantLock lock = this.lock;
291 dl 1.5 lock.lock();
292     try {
293 dholmes 1.14 return q.contains(o);
294 tim 1.19 } finally {
295 dl 1.5 lock.unlock();
296     }
297     }
298    
299     public Object[] toArray() {
300 dl 1.31 final ReentrantLock lock = this.lock;
301 dl 1.5 lock.lock();
302     try {
303     return q.toArray();
304 tim 1.19 } finally {
305 dl 1.5 lock.unlock();
306     }
307     }
308    
309    
310     public String toString() {
311 dl 1.31 final ReentrantLock lock = this.lock;
312 dl 1.5 lock.lock();
313     try {
314     return q.toString();
315 tim 1.19 } finally {
316 dl 1.5 lock.unlock();
317     }
318     }
319    
320 dl 1.26 public int drainTo(Collection<? super E> c) {
321     if (c == null)
322     throw new NullPointerException();
323     if (c == this)
324     throw new IllegalArgumentException();
325 dl 1.31 final ReentrantLock lock = this.lock;
326 dl 1.26 lock.lock();
327     try {
328     int n = 0;
329     E e;
330     while ( (e = q.poll()) != null) {
331     c.add(e);
332     ++n;
333     }
334     return n;
335     } finally {
336     lock.unlock();
337     }
338     }
339    
340     public int drainTo(Collection<? super E> c, int maxElements) {
341     if (c == null)
342     throw new NullPointerException();
343     if (c == this)
344     throw new IllegalArgumentException();
345     if (maxElements <= 0)
346     return 0;
347 dl 1.31 final ReentrantLock lock = this.lock;
348 dl 1.26 lock.lock();
349     try {
350     int n = 0;
351     E e;
352     while (n < maxElements && (e = q.poll()) != null) {
353     c.add(e);
354     ++n;
355     }
356     return n;
357     } finally {
358     lock.unlock();
359     }
360     }
361    
362 dl 1.17 /**
363 dl 1.37 * Atomically removes all of the elements from this queue.
364 dl 1.17 * The queue will be empty after this call returns.
365     */
366     public void clear() {
367 dl 1.31 final ReentrantLock lock = this.lock;
368 dl 1.17 lock.lock();
369     try {
370     q.clear();
371 tim 1.19 } finally {
372 dl 1.17 lock.unlock();
373     }
374     }
375    
376 dl 1.5 public <T> T[] toArray(T[] a) {
377 dl 1.31 final ReentrantLock lock = this.lock;
378 dl 1.5 lock.lock();
379     try {
380     return q.toArray(a);
381 tim 1.19 } finally {
382 dl 1.5 lock.unlock();
383     }
384     }
385    
386 dholmes 1.16 /**
387 dl 1.23 * Returns an iterator over the elements in this queue. The
388     * iterator does not return the elements in any particular order.
389     * The returned iterator is a thread-safe "fast-fail" iterator
390 jsr166 1.39 * that will throw {@link ConcurrentModificationException} upon
391     * detected interference.
392 dholmes 1.16 *
393     * @return an iterator over the elements in this queue.
394     */
395 dl 1.5 public Iterator<E> iterator() {
396 dl 1.31 final ReentrantLock lock = this.lock;
397 dl 1.5 lock.lock();
398     try {
399     return new Itr(q.iterator());
400 tim 1.19 } finally {
401 dl 1.5 lock.unlock();
402     }
403     }
404    
405     private class Itr<E> implements Iterator<E> {
406     private final Iterator<E> iter;
407 tim 1.13 Itr(Iterator<E> i) {
408     iter = i;
409 dl 1.5 }
410    
411 tim 1.13 public boolean hasNext() {
412 dl 1.5 /*
413     * No sync -- we rely on underlying hasNext to be
414     * stateless, in which case we can return true by mistake
415 dl 1.30 * only when next() will subsequently throw
416 dl 1.5 * ConcurrentModificationException.
417     */
418     return iter.hasNext();
419 tim 1.13 }
420    
421     public E next() {
422 dl 1.31 ReentrantLock lock = PriorityBlockingQueue.this.lock;
423 dl 1.5 lock.lock();
424     try {
425     return iter.next();
426 tim 1.19 } finally {
427 dl 1.5 lock.unlock();
428     }
429 tim 1.13 }
430    
431     public void remove() {
432 dl 1.31 ReentrantLock lock = PriorityBlockingQueue.this.lock;
433 dl 1.5 lock.lock();
434     try {
435     iter.remove();
436 tim 1.19 } finally {
437 dl 1.5 lock.unlock();
438     }
439 tim 1.13 }
440 dl 1.5 }
441    
442     /**
443     * Save the state to a stream (that is, serialize it). This
444     * merely wraps default serialization within lock. The
445     * serialization strategy for items is left to underlying
446     * Queue. Note that locking is not needed on deserialization, so
447     * readObject is not defined, just relying on default.
448     */
449     private void writeObject(java.io.ObjectOutputStream s)
450     throws java.io.IOException {
451     lock.lock();
452     try {
453     s.defaultWriteObject();
454 tim 1.19 } finally {
455 dl 1.5 lock.unlock();
456     }
457 tim 1.1 }
458    
459     }