ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.26
Committed: Sun Oct 5 23:00:18 2003 UTC (20 years, 8 months ago) by dl
Branch: MAIN
Changes since 1.25: +40 -0 lines
Log Message:
added drainTo; clarified various exception specs

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8 tim 1.13
9 dl 1.8 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
12     /**
13 dl 1.25 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
14     * the same ordering rules as class {@link PriorityQueue} and supplies
15     * blocking retrieval operations. While this queue is logically
16 dl 1.24 * unbounded, attempted additions may fail due to resource exhaustion
17 dl 1.25 * (causing <tt>OutOfMemoryError</tt>). This class does not permit
18     * <tt>null</tt> elements. A priority queue relying on natural
19 dl 1.24 * ordering also does not permit insertion of non-comparable objects
20     * (doing so results in <tt>ClassCastException</tt>).
21 dl 1.20 *
22 dl 1.23 * <p>This class implements all of the <em>optional</em> methods
23     * of the {@link Collection} and {@link Iterator} interfaces.
24 dl 1.20 * <p>The Iterator provided in method {@link #iterator()} is
25     * <em>not</em> guaranteed to traverse the elements of the
26     * PriorityBlockingQueue in any particular order. If you need ordered
27     * traversal, consider using <tt>Arrays.sort(pq.toArray())</tt>.
28     *
29 dl 1.6 * @since 1.5
30     * @author Doug Lea
31 dholmes 1.14 */
32 dl 1.5 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
33 dl 1.15 implements BlockingQueue<E>, java.io.Serializable {
34 dl 1.21 private static final long serialVersionUID = 5595510919245408276L;
35 tim 1.1
36 dl 1.5 private final PriorityQueue<E> q;
37 dl 1.9 private final ReentrantLock lock = new ReentrantLock(true);
38 dl 1.5 private final Condition notEmpty = lock.newCondition();
39    
40 dl 1.2 /**
41 dholmes 1.16 * Creates a <tt>PriorityBlockingQueue</tt> with the default initial
42 dholmes 1.14 * capacity
43 dholmes 1.10 * (11) that orders its elements according to their natural
44 tim 1.18 * ordering (using <tt>Comparable</tt>).
45 dl 1.2 */
46     public PriorityBlockingQueue() {
47 dl 1.5 q = new PriorityQueue<E>();
48 dl 1.2 }
49    
50     /**
51 dholmes 1.16 * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
52 dholmes 1.10 * capacity
53     * that orders its elements according to their natural ordering
54 tim 1.18 * (using <tt>Comparable</tt>).
55 dl 1.2 *
56     * @param initialCapacity the initial capacity for this priority queue.
57 dholmes 1.16 * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
58     * than 1
59 dl 1.2 */
60     public PriorityBlockingQueue(int initialCapacity) {
61 dl 1.5 q = new PriorityQueue<E>(initialCapacity, null);
62 dl 1.2 }
63    
64     /**
65 dholmes 1.16 * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
66 dholmes 1.10 * capacity
67 dl 1.2 * that orders its elements according to the specified comparator.
68     *
69     * @param initialCapacity the initial capacity for this priority queue.
70     * @param comparator the comparator used to order this priority queue.
71 dholmes 1.10 * If <tt>null</tt> then the order depends on the elements' natural
72     * ordering.
73 dholmes 1.16 * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
74     * than 1
75 dl 1.2 */
76 tim 1.13 public PriorityBlockingQueue(int initialCapacity,
77 dholmes 1.14 Comparator<? super E> comparator) {
78 dl 1.5 q = new PriorityQueue<E>(initialCapacity, comparator);
79 dl 1.2 }
80    
81     /**
82 dholmes 1.16 * Creates a <tt>PriorityBlockingQueue</tt> containing the elements
83 dl 1.15 * in the specified collection. The priority queue has an initial
84     * capacity of 110% of the size of the specified collection. If
85     * the specified collection is a {@link SortedSet} or a {@link
86     * PriorityQueue}, this priority queue will be sorted according to
87     * the same comparator, or according to its elements' natural
88     * order if the collection is sorted according to its elements'
89     * natural order. Otherwise, this priority queue is ordered
90     * according to its elements' natural order.
91 dl 1.2 *
92 dholmes 1.14 * @param c the collection whose elements are to be placed
93 dl 1.2 * into this priority queue.
94     * @throws ClassCastException if elements of the specified collection
95     * cannot be compared to one another according to the priority
96     * queue's ordering.
97 dholmes 1.14 * @throws NullPointerException if <tt>c</tt> or any element within it
98     * is <tt>null</tt>
99 dl 1.2 */
100 dholmes 1.14 public PriorityBlockingQueue(Collection<? extends E> c) {
101     q = new PriorityQueue<E>(c);
102 dl 1.7 }
103    
104 dholmes 1.10
105 dholmes 1.16 // these first few override just to update doc comments
106 dholmes 1.10
107     /**
108 dholmes 1.16 * Adds the specified element to this queue.
109     * @return <tt>true</tt> (as per the general contract of
110     * <tt>Collection.add</tt>).
111     *
112 dl 1.22 * @throws NullPointerException if the specified element is <tt>null</tt>.
113 dholmes 1.16 * @throws ClassCastException if the specified element cannot be compared
114     * with elements currently in the priority queue according
115     * to the priority queue's ordering.
116 dholmes 1.10 */
117 dholmes 1.16 public boolean add(E o) {
118     return super.add(o);
119 dholmes 1.10 }
120    
121 tim 1.13 /**
122 dholmes 1.16 * Returns the comparator used to order this collection, or <tt>null</tt>
123     * if this collection is sorted according to its elements natural ordering
124 tim 1.18 * (using <tt>Comparable</tt>).
125 dholmes 1.16 *
126     * @return the comparator used to order this collection, or <tt>null</tt>
127     * if this collection is sorted according to its elements natural ordering.
128     */
129 dl 1.7 public Comparator comparator() {
130     return q.comparator();
131 dl 1.5 }
132    
133 dholmes 1.16 /**
134 dl 1.24 * Inserts the specified element into this priority queue.
135 dholmes 1.16 *
136 dl 1.22 * @param o the element to add
137 dholmes 1.16 * @return <tt>true</tt>
138     * @throws ClassCastException if the specified element cannot be compared
139     * with elements currently in the priority queue according
140     * to the priority queue's ordering.
141 dl 1.22 * @throws NullPointerException if the specified element is <tt>null</tt>.
142 dholmes 1.16 */
143 dholmes 1.14 public boolean offer(E o) {
144     if (o == null) throw new NullPointerException();
145 dl 1.5 lock.lock();
146     try {
147 dholmes 1.14 boolean ok = q.offer(o);
148 dl 1.5 assert ok;
149     notEmpty.signal();
150     return true;
151 tim 1.19 } finally {
152 tim 1.13 lock.unlock();
153 dl 1.5 }
154     }
155    
156 dholmes 1.16 /**
157     * Adds the specified element to this priority queue. As the queue is
158     * unbounded this method will never block.
159 dl 1.22 * @param o the element to add
160 dholmes 1.16 * @throws ClassCastException if the element cannot be compared
161     * with elements currently in the priority queue according
162     * to the priority queue's ordering.
163 dl 1.22 * @throws NullPointerException if the specified element is <tt>null</tt>.
164 dholmes 1.16 */
165     public void put(E o) {
166 dholmes 1.14 offer(o); // never need to block
167 dl 1.5 }
168    
169 dholmes 1.16 /**
170 dl 1.24 * Inserts the specified element into this priority queue. As the queue is
171 dholmes 1.16 * unbounded this method will never block.
172 dl 1.22 * @param o the element to add
173 dholmes 1.16 * @param timeout This parameter is ignored as the method never blocks
174     * @param unit This parameter is ignored as the method never blocks
175 dl 1.22 * @return <tt>true</tt>
176 dholmes 1.16 * @throws ClassCastException if the element cannot be compared
177     * with elements currently in the priority queue according
178     * to the priority queue's ordering.
179 dl 1.22 * @throws NullPointerException if the specified element is <tt>null</tt>.
180 dholmes 1.16 */
181     public boolean offer(E o, long timeout, TimeUnit unit) {
182 dholmes 1.14 return offer(o); // never need to block
183 dl 1.5 }
184    
185     public E take() throws InterruptedException {
186     lock.lockInterruptibly();
187     try {
188     try {
189     while (q.size() == 0)
190     notEmpty.await();
191 tim 1.19 } catch (InterruptedException ie) {
192 dl 1.5 notEmpty.signal(); // propagate to non-interrupted thread
193     throw ie;
194     }
195     E x = q.poll();
196     assert x != null;
197     return x;
198 tim 1.19 } finally {
199 dl 1.5 lock.unlock();
200     }
201     }
202    
203    
204     public E poll() {
205     lock.lock();
206     try {
207     return q.poll();
208 tim 1.19 } finally {
209 tim 1.13 lock.unlock();
210 dl 1.5 }
211     }
212    
213     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
214 dholmes 1.10 long nanos = unit.toNanos(timeout);
215 dl 1.5 lock.lockInterruptibly();
216     try {
217     for (;;) {
218     E x = q.poll();
219 tim 1.13 if (x != null)
220 dl 1.5 return x;
221     if (nanos <= 0)
222     return null;
223     try {
224     nanos = notEmpty.awaitNanos(nanos);
225 tim 1.19 } catch (InterruptedException ie) {
226 dl 1.5 notEmpty.signal(); // propagate to non-interrupted thread
227     throw ie;
228     }
229     }
230 tim 1.19 } finally {
231 dl 1.5 lock.unlock();
232     }
233     }
234    
235     public E peek() {
236     lock.lock();
237     try {
238     return q.peek();
239 tim 1.19 } finally {
240 tim 1.13 lock.unlock();
241 dl 1.5 }
242     }
243    
244     public int size() {
245     lock.lock();
246     try {
247     return q.size();
248 tim 1.19 } finally {
249 dl 1.5 lock.unlock();
250     }
251     }
252    
253     /**
254     * Always returns <tt>Integer.MAX_VALUE</tt> because
255 dholmes 1.16 * a <tt>PriorityBlockingQueue</tt> is not capacity constrained.
256 dl 1.5 * @return <tt>Integer.MAX_VALUE</tt>
257     */
258     public int remainingCapacity() {
259     return Integer.MAX_VALUE;
260     }
261    
262 dholmes 1.14 public boolean remove(Object o) {
263 dl 1.5 lock.lock();
264     try {
265 dholmes 1.14 return q.remove(o);
266 tim 1.19 } finally {
267 dl 1.5 lock.unlock();
268     }
269     }
270    
271 dholmes 1.14 public boolean contains(Object o) {
272 dl 1.5 lock.lock();
273     try {
274 dholmes 1.14 return q.contains(o);
275 tim 1.19 } finally {
276 dl 1.5 lock.unlock();
277     }
278     }
279    
280     public Object[] toArray() {
281     lock.lock();
282     try {
283     return q.toArray();
284 tim 1.19 } finally {
285 dl 1.5 lock.unlock();
286     }
287     }
288    
289    
290     public String toString() {
291     lock.lock();
292     try {
293     return q.toString();
294 tim 1.19 } finally {
295 dl 1.5 lock.unlock();
296     }
297     }
298    
299 dl 1.26 public int drainTo(Collection<? super E> c) {
300     if (c == null)
301     throw new NullPointerException();
302     if (c == this)
303     throw new IllegalArgumentException();
304     lock.lock();
305     try {
306     int n = 0;
307     E e;
308     while ( (e = q.poll()) != null) {
309     c.add(e);
310     ++n;
311     }
312     return n;
313     } finally {
314     lock.unlock();
315     }
316     }
317    
318     public int drainTo(Collection<? super E> c, int maxElements) {
319     if (c == null)
320     throw new NullPointerException();
321     if (c == this)
322     throw new IllegalArgumentException();
323     if (maxElements <= 0)
324     return 0;
325     lock.lock();
326     try {
327     int n = 0;
328     E e;
329     while (n < maxElements && (e = q.poll()) != null) {
330     c.add(e);
331     ++n;
332     }
333     return n;
334     } finally {
335     lock.unlock();
336     }
337     }
338    
339 dl 1.17 /**
340     * Atomically removes all of the elements from this delay queue.
341     * The queue will be empty after this call returns.
342     */
343     public void clear() {
344     lock.lock();
345     try {
346     q.clear();
347 tim 1.19 } finally {
348 dl 1.17 lock.unlock();
349     }
350     }
351    
352 dl 1.5 public <T> T[] toArray(T[] a) {
353     lock.lock();
354     try {
355     return q.toArray(a);
356 tim 1.19 } finally {
357 dl 1.5 lock.unlock();
358     }
359     }
360    
361 dholmes 1.16 /**
362 dl 1.23 * Returns an iterator over the elements in this queue. The
363     * iterator does not return the elements in any particular order.
364     * The returned iterator is a thread-safe "fast-fail" iterator
365     * that will throw {@link
366     * java.util.ConcurrentModificationException} upon detected
367     * interference.
368 dholmes 1.16 *
369     * @return an iterator over the elements in this queue.
370     */
371 dl 1.5 public Iterator<E> iterator() {
372     lock.lock();
373     try {
374     return new Itr(q.iterator());
375 tim 1.19 } finally {
376 dl 1.5 lock.unlock();
377     }
378     }
379    
380     private class Itr<E> implements Iterator<E> {
381     private final Iterator<E> iter;
382 tim 1.13 Itr(Iterator<E> i) {
383     iter = i;
384 dl 1.5 }
385    
386 tim 1.13 public boolean hasNext() {
387 dl 1.5 /*
388     * No sync -- we rely on underlying hasNext to be
389     * stateless, in which case we can return true by mistake
390     * only when next() willl subsequently throw
391     * ConcurrentModificationException.
392     */
393     return iter.hasNext();
394 tim 1.13 }
395    
396     public E next() {
397 dl 1.5 lock.lock();
398     try {
399     return iter.next();
400 tim 1.19 } finally {
401 dl 1.5 lock.unlock();
402     }
403 tim 1.13 }
404    
405     public void remove() {
406 dl 1.5 lock.lock();
407     try {
408     iter.remove();
409 tim 1.19 } finally {
410 dl 1.5 lock.unlock();
411     }
412 tim 1.13 }
413 dl 1.5 }
414    
415     /**
416     * Save the state to a stream (that is, serialize it). This
417     * merely wraps default serialization within lock. The
418     * serialization strategy for items is left to underlying
419     * Queue. Note that locking is not needed on deserialization, so
420     * readObject is not defined, just relying on default.
421     */
422     private void writeObject(java.io.ObjectOutputStream s)
423     throws java.io.IOException {
424     lock.lock();
425     try {
426     s.defaultWriteObject();
427 tim 1.19 } finally {
428 dl 1.5 lock.unlock();
429     }
430 tim 1.1 }
431    
432     }