ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.54
Committed: Sun May 18 23:47:56 2008 UTC (16 years ago) by jsr166
Branch: MAIN
Changes since 1.53: +3 -3 lines
Log Message:
Sync with OpenJDK; untabify

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.33 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 tim 1.13
9 dl 1.8 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
/**
 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
 * the same ordering rules as class {@link PriorityQueue} and supplies
 * blocking retrieval operations.  While this queue is logically
 * unbounded, attempted additions may fail due to resource exhaustion
 * (causing <tt>OutOfMemoryError</tt>). This class does not permit
 * <tt>null</tt> elements.  A priority queue relying on {@linkplain
 * Comparable natural ordering} also does not permit insertion of
 * non-comparable objects (doing so results in
 * <tt>ClassCastException</tt>).
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.  The Iterator provided in method {@link
 * #iterator()} is <em>not</em> guaranteed to traverse the elements of
 * the PriorityBlockingQueue in any particular order. If you need
 * ordered traversal, consider using
 * <tt>Arrays.sort(pq.toArray())</tt>.  Also, method <tt>drainTo</tt>
 * can be used to <em>remove</em> some or all elements in priority
 * order and place them in another collection.
 *
 * <p>Operations on this class make no guarantees about the ordering
 * of elements with equal priority. If you need to enforce an
 * ordering, you can define custom classes or comparators that use a
 * secondary key to break ties in primary priority values.  For
 * example, here is a class that applies first-in-first-out
 * tie-breaking to comparable elements. To use it, you would insert a
 * <tt>new FIFOEntry(anEntry)</tt> instead of a plain entry object.
 *
 * <pre>
 * class FIFOEntry&lt;E extends Comparable&lt;? super E&gt;&gt;
 *     implements Comparable&lt;FIFOEntry&lt;E&gt;&gt; {
 *   final static AtomicLong seq = new AtomicLong();
 *   final long seqNum;
 *   final E entry;
 *   public FIFOEntry(E entry) {
 *     seqNum = seq.getAndIncrement();
 *     this.entry = entry;
 *   }
 *   public E getEntry() { return entry; }
 *   public int compareTo(FIFOEntry&lt;E&gt; other) {
 *     int res = entry.compareTo(other.entry);
 *     if (res == 0 &amp;&amp; other.entry != this.entry)
 *       res = (seqNum &lt; other.seqNum ? -1 : 1);
 *     return res;
 *   }
 * }</pre>
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = 5595510919245408276L;

    /** Backing heap; element access goes through {@link #lock}. */
    private final PriorityQueue<E> q;
    /** Fair lock serializing operations on {@link #q}. */
    private final ReentrantLock lock = new ReentrantLock(true);
    /** Signalled on insertion; awaited by blocking retrievals. */
    private final Condition notEmpty = lock.newCondition();

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the default
     * initial capacity (11) that orders its elements according to
     * their {@linkplain Comparable natural ordering}.
     */
    public PriorityBlockingQueue() {
        q = new PriorityQueue<E>();
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the specified
     * initial capacity that orders its elements according to their
     * {@linkplain Comparable natural ordering}.
     *
     * @param initialCapacity the initial capacity for this priority queue
     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
     *         than 1
     */
    public PriorityBlockingQueue(int initialCapacity) {
        q = new PriorityQueue<E>(initialCapacity, null);
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
     * capacity that orders its elements according to the specified
     * comparator.
     *
     * @param initialCapacity the initial capacity for this priority queue
     * @param  comparator the comparator that will be used to order this
     *         priority queue.  If {@code null}, the {@linkplain Comparable
     *         natural ordering} of the elements will be used.
     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
     *         than 1
     */
    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        q = new PriorityQueue<E>(initialCapacity, comparator);
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> containing the elements
     * in the specified collection.  If the specified collection is a
     * {@link SortedSet} or a {@link PriorityQueue},  this
     * priority queue will be ordered according to the same ordering.
     * Otherwise, this priority queue will be ordered according to the
     * {@linkplain Comparable natural ordering} of its elements.
     *
     * @param  c the collection whose elements are to be placed
     *         into this priority queue
     * @throws ClassCastException if elements of the specified collection
     *         cannot be compared to one another according to the priority
     *         queue's ordering
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public PriorityBlockingQueue(Collection<? extends E> c) {
        q = new PriorityQueue<E>(c);
    }

    /**
     * Inserts the specified element into this priority queue.
     *
     * @param e the element to add
     * @return <tt>true</tt> (as specified by {@link Collection#add})
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
        return offer(e);
    }

    /**
     * Inserts the specified element into this priority queue.
     *
     * @param e the element to add
     * @return <tt>true</tt> (as specified by {@link Queue#offer})
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            boolean ok = q.offer(e);
            assert ok;
            // Wake one waiting taker; the new element makes the queue non-empty.
            notEmpty.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element into this priority queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public void put(E e) {
        offer(e); // never need to block
    }

    /**
     * Inserts the specified element into this priority queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return <tt>true</tt>
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e); // never need to block
    }

    /**
     * Retrieves and removes the head of this queue without waiting.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue is empty
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.poll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element becomes available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (q.size() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = q.poll();
            assert x != null;
            return x;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified wait time if necessary for an element to become available.
     *
     * @param timeout how long to wait before giving up, in units of
     *        <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     *        <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the
     *         specified waiting time elapses before an element is available
     * @throws InterruptedException if interrupted while waiting
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E x = q.poll();
                if (x != null)
                    return x;
                if (nanos <= 0)
                    return null;
                try {
                    // awaitNanos returns the remaining wait budget.
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue is empty
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the comparator used to order the elements in this queue,
     * or <tt>null</tt> if this queue uses the {@linkplain Comparable
     * natural ordering} of its elements.
     *
     * @return the comparator used to order the elements in this queue,
     *         or <tt>null</tt> if this queue uses the natural
     *         ordering of its elements
     */
    public Comparator<? super E> comparator() {
        return q.comparator();
    }

    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * a <tt>PriorityBlockingQueue</tt> is not capacity constrained.
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.  Returns {@code true} if and only if this queue contained
     * the specified element (or equivalently, if this queue changed as a
     * result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return <tt>true</tt> if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns {@code true} if this queue contains the specified element.
     * More formally, returns {@code true} if and only if this queue contains
     * at least one element {@code e} such that {@code o.equals(e)}.
     *
     * @param o object to be checked for containment in this queue
     * @return <tt>true</tt> if this queue contains the specified element
     */
    public boolean contains(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.contains(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue.
     * The returned array elements are in no particular order.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns a string representation of this queue, produced by the
     * backing queue while holding this queue's lock.
     *
     * @return a string representation of this queue
     */
    public String toString() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toString();
        } finally {
            lock.unlock();
        }
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        // Same algorithm as the bounded variant, with no effective limit.
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Elements are transferred in priority order. Each element is added
     * to <tt>c</tt> <em>before</em> it is removed from this queue, so if
     * <tt>c.add</tt> throws, the element that failed to transfer (and all
     * lower-priority elements) remain in this queue.
     *
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            E e;
            while (n < maxElements && (e = q.peek()) != null) {
                c.add(e);  // In this order, in case add() throws.
                q.poll();
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue; the
     * runtime type of the returned array is that of the specified array.
     * The returned array elements are in no particular order.
     * If the queue fits in the specified array, it is returned therein.
     * Otherwise, a new array is allocated with the runtime type of the
     * specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * <tt>null</tt>.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
     * The following code can be used to dump the queue into a newly
     * allocated array of <tt>String</tt>:
     *
     * <pre>
     *     String[] y = x.toArray(new String[0]);</pre>
     *
     * Note that <tt>toArray(new Object[0])</tt> is identical in function to
     * <tt>toArray()</tt>.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    public <T> T[] toArray(T[] a) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue. The
     * iterator does not return the elements in any particular order.
     * The returned <tt>Iterator</tt> is a "weakly consistent"
     * iterator that will never throw {@link
     * ConcurrentModificationException}, and guarantees to traverse
     * elements as they existed upon construction of the iterator, and
     * may (but is not guaranteed to) reflect any modifications
     * subsequent to construction.
     *
     * @return an iterator over the elements in this queue
     */
    public Iterator<E> iterator() {
        return new Itr(toArray());
    }

    /**
     * Snapshot iterator that works off copy of underlying q array.
     */
    private class Itr implements Iterator<E> {
        final Object[] array; // Array of all elements
        int cursor;           // index of next element to return;
        int lastRet;          // index of last element, or -1 if no such

        Itr(Object[] array) {
            lastRet = -1;
            this.array = array;
        }

        public boolean hasNext() {
            return cursor < array.length;
        }

        // Safe: the snapshot array is only ever populated from q, a
        // PriorityQueue<E>, so every slot holds an E.
        @SuppressWarnings("unchecked")
        public E next() {
            if (cursor >= array.length)
                throw new NoSuchElementException();
            lastRet = cursor;
            return (E) array[cursor++];
        }

        public void remove() {
            if (lastRet < 0)
                throw new IllegalStateException();
            Object x = array[lastRet];
            lastRet = -1;
            // Traverse underlying queue to find == element,
            // not just a .equals element.
            final ReentrantLock lock = PriorityBlockingQueue.this.lock;
            lock.lock();
            try {
                for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
                    if (it.next() == x) {
                        it.remove();
                        return;
                    }
                }
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * Saves the state to a stream (that is, serializes it).  This
     * merely wraps default serialization within lock.  The
     * serialization strategy for items is left to underlying
     * Queue. Note that locking is not needed on deserialization, so
     * readObject is not defined, just relying on default.
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {
        lock.lock();
        try {
            s.defaultWriteObject();
        } finally {
            lock.unlock();
        }
    }

}