ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.48
Committed: Mon Jul 18 19:14:17 2005 UTC (18 years, 10 months ago) by jsr166
Branch: MAIN
Changes since 1.47: +2 -2 lines
Log Message:
doc fixes

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.33 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 tim 1.13
9 dl 1.8 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
12     /**
13 dl 1.25 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
14     * the same ordering rules as class {@link PriorityQueue} and supplies
15     * blocking retrieval operations. While this queue is logically
16 dl 1.24 * unbounded, attempted additions may fail due to resource exhaustion
17 dl 1.25 * (causing <tt>OutOfMemoryError</tt>). This class does not permit
18 jsr166 1.42 * <tt>null</tt> elements. A priority queue relying on {@linkplain
19     * Comparable natural ordering} also does not permit insertion of
20     * non-comparable objects (doing so results in
21     * <tt>ClassCastException</tt>).
22 dl 1.20 *
23 dl 1.38 * <p>This class and its iterator implement all of the
24     * <em>optional</em> methods of the {@link Collection} and {@link
25 dl 1.41 * Iterator} interfaces. The Iterator provided in method {@link
26     * #iterator()} is <em>not</em> guaranteed to traverse the elements of
27     * the PriorityBlockingQueue in any particular order. If you need
28     * ordered traversal, consider using
29     * <tt>Arrays.sort(pq.toArray())</tt>. Also, method <tt>drainTo</tt>
30     * can be used to <em>remove</em> some or all elements in priority
31     * order and place them in another collection.
32     *
33     * <p>Operations on this class make no guarantees about the ordering
34     * of elements with equal priority. If you need to enforce an
35     * ordering, you can define custom classes or comparators that use a
36     * secondary key to break ties in primary priority values. For
37     * example, here is a class that applies first-in-first-out
38     * tie-breaking to comparable elements. To use it, you would insert a
39     * <tt>new FIFOEntry(anEntry)</tt> instead of a plain entry object.
40     *
41     * <pre>
42     * class FIFOEntry&lt;E extends Comparable&lt;? super E&gt;&gt;
43 jsr166 1.47 * implements Comparable&lt;FIFOEntry&lt;E&gt;&gt; {
44     * final static AtomicLong seq = new AtomicLong();
45 dl 1.41 * final long seqNum;
46     * final E entry;
47     * public FIFOEntry(E entry) {
48     * seqNum = seq.getAndIncrement();
49     * this.entry = entry;
50     * }
51     * public E getEntry() { return entry; }
52     * public int compareTo(FIFOEntry&lt;E&gt; other) {
53     * int res = entry.compareTo(other.entry);
54 jsr166 1.42 * if (res == 0 &amp;&amp; other.entry != this.entry)
55 dl 1.41 * res = (seqNum &lt; other.seqNum ? -1 : 1);
56     * return res;
57     * }
58 jsr166 1.42 * }</pre>
59 dl 1.20 *
60 dl 1.35 * <p>This class is a member of the
61     * <a href="{@docRoot}/../guide/collections/index.html">
62     * Java Collections Framework</a>.
63     *
64 dl 1.6 * @since 1.5
65     * @author Doug Lea
66 dl 1.29 * @param <E> the type of elements held in this collection
67 dl 1.28 */
68 dl 1.5 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
69 dl 1.15 implements BlockingQueue<E>, java.io.Serializable {
70 dl 1.21 private static final long serialVersionUID = 5595510919245408276L;
71 tim 1.1
72 dl 1.5 private final PriorityQueue<E> q;
73 dl 1.9 private final ReentrantLock lock = new ReentrantLock(true);
74 dl 1.32 private final Condition notEmpty = lock.newCondition();
75 dl 1.5
76 dl 1.2 /**
77 jsr166 1.42 * Creates a <tt>PriorityBlockingQueue</tt> with the default
78     * initial capacity (11) that orders its elements according to
79     * their {@linkplain Comparable natural ordering}.
80 dl 1.2 */
81     public PriorityBlockingQueue() {
82 dl 1.5 q = new PriorityQueue<E>();
83 dl 1.2 }
84    
85     /**
86 jsr166 1.42 * Creates a <tt>PriorityBlockingQueue</tt> with the specified
87     * initial capacity that orders its elements according to their
88     * {@linkplain Comparable natural ordering}.
89 dl 1.2 *
90 jsr166 1.42 * @param initialCapacity the initial capacity for this priority queue
91 dholmes 1.16 * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
92     * than 1
93 dl 1.2 */
94     public PriorityBlockingQueue(int initialCapacity) {
95 dl 1.5 q = new PriorityQueue<E>(initialCapacity, null);
96 dl 1.2 }
97    
98     /**
99 dholmes 1.16 * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
100 jsr166 1.39 * capacity that orders its elements according to the specified
101     * comparator.
102 dl 1.2 *
103 jsr166 1.42 * @param initialCapacity the initial capacity for this priority queue
104 dl 1.2 * @param comparator the comparator used to order this priority queue.
105 dholmes 1.10 * If <tt>null</tt> then the order depends on the elements' natural
106     * ordering.
107 dholmes 1.16 * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
108     * than 1
109 dl 1.2 */
110 tim 1.13 public PriorityBlockingQueue(int initialCapacity,
111 dholmes 1.14 Comparator<? super E> comparator) {
112 dl 1.5 q = new PriorityQueue<E>(initialCapacity, comparator);
113 dl 1.2 }
114    
115     /**
116 dholmes 1.16 * Creates a <tt>PriorityBlockingQueue</tt> containing the elements
117 dl 1.15 * in the specified collection. The priority queue has an initial
118 jsr166 1.39 * capacity of 110% of the size of the specified collection. If
119 dl 1.15 * the specified collection is a {@link SortedSet} or a {@link
120     * PriorityQueue}, this priority queue will be sorted according to
121 jsr166 1.39 * the same comparator, or according to the natural ordering of its
122     * elements if the collection is sorted according to the natural
123     * ordering of its elements. Otherwise, this priority queue is
124     * ordered according to the natural ordering of its elements.
125 dl 1.2 *
126 dholmes 1.14 * @param c the collection whose elements are to be placed
127 dl 1.2 * into this priority queue.
128     * @throws ClassCastException if elements of the specified collection
129     * cannot be compared to one another according to the priority
130     * queue's ordering.
131 jsr166 1.42 * @throws NullPointerException if the specified collection or any
132     * of its elements are null
133 dl 1.2 */
134 dholmes 1.14 public PriorityBlockingQueue(Collection<? extends E> c) {
135     q = new PriorityQueue<E>(c);
136 dl 1.7 }
137    
138 dholmes 1.10 /**
139 jsr166 1.42 * Inserts the specified element into this priority queue.
140     *
141 jsr166 1.40 * @param e the element to add
142 jsr166 1.48 * @return <tt>true</tt> (as specified by {@link Collection#add})
143 dholmes 1.16 * @throws ClassCastException if the specified element cannot be compared
144 jsr166 1.42 * with elements currently in the priority queue according to the
145     * priority queue's ordering
146     * @throws NullPointerException if the specified element is null
147 dholmes 1.10 */
148 jsr166 1.40 public boolean add(E e) {
149 jsr166 1.42 return offer(e);
150 dl 1.5 }
151    
152 dholmes 1.16 /**
153 dl 1.24 * Inserts the specified element into this priority queue.
154 dholmes 1.16 *
155 jsr166 1.40 * @param e the element to add
156 jsr166 1.48 * @return <tt>true</tt> (as specified by {@link Queue#offer})
157 dholmes 1.16 * @throws ClassCastException if the specified element cannot be compared
158 jsr166 1.42 * with elements currently in the priority queue according to the
159     * priority queue's ordering
160     * @throws NullPointerException if the specified element is null
161 dholmes 1.16 */
162 jsr166 1.40 public boolean offer(E e) {
163 dl 1.31 final ReentrantLock lock = this.lock;
164 dl 1.5 lock.lock();
165     try {
166 jsr166 1.40 boolean ok = q.offer(e);
167 dl 1.5 assert ok;
168     notEmpty.signal();
169     return true;
170 tim 1.19 } finally {
171 tim 1.13 lock.unlock();
172 dl 1.5 }
173     }
174    
175 dholmes 1.16 /**
176 jsr166 1.42 * Inserts the specified element into this priority queue. As the queue is
177 dholmes 1.16 * unbounded this method will never block.
178 jsr166 1.42 *
179 jsr166 1.40 * @param e the element to add
180 jsr166 1.42 * @throws ClassCastException if the specified element cannot be compared
181     * with elements currently in the priority queue according to the
182     * priority queue's ordering
183     * @throws NullPointerException if the specified element is null
184 dholmes 1.16 */
185 jsr166 1.40 public void put(E e) {
186     offer(e); // never need to block
187 dl 1.5 }
188    
189 dholmes 1.16 /**
190 dl 1.24 * Inserts the specified element into this priority queue. As the queue is
191 dholmes 1.16 * unbounded this method will never block.
192 jsr166 1.42 *
193 jsr166 1.40 * @param e the element to add
194 dholmes 1.16 * @param timeout This parameter is ignored as the method never blocks
195     * @param unit This parameter is ignored as the method never blocks
196 dl 1.22 * @return <tt>true</tt>
197 jsr166 1.42 * @throws ClassCastException if the specified element cannot be compared
198     * with elements currently in the priority queue according to the
199     * priority queue's ordering
200     * @throws NullPointerException if the specified element is null
201 dholmes 1.16 */
202 jsr166 1.40 public boolean offer(E e, long timeout, TimeUnit unit) {
203     return offer(e); // never need to block
204 dl 1.5 }
205    
206 jsr166 1.42 public E poll() {
207     final ReentrantLock lock = this.lock;
208     lock.lock();
209     try {
210     return q.poll();
211     } finally {
212     lock.unlock();
213     }
214     }
215    
216 dl 1.5 public E take() throws InterruptedException {
217 dl 1.31 final ReentrantLock lock = this.lock;
218 dl 1.5 lock.lockInterruptibly();
219     try {
220     try {
221     while (q.size() == 0)
222     notEmpty.await();
223 tim 1.19 } catch (InterruptedException ie) {
224 dl 1.5 notEmpty.signal(); // propagate to non-interrupted thread
225     throw ie;
226     }
227     E x = q.poll();
228     assert x != null;
229     return x;
230 tim 1.19 } finally {
231 dl 1.5 lock.unlock();
232     }
233     }
234    
235     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
236 dholmes 1.10 long nanos = unit.toNanos(timeout);
237 dl 1.31 final ReentrantLock lock = this.lock;
238 dl 1.5 lock.lockInterruptibly();
239     try {
240     for (;;) {
241     E x = q.poll();
242 tim 1.13 if (x != null)
243 dl 1.5 return x;
244     if (nanos <= 0)
245     return null;
246     try {
247     nanos = notEmpty.awaitNanos(nanos);
248 tim 1.19 } catch (InterruptedException ie) {
249 dl 1.5 notEmpty.signal(); // propagate to non-interrupted thread
250     throw ie;
251     }
252     }
253 tim 1.19 } finally {
254 dl 1.5 lock.unlock();
255     }
256     }
257    
258     public E peek() {
259 dl 1.31 final ReentrantLock lock = this.lock;
260 dl 1.5 lock.lock();
261     try {
262     return q.peek();
263 tim 1.19 } finally {
264 tim 1.13 lock.unlock();
265 dl 1.5 }
266     }
267    
268 jsr166 1.42 /**
269     * Returns the comparator used to order the elements in this queue,
270     * or <tt>null</tt> if this queue uses the {@linkplain Comparable
271     * natural ordering} of its elements.
272     *
273     * @return the comparator used to order the elements in this queue,
274     * or <tt>null</tt> if this queue uses the natural
275     * ordering of its elements.
276     */
277     public Comparator<? super E> comparator() {
278     return q.comparator();
279     }
280    
281 dl 1.5 public int size() {
282 dl 1.31 final ReentrantLock lock = this.lock;
283 dl 1.5 lock.lock();
284     try {
285     return q.size();
286 tim 1.19 } finally {
287 dl 1.5 lock.unlock();
288     }
289     }
290    
291     /**
292     * Always returns <tt>Integer.MAX_VALUE</tt> because
293 dholmes 1.16 * a <tt>PriorityBlockingQueue</tt> is not capacity constrained.
294 dl 1.5 * @return <tt>Integer.MAX_VALUE</tt>
295     */
296     public int remainingCapacity() {
297     return Integer.MAX_VALUE;
298     }
299    
300 dl 1.37 /**
301 jsr166 1.42 * Removes a single instance of the specified element from this queue,
302     * if it is present. More formally, removes an element <tt>e</tt> such
303     * that <tt>o.equals(e)</tt>, if this queue contains one or more such
304     * elements.
305     * Returns <tt>true</tt> if this queue contained the specified element
306     * (or equivalently, if this queue changed as a result of the call).
307     *
308     * @param o element to be removed from this queue, if present
309     * @return <tt>true</tt> if this queue changed as a result of the call
310 dl 1.37 */
311 dholmes 1.14 public boolean remove(Object o) {
312 dl 1.31 final ReentrantLock lock = this.lock;
313 dl 1.5 lock.lock();
314     try {
315 dholmes 1.14 return q.remove(o);
316 tim 1.19 } finally {
317 dl 1.5 lock.unlock();
318     }
319     }
320    
321 jsr166 1.42 /**
322     * Returns <tt>true</tt> if this queue contains the specified element.
323     * More formally, returns <tt>true</tt> if and only if this queue contains
324     * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
325     *
326     * @param o object to be checked for containment in this queue
327     * @return <tt>true</tt> if this queue contains the specified element
328     */
329 dholmes 1.14 public boolean contains(Object o) {
330 dl 1.31 final ReentrantLock lock = this.lock;
331 dl 1.5 lock.lock();
332     try {
333 dholmes 1.14 return q.contains(o);
334 tim 1.19 } finally {
335 dl 1.5 lock.unlock();
336     }
337     }
338    
339 jsr166 1.42 /**
340     * Returns an array containing all of the elements in this queue.
341     * The returned array elements are in no particular order.
342     *
343     * <p>The returned array will be "safe" in that no references to it are
344     * maintained by this queue. (In other words, this method must allocate
345     * a new array). The caller is thus free to modify the returned array.
346 jsr166 1.43 *
347 jsr166 1.42 * <p>This method acts as bridge between array-based and collection-based
348     * APIs.
349     *
350     * @return an array containing all of the elements in this queue
351     */
352 dl 1.5 public Object[] toArray() {
353 dl 1.31 final ReentrantLock lock = this.lock;
354 dl 1.5 lock.lock();
355     try {
356     return q.toArray();
357 tim 1.19 } finally {
358 dl 1.5 lock.unlock();
359     }
360     }
361    
362    
363     public String toString() {
364 dl 1.31 final ReentrantLock lock = this.lock;
365 dl 1.5 lock.lock();
366     try {
367     return q.toString();
368 tim 1.19 } finally {
369 dl 1.5 lock.unlock();
370     }
371     }
372    
373 jsr166 1.42 /**
374     * @throws UnsupportedOperationException {@inheritDoc}
375     * @throws ClassCastException {@inheritDoc}
376     * @throws NullPointerException {@inheritDoc}
377     * @throws IllegalArgumentException {@inheritDoc}
378     */
379 dl 1.26 public int drainTo(Collection<? super E> c) {
380     if (c == null)
381     throw new NullPointerException();
382     if (c == this)
383     throw new IllegalArgumentException();
384 dl 1.31 final ReentrantLock lock = this.lock;
385 dl 1.26 lock.lock();
386     try {
387     int n = 0;
388     E e;
389     while ( (e = q.poll()) != null) {
390     c.add(e);
391     ++n;
392     }
393     return n;
394     } finally {
395     lock.unlock();
396     }
397     }
398    
399 jsr166 1.42 /**
400     * @throws UnsupportedOperationException {@inheritDoc}
401     * @throws ClassCastException {@inheritDoc}
402     * @throws NullPointerException {@inheritDoc}
403     * @throws IllegalArgumentException {@inheritDoc}
404     */
405 dl 1.26 public int drainTo(Collection<? super E> c, int maxElements) {
406     if (c == null)
407     throw new NullPointerException();
408     if (c == this)
409     throw new IllegalArgumentException();
410     if (maxElements <= 0)
411     return 0;
412 dl 1.31 final ReentrantLock lock = this.lock;
413 dl 1.26 lock.lock();
414     try {
415     int n = 0;
416     E e;
417     while (n < maxElements && (e = q.poll()) != null) {
418     c.add(e);
419     ++n;
420     }
421     return n;
422     } finally {
423     lock.unlock();
424     }
425     }
426    
427 dl 1.17 /**
428 dl 1.37 * Atomically removes all of the elements from this queue.
429 dl 1.17 * The queue will be empty after this call returns.
430     */
431     public void clear() {
432 dl 1.31 final ReentrantLock lock = this.lock;
433 dl 1.17 lock.lock();
434     try {
435     q.clear();
436 tim 1.19 } finally {
437 dl 1.17 lock.unlock();
438     }
439     }
440    
441 jsr166 1.42 /**
442     * Returns an array containing all of the elements in this queue; the
443     * runtime type of the returned array is that of the specified array.
444     * The returned array elements are in no particular order.
445     * If the queue fits in the specified array, it is returned therein.
446     * Otherwise, a new array is allocated with the runtime type of the
447     * specified array and the size of this queue.
448     *
449     * <p>If this queue fits in the specified array with room to spare
450     * (i.e., the array has more elements than this queue), the element in
451     * the array immediately following the end of the queue is set to
452     * <tt>null</tt>.
453     *
454     * <p>Like the {@link #toArray()} method, this method acts as bridge between
455     * array-based and collection-based APIs. Further, this method allows
456     * precise control over the runtime type of the output array, and may,
457     * under certain circumstances, be used to save allocation costs.
458     *
459     * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
460     * The following code can be used to dump the queue into a newly
461     * allocated array of <tt>String</tt>:
462     *
463     * <pre>
464     * String[] y = x.toArray(new String[0]);</pre>
465     *
466     * Note that <tt>toArray(new Object[0])</tt> is identical in function to
467     * <tt>toArray()</tt>.
468     *
469     * @param a the array into which the elements of the queue are to
470     * be stored, if it is big enough; otherwise, a new array of the
471     * same runtime type is allocated for this purpose
472     * @return an array containing all of the elements in this queue
473     * @throws ArrayStoreException if the runtime type of the specified array
474     * is not a supertype of the runtime type of every element in
475     * this queue
476     * @throws NullPointerException if the specified array is null
477     */
478 dl 1.5 public <T> T[] toArray(T[] a) {
479 dl 1.31 final ReentrantLock lock = this.lock;
480 dl 1.5 lock.lock();
481     try {
482     return q.toArray(a);
483 tim 1.19 } finally {
484 dl 1.5 lock.unlock();
485     }
486     }
487    
488 dholmes 1.16 /**
489 dl 1.23 * Returns an iterator over the elements in this queue. The
490     * iterator does not return the elements in any particular order.
491     * The returned iterator is a thread-safe "fast-fail" iterator
492 jsr166 1.39 * that will throw {@link ConcurrentModificationException} upon
493     * detected interference.
494 dholmes 1.16 *
495 jsr166 1.42 * @return an iterator over the elements in this queue
496 dholmes 1.16 */
497 dl 1.5 public Iterator<E> iterator() {
498 dl 1.31 final ReentrantLock lock = this.lock;
499 dl 1.5 lock.lock();
500     try {
501 dl 1.44 return new Itr<E>(q.iterator());
502 tim 1.19 } finally {
503 dl 1.5 lock.unlock();
504     }
505     }
506    
507     private class Itr<E> implements Iterator<E> {
508     private final Iterator<E> iter;
509 tim 1.13 Itr(Iterator<E> i) {
510     iter = i;
511 dl 1.5 }
512    
513 tim 1.13 public boolean hasNext() {
514 dl 1.5 /*
515     * No sync -- we rely on underlying hasNext to be
516     * stateless, in which case we can return true by mistake
517 dl 1.30 * only when next() will subsequently throw
518 dl 1.5 * ConcurrentModificationException.
519     */
520     return iter.hasNext();
521 tim 1.13 }
522    
523     public E next() {
524 dl 1.31 ReentrantLock lock = PriorityBlockingQueue.this.lock;
525 dl 1.5 lock.lock();
526     try {
527     return iter.next();
528 tim 1.19 } finally {
529 dl 1.5 lock.unlock();
530     }
531 tim 1.13 }
532    
533     public void remove() {
534 dl 1.31 ReentrantLock lock = PriorityBlockingQueue.this.lock;
535 dl 1.5 lock.lock();
536     try {
537     iter.remove();
538 tim 1.19 } finally {
539 dl 1.5 lock.unlock();
540     }
541 tim 1.13 }
542 dl 1.5 }
543    
544     /**
545     * Save the state to a stream (that is, serialize it). This
546     * merely wraps default serialization within lock. The
547     * serialization strategy for items is left to underlying
548     * Queue. Note that locking is not needed on deserialization, so
549     * readObject is not defined, just relying on default.
550     */
551     private void writeObject(java.io.ObjectOutputStream s)
552     throws java.io.IOException {
553     lock.lock();
554     try {
555     s.defaultWriteObject();
556 tim 1.19 } finally {
557 dl 1.5 lock.unlock();
558     }
559 tim 1.1 }
560    
561     }