ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.43
Committed: Wed May 18 01:41:16 2005 UTC (19 years ago) by jsr166
Branch: MAIN
Changes since 1.42: +1 -1 lines
Log Message:
whitespace

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.33 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 tim 1.13
9 dl 1.8 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
12     /**
13 dl 1.25 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
14     * the same ordering rules as class {@link PriorityQueue} and supplies
15     * blocking retrieval operations. While this queue is logically
16 dl 1.24 * unbounded, attempted additions may fail due to resource exhaustion
17 dl 1.25 * (causing <tt>OutOfMemoryError</tt>). This class does not permit
18 jsr166 1.42 * <tt>null</tt> elements. A priority queue relying on {@linkplain
19     * Comparable natural ordering} also does not permit insertion of
20     * non-comparable objects (doing so results in
21     * <tt>ClassCastException</tt>).
22 dl 1.20 *
23 dl 1.38 * <p>This class and its iterator implement all of the
24     * <em>optional</em> methods of the {@link Collection} and {@link
25 dl 1.41 * Iterator} interfaces. The Iterator provided in method {@link
26     * #iterator()} is <em>not</em> guaranteed to traverse the elements of
27     * the PriorityBlockingQueue in any particular order. If you need
28     * ordered traversal, consider using
29     * <tt>Arrays.sort(pq.toArray())</tt>. Also, method <tt>drainTo</tt>
30     * can be used to <em>remove</em> some or all elements in priority
31     * order and place them in another collection.
32     *
33     * <p>Operations on this class make no guarantees about the ordering
34     * of elements with equal priority. If you need to enforce an
35     * ordering, you can define custom classes or comparators that use a
36     * secondary key to break ties in primary priority values. For
37     * example, here is a class that applies first-in-first-out
38     * tie-breaking to comparable elements. To use it, you would insert a
39     * <tt>new FIFOEntry(anEntry)</tt> instead of a plain entry object.
40     *
41     * <pre>
42     * class FIFOEntry&lt;E extends Comparable&lt;? super E&gt;&gt;
43     * implements Comparable&lt;FIFOEntry&lt;E&gt;&gt; {
44     * static AtomicLong seq = new AtomicLong();
45     * final long seqNum;
46     * final E entry;
47     * public FIFOEntry(E entry) {
48     * seqNum = seq.getAndIncrement();
49     * this.entry = entry;
50     * }
51     * public E getEntry() { return entry; }
52     * public int compareTo(FIFOEntry&lt;E&gt; other) {
53     * int res = entry.compareTo(other.entry);
54 jsr166 1.42 * if (res == 0 &amp;&amp; other.entry != this.entry)
55 dl 1.41 * res = (seqNum &lt; other.seqNum ? -1 : 1);
56     * return res;
57     * }
58 jsr166 1.42 * }</pre>
59 dl 1.20 *
60 dl 1.35 * <p>This class is a member of the
61     * <a href="{@docRoot}/../guide/collections/index.html">
62     * Java Collections Framework</a>.
63     *
64 dl 1.6 * @since 1.5
65     * @author Doug Lea
66 dl 1.29 * @param <E> the type of elements held in this collection
67 dl 1.28 */
68 dl 1.5 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
69 dl 1.15 implements BlockingQueue<E>, java.io.Serializable {
70 dl 1.21 private static final long serialVersionUID = 5595510919245408276L;
71 tim 1.1
72 dl 1.5 private final PriorityQueue<E> q;
73 dl 1.9 private final ReentrantLock lock = new ReentrantLock(true);
74 dl 1.32 private final Condition notEmpty = lock.newCondition();
75 dl 1.5
76 dl 1.2 /**
77 jsr166 1.42 * Creates a <tt>PriorityBlockingQueue</tt> with the default
78     * initial capacity (11) that orders its elements according to
79     * their {@linkplain Comparable natural ordering}.
80 dl 1.2 */
81     public PriorityBlockingQueue() {
82 dl 1.5 q = new PriorityQueue<E>();
83 dl 1.2 }
84    
85     /**
86 jsr166 1.42 * Creates a <tt>PriorityBlockingQueue</tt> with the specified
87     * initial capacity that orders its elements according to their
88     * {@linkplain Comparable natural ordering}.
89 dl 1.2 *
90 jsr166 1.42 * @param initialCapacity the initial capacity for this priority queue
91 dholmes 1.16 * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
92     * than 1
93 dl 1.2 */
94     public PriorityBlockingQueue(int initialCapacity) {
95 dl 1.5 q = new PriorityQueue<E>(initialCapacity, null);
96 dl 1.2 }
97    
98     /**
99 dholmes 1.16 * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
100 jsr166 1.39 * capacity that orders its elements according to the specified
101     * comparator.
102 dl 1.2 *
103 jsr166 1.42 * @param initialCapacity the initial capacity for this priority queue
104 dl 1.2 * @param comparator the comparator used to order this priority queue.
105 dholmes 1.10 * If <tt>null</tt> then the order depends on the elements' natural
106     * ordering.
107 dholmes 1.16 * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
108     * than 1
109 dl 1.2 */
110 tim 1.13 public PriorityBlockingQueue(int initialCapacity,
111 dholmes 1.14 Comparator<? super E> comparator) {
112 dl 1.5 q = new PriorityQueue<E>(initialCapacity, comparator);
113 dl 1.2 }
114    
115     /**
116 dholmes 1.16 * Creates a <tt>PriorityBlockingQueue</tt> containing the elements
117 dl 1.15 * in the specified collection. The priority queue has an initial
118 jsr166 1.39 * capacity of 110% of the size of the specified collection. If
119 dl 1.15 * the specified collection is a {@link SortedSet} or a {@link
120     * PriorityQueue}, this priority queue will be sorted according to
121 jsr166 1.39 * the same comparator, or according to the natural ordering of its
122     * elements if the collection is sorted according to the natural
123     * ordering of its elements. Otherwise, this priority queue is
124     * ordered according to the natural ordering of its elements.
125 dl 1.2 *
126 dholmes 1.14 * @param c the collection whose elements are to be placed
127 dl 1.2 * into this priority queue.
128     * @throws ClassCastException if elements of the specified collection
129     * cannot be compared to one another according to the priority
130     * queue's ordering.
131 jsr166 1.42 * @throws NullPointerException if the specified collection or any
132     * of its elements are null
133 dl 1.2 */
134 dholmes 1.14 public PriorityBlockingQueue(Collection<? extends E> c) {
135     q = new PriorityQueue<E>(c);
136 dl 1.7 }
137    
138 dholmes 1.10 /**
139 jsr166 1.42 * Inserts the specified element into this priority queue.
140     *
141 jsr166 1.40 * @param e the element to add
142 jsr166 1.42 * @return <tt>true</tt> (as per the spec for {@link Collection#add})
143 dholmes 1.16 * @throws ClassCastException if the specified element cannot be compared
144 jsr166 1.42 * with elements currently in the priority queue according to the
145     * priority queue's ordering
146     * @throws NullPointerException if the specified element is null
147 dholmes 1.10 */
148 jsr166 1.40 public boolean add(E e) {
149 jsr166 1.42 return offer(e);
150 dl 1.5 }
151    
152 dholmes 1.16 /**
153 dl 1.24 * Inserts the specified element into this priority queue.
154 dholmes 1.16 *
155 jsr166 1.40 * @param e the element to add
156 dholmes 1.16 * @return <tt>true</tt>
157     * @throws ClassCastException if the specified element cannot be compared
158 jsr166 1.42 * with elements currently in the priority queue according to the
159     * priority queue's ordering
160     * @throws NullPointerException if the specified element is null
161 dholmes 1.16 */
162 jsr166 1.40 public boolean offer(E e) {
163     if (e == null) throw new NullPointerException();
164 dl 1.31 final ReentrantLock lock = this.lock;
165 dl 1.5 lock.lock();
166     try {
167 jsr166 1.40 boolean ok = q.offer(e);
168 dl 1.5 assert ok;
169     notEmpty.signal();
170     return true;
171 tim 1.19 } finally {
172 tim 1.13 lock.unlock();
173 dl 1.5 }
174     }
175    
176 dholmes 1.16 /**
177 jsr166 1.42 * Inserts the specified element into this priority queue. As the queue is
178 dholmes 1.16 * unbounded this method will never block.
179 jsr166 1.42 *
180 jsr166 1.40 * @param e the element to add
181 jsr166 1.42 * @throws ClassCastException if the specified element cannot be compared
182     * with elements currently in the priority queue according to the
183     * priority queue's ordering
184     * @throws NullPointerException if the specified element is null
185 dholmes 1.16 */
186 jsr166 1.40 public void put(E e) {
187     offer(e); // never need to block
188 dl 1.5 }
189    
190 dholmes 1.16 /**
191 dl 1.24 * Inserts the specified element into this priority queue. As the queue is
192 dholmes 1.16 * unbounded this method will never block.
193 jsr166 1.42 *
194 jsr166 1.40 * @param e the element to add
195 dholmes 1.16 * @param timeout This parameter is ignored as the method never blocks
196     * @param unit This parameter is ignored as the method never blocks
197 dl 1.22 * @return <tt>true</tt>
198 jsr166 1.42 * @throws ClassCastException if the specified element cannot be compared
199     * with elements currently in the priority queue according to the
200     * priority queue's ordering
201     * @throws NullPointerException if the specified element is null
202 dholmes 1.16 */
203 jsr166 1.40 public boolean offer(E e, long timeout, TimeUnit unit) {
204     return offer(e); // never need to block
205 dl 1.5 }
206    
207 jsr166 1.42 public E poll() {
208     final ReentrantLock lock = this.lock;
209     lock.lock();
210     try {
211     return q.poll();
212     } finally {
213     lock.unlock();
214     }
215     }
216    
217 dl 1.5 public E take() throws InterruptedException {
218 dl 1.31 final ReentrantLock lock = this.lock;
219 dl 1.5 lock.lockInterruptibly();
220     try {
221     try {
222     while (q.size() == 0)
223     notEmpty.await();
224 tim 1.19 } catch (InterruptedException ie) {
225 dl 1.5 notEmpty.signal(); // propagate to non-interrupted thread
226     throw ie;
227     }
228     E x = q.poll();
229     assert x != null;
230     return x;
231 tim 1.19 } finally {
232 dl 1.5 lock.unlock();
233     }
234     }
235    
236     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
237 dholmes 1.10 long nanos = unit.toNanos(timeout);
238 dl 1.31 final ReentrantLock lock = this.lock;
239 dl 1.5 lock.lockInterruptibly();
240     try {
241     for (;;) {
242     E x = q.poll();
243 tim 1.13 if (x != null)
244 dl 1.5 return x;
245     if (nanos <= 0)
246     return null;
247     try {
248     nanos = notEmpty.awaitNanos(nanos);
249 tim 1.19 } catch (InterruptedException ie) {
250 dl 1.5 notEmpty.signal(); // propagate to non-interrupted thread
251     throw ie;
252     }
253     }
254 tim 1.19 } finally {
255 dl 1.5 lock.unlock();
256     }
257     }
258    
259     public E peek() {
260 dl 1.31 final ReentrantLock lock = this.lock;
261 dl 1.5 lock.lock();
262     try {
263     return q.peek();
264 tim 1.19 } finally {
265 tim 1.13 lock.unlock();
266 dl 1.5 }
267     }
268    
269 jsr166 1.42 /**
270     * Returns the comparator used to order the elements in this queue,
271     * or <tt>null</tt> if this queue uses the {@linkplain Comparable
272     * natural ordering} of its elements.
273     *
274     * @return the comparator used to order the elements in this queue,
275     * or <tt>null</tt> if this queue uses the natural
276     * ordering of its elements.
277     */
278     public Comparator<? super E> comparator() {
279     return q.comparator();
280     }
281    
282 dl 1.5 public int size() {
283 dl 1.31 final ReentrantLock lock = this.lock;
284 dl 1.5 lock.lock();
285     try {
286     return q.size();
287 tim 1.19 } finally {
288 dl 1.5 lock.unlock();
289     }
290     }
291    
292     /**
293     * Always returns <tt>Integer.MAX_VALUE</tt> because
294 dholmes 1.16 * a <tt>PriorityBlockingQueue</tt> is not capacity constrained.
295 dl 1.5 * @return <tt>Integer.MAX_VALUE</tt>
296     */
297     public int remainingCapacity() {
298     return Integer.MAX_VALUE;
299     }
300    
301 dl 1.37 /**
302 jsr166 1.42 * Removes a single instance of the specified element from this queue,
303     * if it is present. More formally, removes an element <tt>e</tt> such
304     * that <tt>o.equals(e)</tt>, if this queue contains one or more such
305     * elements.
306     * Returns <tt>true</tt> if this queue contained the specified element
307     * (or equivalently, if this queue changed as a result of the call).
308     *
309     * @param o element to be removed from this queue, if present
310     * @return <tt>true</tt> if this queue changed as a result of the call
311 dl 1.37 */
312 dholmes 1.14 public boolean remove(Object o) {
313 dl 1.31 final ReentrantLock lock = this.lock;
314 dl 1.5 lock.lock();
315     try {
316 dholmes 1.14 return q.remove(o);
317 tim 1.19 } finally {
318 dl 1.5 lock.unlock();
319     }
320     }
321    
322 jsr166 1.42 /**
323     * Returns <tt>true</tt> if this queue contains the specified element.
324     * More formally, returns <tt>true</tt> if and only if this queue contains
325     * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
326     *
327     * @param o object to be checked for containment in this queue
328     * @return <tt>true</tt> if this queue contains the specified element
329     */
330 dholmes 1.14 public boolean contains(Object o) {
331 dl 1.31 final ReentrantLock lock = this.lock;
332 dl 1.5 lock.lock();
333     try {
334 dholmes 1.14 return q.contains(o);
335 tim 1.19 } finally {
336 dl 1.5 lock.unlock();
337     }
338     }
339    
340 jsr166 1.42 /**
341     * Returns an array containing all of the elements in this queue.
342     * The returned array elements are in no particular order.
343     *
344     * <p>The returned array will be "safe" in that no references to it are
345     * maintained by this queue. (In other words, this method must allocate
346     * a new array). The caller is thus free to modify the returned array.
347 jsr166 1.43 *
348 jsr166 1.42 * <p>This method acts as bridge between array-based and collection-based
349     * APIs.
350     *
351     * @return an array containing all of the elements in this queue
352     */
353 dl 1.5 public Object[] toArray() {
354 dl 1.31 final ReentrantLock lock = this.lock;
355 dl 1.5 lock.lock();
356     try {
357     return q.toArray();
358 tim 1.19 } finally {
359 dl 1.5 lock.unlock();
360     }
361     }
362    
363    
364     public String toString() {
365 dl 1.31 final ReentrantLock lock = this.lock;
366 dl 1.5 lock.lock();
367     try {
368     return q.toString();
369 tim 1.19 } finally {
370 dl 1.5 lock.unlock();
371     }
372     }
373    
374 jsr166 1.42 /**
375     * @throws UnsupportedOperationException {@inheritDoc}
376     * @throws ClassCastException {@inheritDoc}
377     * @throws NullPointerException {@inheritDoc}
378     * @throws IllegalArgumentException {@inheritDoc}
379     */
380 dl 1.26 public int drainTo(Collection<? super E> c) {
381     if (c == null)
382     throw new NullPointerException();
383     if (c == this)
384     throw new IllegalArgumentException();
385 dl 1.31 final ReentrantLock lock = this.lock;
386 dl 1.26 lock.lock();
387     try {
388     int n = 0;
389     E e;
390     while ( (e = q.poll()) != null) {
391     c.add(e);
392     ++n;
393     }
394     return n;
395     } finally {
396     lock.unlock();
397     }
398     }
399    
400 jsr166 1.42 /**
401     * @throws UnsupportedOperationException {@inheritDoc}
402     * @throws ClassCastException {@inheritDoc}
403     * @throws NullPointerException {@inheritDoc}
404     * @throws IllegalArgumentException {@inheritDoc}
405     */
406 dl 1.26 public int drainTo(Collection<? super E> c, int maxElements) {
407     if (c == null)
408     throw new NullPointerException();
409     if (c == this)
410     throw new IllegalArgumentException();
411     if (maxElements <= 0)
412     return 0;
413 dl 1.31 final ReentrantLock lock = this.lock;
414 dl 1.26 lock.lock();
415     try {
416     int n = 0;
417     E e;
418     while (n < maxElements && (e = q.poll()) != null) {
419     c.add(e);
420     ++n;
421     }
422     return n;
423     } finally {
424     lock.unlock();
425     }
426     }
427    
428 dl 1.17 /**
429 dl 1.37 * Atomically removes all of the elements from this queue.
430 dl 1.17 * The queue will be empty after this call returns.
431     */
432     public void clear() {
433 dl 1.31 final ReentrantLock lock = this.lock;
434 dl 1.17 lock.lock();
435     try {
436     q.clear();
437 tim 1.19 } finally {
438 dl 1.17 lock.unlock();
439     }
440     }
441    
442 jsr166 1.42 /**
443     * Returns an array containing all of the elements in this queue; the
444     * runtime type of the returned array is that of the specified array.
445     * The returned array elements are in no particular order.
446     * If the queue fits in the specified array, it is returned therein.
447     * Otherwise, a new array is allocated with the runtime type of the
448     * specified array and the size of this queue.
449     *
450     * <p>If this queue fits in the specified array with room to spare
451     * (i.e., the array has more elements than this queue), the element in
452     * the array immediately following the end of the queue is set to
453     * <tt>null</tt>.
454     *
455     * <p>Like the {@link #toArray()} method, this method acts as bridge between
456     * array-based and collection-based APIs. Further, this method allows
457     * precise control over the runtime type of the output array, and may,
458     * under certain circumstances, be used to save allocation costs.
459     *
460     * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
461     * The following code can be used to dump the queue into a newly
462     * allocated array of <tt>String</tt>:
463     *
464     * <pre>
465     * String[] y = x.toArray(new String[0]);</pre>
466     *
467     * Note that <tt>toArray(new Object[0])</tt> is identical in function to
468     * <tt>toArray()</tt>.
469     *
470     * @param a the array into which the elements of the queue are to
471     * be stored, if it is big enough; otherwise, a new array of the
472     * same runtime type is allocated for this purpose
473     * @return an array containing all of the elements in this queue
474     * @throws ArrayStoreException if the runtime type of the specified array
475     * is not a supertype of the runtime type of every element in
476     * this queue
477     * @throws NullPointerException if the specified array is null
478     */
479 dl 1.5 public <T> T[] toArray(T[] a) {
480 dl 1.31 final ReentrantLock lock = this.lock;
481 dl 1.5 lock.lock();
482     try {
483     return q.toArray(a);
484 tim 1.19 } finally {
485 dl 1.5 lock.unlock();
486     }
487     }
488    
489 dholmes 1.16 /**
490 dl 1.23 * Returns an iterator over the elements in this queue. The
491     * iterator does not return the elements in any particular order.
492     * The returned iterator is a thread-safe "fast-fail" iterator
493 jsr166 1.39 * that will throw {@link ConcurrentModificationException} upon
494     * detected interference.
495 dholmes 1.16 *
496 jsr166 1.42 * @return an iterator over the elements in this queue
497 dholmes 1.16 */
498 dl 1.5 public Iterator<E> iterator() {
499 dl 1.31 final ReentrantLock lock = this.lock;
500 dl 1.5 lock.lock();
501     try {
502     return new Itr(q.iterator());
503 tim 1.19 } finally {
504 dl 1.5 lock.unlock();
505     }
506     }
507    
508     private class Itr<E> implements Iterator<E> {
509     private final Iterator<E> iter;
510 tim 1.13 Itr(Iterator<E> i) {
511     iter = i;
512 dl 1.5 }
513    
514 tim 1.13 public boolean hasNext() {
515 dl 1.5 /*
516     * No sync -- we rely on underlying hasNext to be
517     * stateless, in which case we can return true by mistake
518 dl 1.30 * only when next() will subsequently throw
519 dl 1.5 * ConcurrentModificationException.
520     */
521     return iter.hasNext();
522 tim 1.13 }
523    
524     public E next() {
525 dl 1.31 ReentrantLock lock = PriorityBlockingQueue.this.lock;
526 dl 1.5 lock.lock();
527     try {
528     return iter.next();
529 tim 1.19 } finally {
530 dl 1.5 lock.unlock();
531     }
532 tim 1.13 }
533    
534     public void remove() {
535 dl 1.31 ReentrantLock lock = PriorityBlockingQueue.this.lock;
536 dl 1.5 lock.lock();
537     try {
538     iter.remove();
539 tim 1.19 } finally {
540 dl 1.5 lock.unlock();
541     }
542 tim 1.13 }
543 dl 1.5 }
544    
545     /**
546     * Save the state to a stream (that is, serialize it). This
547     * merely wraps default serialization within lock. The
548     * serialization strategy for items is left to underlying
549     * Queue. Note that locking is not needed on deserialization, so
550     * readObject is not defined, just relying on default.
551     */
552     private void writeObject(java.io.ObjectOutputStream s)
553     throws java.io.IOException {
554     lock.lock();
555     try {
556     s.defaultWriteObject();
557 tim 1.19 } finally {
558 dl 1.5 lock.unlock();
559     }
560 tim 1.1 }
561    
562     }