ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.31
Committed: Tue Dec 23 19:38:09 2003 UTC (20 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.30: +17 -0 lines
Log Message:
cache finals across volatiles; avoid readResolve; doc improvements; timed invokeAll interleaves

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8 tim 1.13
9 dl 1.8 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
/**
 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
 * the same ordering rules as class {@link PriorityQueue} and supplies
 * blocking retrieval operations.  While this queue is logically
 * unbounded, attempted additions may fail due to resource exhaustion
 * (causing <tt>OutOfMemoryError</tt>).  This class does not permit
 * <tt>null</tt> elements.  A priority queue relying on natural
 * ordering also does not permit insertion of non-comparable objects
 * (doing so results in <tt>ClassCastException</tt>).
 *
 * <p>This class implements all of the <em>optional</em> methods
 * of the {@link Collection} and {@link Iterator} interfaces.
 *
 * <p>The Iterator provided in method {@link #iterator()} is
 * <em>not</em> guaranteed to traverse the elements of the
 * PriorityBlockingQueue in any particular order.  If you need ordered
 * traversal, consider using <tt>Arrays.sort(pq.toArray())</tt>.
 *
 * <p>Implementation note: all state lives in a plain
 * {@link PriorityQueue}; every operation that touches it (other than
 * the iterator's <tt>hasNext</tt>) runs while holding a single fair
 * {@link ReentrantLock}.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = 5595510919245408276L;

    /** Backing priority queue; accessed only while holding {@link #lock}. */
    private final PriorityQueue<E> q;

    /** Fair lock serializing all access to {@link #q}. */
    private final ReentrantLock lock = new ReentrantLock(true);

    /**
     * Condition signalled on every insertion and awaited by the blocking
     * retrieval methods.  Declared as {@link Condition}, which is the
     * type {@link ReentrantLock#newCondition()} actually returns.
     */
    private final Condition notEmpty = lock.newCondition();

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the default initial
     * capacity (11) that orders its elements according to their natural
     * ordering (using <tt>Comparable</tt>).
     */
    public PriorityBlockingQueue() {
        q = new PriorityQueue<E>();
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
     * capacity that orders its elements according to their natural
     * ordering (using <tt>Comparable</tt>).
     *
     * @param initialCapacity the initial capacity for this priority queue.
     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
     * than 1
     */
    public PriorityBlockingQueue(int initialCapacity) {
        q = new PriorityQueue<E>(initialCapacity, null);
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
     * capacity that orders its elements according to the specified
     * comparator.
     *
     * @param initialCapacity the initial capacity for this priority queue.
     * @param comparator the comparator used to order this priority queue.
     * If <tt>null</tt> then the order depends on the elements' natural
     * ordering.
     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
     * than 1
     */
    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        q = new PriorityQueue<E>(initialCapacity, comparator);
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> containing the elements
     * in the specified collection.  Construction is delegated to
     * {@link PriorityQueue#PriorityQueue(Collection)}: if the specified
     * collection is a {@link SortedSet} or a {@link PriorityQueue}, this
     * priority queue will be sorted according to the same comparator, or
     * according to its elements' natural order if the collection is
     * sorted according to its elements' natural order.  Otherwise, this
     * priority queue is ordered according to its elements' natural order.
     *
     * @param c the collection whose elements are to be placed
     *        into this priority queue.
     * @throws ClassCastException if elements of the specified collection
     *         cannot be compared to one another according to the priority
     *         queue's ordering.
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     */
    public PriorityBlockingQueue(Collection<? extends E> c) {
        q = new PriorityQueue<E>(c);
    }


    // these first few override just to update doc comments

    /**
     * Adds the specified element to this queue.
     *
     * @param o the element to add
     * @return <tt>true</tt> (as per the general contract of
     * <tt>Collection.add</tt>).
     *
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     * @throws ClassCastException if the specified element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     */
    public boolean add(E o) {
        return super.add(o);
    }

    /**
     * Returns the comparator used to order this collection, or <tt>null</tt>
     * if this collection is sorted according to its elements' natural
     * ordering (using <tt>Comparable</tt>).
     *
     * @return the comparator used to order this collection, or <tt>null</tt>
     * if this collection is sorted according to its elements' natural
     * ordering.
     */
    public Comparator<? super E> comparator() {
        return q.comparator();
    }

    /**
     * Inserts the specified element into this priority queue and wakes
     * one thread waiting for an element, if any.
     *
     * @param o the element to add
     * @return <tt>true</tt>
     * @throws ClassCastException if the specified element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o) {
        if (o == null) throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            boolean ok = q.offer(o);
            assert ok;
            notEmpty.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds the specified element to this priority queue.  As the queue is
     * unbounded this method will never block.
     *
     * @param o the element to add
     * @throws ClassCastException if the element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public void put(E o) {
        offer(o); // never need to block
    }

    /**
     * Inserts the specified element into this priority queue.  As the queue
     * is unbounded this method will never block.
     *
     * @param o the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return <tt>true</tt>
     * @throws ClassCastException if the element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o, long timeout, TimeUnit unit) {
        return offer(o); // never need to block
    }

    /**
     * Retrieves and removes the head of this queue, waiting while the
     * queue is empty.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while acquiring the lock
     * or while waiting for an element
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (q.size() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = q.poll();
            assert x != null;
            return x;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue without waiting.
     *
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.poll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * given time for an element to become available.  A non-positive
     * timeout makes a single attempt and does not wait.
     *
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit the time unit of the <tt>timeout</tt> argument
     * @return the head of this queue, or <tt>null</tt> if the wait time
     * elapsed before an element was available
     * @throws InterruptedException if interrupted while acquiring the lock
     * or while waiting for an element
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E x = q.poll();
                if (x != null)
                    return x;
                if (nanos <= 0)
                    return null;
                try {
                    // awaitNanos reports the time still left to wait
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue.
     *
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * a <tt>PriorityBlockingQueue</tt> is not capacity constrained.
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present.
     *
     * @param o the element to remove
     * @return <tt>true</tt> if the queue changed as a result of the call
     */
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns <tt>true</tt> if this queue contains the specified element.
     *
     * @param o the element whose presence is to be tested
     * @return <tt>true</tt> if this queue contains the specified element
     */
    public boolean contains(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.contains(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * in no particular order.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }


    /**
     * Returns the string representation of the underlying queue,
     * computed while holding the lock.
     *
     * @return a string representation of this queue
     */
    public String toString() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toString();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes all available elements from this queue and adds them,
     * in priority order, to the given collection.
     *
     * <p>Each element is removed from this queue only after
     * <tt>c.add</tt> has returned normally, so an element is not lost if
     * the target collection rejects it; it stays at the head of this queue.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            E e;
            while ( (e = q.peek()) != null) {
                c.add(e);
                q.poll(); // discard the head only once it is safely stored
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes at most the given number of available elements from this
     * queue and adds them, in priority order, to the given collection.
     *
     * <p>Each element is removed from this queue only after
     * <tt>c.add</tt> has returned normally, so an element is not lost if
     * the target collection rejects it; it stays at the head of this queue.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer;
     * non-positive values transfer nothing
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            E e;
            while (n < maxElements && (e = q.peek()) != null) {
                c.add(e);
                q.poll(); // discard the head only once it is safely stored
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this priority queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * in no particular order, using the supplied array if it is large
     * enough; the runtime type of the result is that of <tt>a</tt>.
     *
     * @param a the array into which the elements are to be stored, if
     * it is big enough
     * @return an array containing all of the elements in this queue
     */
    public <T> T[] toArray(T[] a) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue.  The
     * iterator does not return the elements in any particular order.
     * The returned iterator is a thread-safe "fail-fast" iterator
     * that will throw {@link
     * java.util.ConcurrentModificationException} upon detected
     * interference.
     *
     * @return an iterator over the elements in this queue.
     */
    public Iterator<E> iterator() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return new Itr(q.iterator());
        } finally {
            lock.unlock();
        }
    }

    /**
     * Wraps the underlying queue's iterator so that <tt>next</tt> and
     * <tt>remove</tt> run under the enclosing queue's lock.  Uses the
     * enclosing class's element type rather than declaring its own.
     */
    private class Itr implements Iterator<E> {
        private final Iterator<E> iter;

        Itr(Iterator<E> i) {
            iter = i;
        }

        public boolean hasNext() {
            /*
             * No sync -- we rely on underlying hasNext to be
             * stateless, in which case we can return true by mistake
             * only when next() will subsequently throw
             * ConcurrentModificationException.
             */
            return iter.hasNext();
        }

        public E next() {
            ReentrantLock lock = PriorityBlockingQueue.this.lock;
            lock.lock();
            try {
                return iter.next();
            } finally {
                lock.unlock();
            }
        }

        public void remove() {
            ReentrantLock lock = PriorityBlockingQueue.this.lock;
            lock.lock();
            try {
                iter.remove();
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).  This
     * merely wraps default serialization within lock.  The
     * serialization strategy for items is left to underlying
     * Queue.  Note that locking is not needed on deserialization, so
     * readObject is not defined, just relying on default.
     *
     * @param s the stream to write to
     * @throws java.io.IOException if the stream reports an I/O error
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            s.defaultWriteObject();
        } finally {
            lock.unlock();
        }
    }

}