ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.54
Committed: Sun May 18 23:47:56 2008 UTC (16 years ago) by jsr166
Branch: MAIN
Changes since 1.53: +3 -3 lines
Log Message:
Sync with OpenJDK; untabify

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
14 * the same ordering rules as class {@link PriorityQueue} and supplies
15 * blocking retrieval operations. While this queue is logically
16 * unbounded, attempted additions may fail due to resource exhaustion
17 * (causing <tt>OutOfMemoryError</tt>). This class does not permit
18 * <tt>null</tt> elements. A priority queue relying on {@linkplain
19 * Comparable natural ordering} also does not permit insertion of
20 * non-comparable objects (doing so results in
21 * <tt>ClassCastException</tt>).
22 *
23 * <p>This class and its iterator implement all of the
24 * <em>optional</em> methods of the {@link Collection} and {@link
25 * Iterator} interfaces. The Iterator provided in method {@link
26 * #iterator()} is <em>not</em> guaranteed to traverse the elements of
27 * the PriorityBlockingQueue in any particular order. If you need
28 * ordered traversal, consider using
29 * <tt>Arrays.sort(pq.toArray())</tt>. Also, method <tt>drainTo</tt>
30 * can be used to <em>remove</em> some or all elements in priority
31 * order and place them in another collection.
32 *
33 * <p>Operations on this class make no guarantees about the ordering
34 * of elements with equal priority. If you need to enforce an
35 * ordering, you can define custom classes or comparators that use a
36 * secondary key to break ties in primary priority values. For
37 * example, here is a class that applies first-in-first-out
38 * tie-breaking to comparable elements. To use it, you would insert a
39 * <tt>new FIFOEntry(anEntry)</tt> instead of a plain entry object.
40 *
41 * <pre>
42 * class FIFOEntry&lt;E extends Comparable&lt;? super E&gt;&gt;
43 * implements Comparable&lt;FIFOEntry&lt;E&gt;&gt; {
44 * final static AtomicLong seq = new AtomicLong();
45 * final long seqNum;
46 * final E entry;
47 * public FIFOEntry(E entry) {
48 * seqNum = seq.getAndIncrement();
49 * this.entry = entry;
50 * }
51 * public E getEntry() { return entry; }
52 * public int compareTo(FIFOEntry&lt;E&gt; other) {
53 * int res = entry.compareTo(other.entry);
54 * if (res == 0 &amp;&amp; other.entry != this.entry)
55 * res = (seqNum &lt; other.seqNum ? -1 : 1);
56 * return res;
57 * }
58 * }</pre>
59 *
60 * <p>This class is a member of the
61 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
62 * Java Collections Framework</a>.
63 *
64 * @since 1.5
65 * @author Doug Lea
66 * @param <E> the type of elements held in this collection
67 */
68 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
69 implements BlockingQueue<E>, java.io.Serializable {
70 private static final long serialVersionUID = 5595510919245408276L;
71
72 private final PriorityQueue<E> q;
73 private final ReentrantLock lock = new ReentrantLock(true);
74 private final Condition notEmpty = lock.newCondition();
75
76 /**
77 * Creates a <tt>PriorityBlockingQueue</tt> with the default
78 * initial capacity (11) that orders its elements according to
79 * their {@linkplain Comparable natural ordering}.
80 */
81 public PriorityBlockingQueue() {
82 q = new PriorityQueue<E>();
83 }
84
85 /**
86 * Creates a <tt>PriorityBlockingQueue</tt> with the specified
87 * initial capacity that orders its elements according to their
88 * {@linkplain Comparable natural ordering}.
89 *
90 * @param initialCapacity the initial capacity for this priority queue
91 * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
92 * than 1
93 */
94 public PriorityBlockingQueue(int initialCapacity) {
95 q = new PriorityQueue<E>(initialCapacity, null);
96 }
97
98 /**
99 * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
100 * capacity that orders its elements according to the specified
101 * comparator.
102 *
103 * @param initialCapacity the initial capacity for this priority queue
104 * @param comparator the comparator that will be used to order this
105 * priority queue. If {@code null}, the {@linkplain Comparable
106 * natural ordering} of the elements will be used.
107 * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
108 * than 1
109 */
110 public PriorityBlockingQueue(int initialCapacity,
111 Comparator<? super E> comparator) {
112 q = new PriorityQueue<E>(initialCapacity, comparator);
113 }
114
115 /**
116 * Creates a <tt>PriorityBlockingQueue</tt> containing the elements
117 * in the specified collection. If the specified collection is a
118 * {@link SortedSet} or a {@link PriorityQueue}, this
119 * priority queue will be ordered according to the same ordering.
120 * Otherwise, this priority queue will be ordered according to the
121 * {@linkplain Comparable natural ordering} of its elements.
122 *
123 * @param c the collection whose elements are to be placed
124 * into this priority queue
125 * @throws ClassCastException if elements of the specified collection
126 * cannot be compared to one another according to the priority
127 * queue's ordering
128 * @throws NullPointerException if the specified collection or any
129 * of its elements are null
130 */
131 public PriorityBlockingQueue(Collection<? extends E> c) {
132 q = new PriorityQueue<E>(c);
133 }
134
135 /**
136 * Inserts the specified element into this priority queue.
137 *
138 * @param e the element to add
139 * @return <tt>true</tt> (as specified by {@link Collection#add})
140 * @throws ClassCastException if the specified element cannot be compared
141 * with elements currently in the priority queue according to the
142 * priority queue's ordering
143 * @throws NullPointerException if the specified element is null
144 */
145 public boolean add(E e) {
146 return offer(e);
147 }
148
149 /**
150 * Inserts the specified element into this priority queue.
151 *
152 * @param e the element to add
153 * @return <tt>true</tt> (as specified by {@link Queue#offer})
154 * @throws ClassCastException if the specified element cannot be compared
155 * with elements currently in the priority queue according to the
156 * priority queue's ordering
157 * @throws NullPointerException if the specified element is null
158 */
159 public boolean offer(E e) {
160 final ReentrantLock lock = this.lock;
161 lock.lock();
162 try {
163 boolean ok = q.offer(e);
164 assert ok;
165 notEmpty.signal();
166 return true;
167 } finally {
168 lock.unlock();
169 }
170 }
171
172 /**
173 * Inserts the specified element into this priority queue. As the queue is
174 * unbounded this method will never block.
175 *
176 * @param e the element to add
177 * @throws ClassCastException if the specified element cannot be compared
178 * with elements currently in the priority queue according to the
179 * priority queue's ordering
180 * @throws NullPointerException if the specified element is null
181 */
182 public void put(E e) {
183 offer(e); // never need to block
184 }
185
186 /**
187 * Inserts the specified element into this priority queue. As the queue is
188 * unbounded this method will never block.
189 *
190 * @param e the element to add
191 * @param timeout This parameter is ignored as the method never blocks
192 * @param unit This parameter is ignored as the method never blocks
193 * @return <tt>true</tt>
194 * @throws ClassCastException if the specified element cannot be compared
195 * with elements currently in the priority queue according to the
196 * priority queue's ordering
197 * @throws NullPointerException if the specified element is null
198 */
199 public boolean offer(E e, long timeout, TimeUnit unit) {
200 return offer(e); // never need to block
201 }
202
203 public E poll() {
204 final ReentrantLock lock = this.lock;
205 lock.lock();
206 try {
207 return q.poll();
208 } finally {
209 lock.unlock();
210 }
211 }
212
213 public E take() throws InterruptedException {
214 final ReentrantLock lock = this.lock;
215 lock.lockInterruptibly();
216 try {
217 try {
218 while (q.size() == 0)
219 notEmpty.await();
220 } catch (InterruptedException ie) {
221 notEmpty.signal(); // propagate to non-interrupted thread
222 throw ie;
223 }
224 E x = q.poll();
225 assert x != null;
226 return x;
227 } finally {
228 lock.unlock();
229 }
230 }
231
232 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
233 long nanos = unit.toNanos(timeout);
234 final ReentrantLock lock = this.lock;
235 lock.lockInterruptibly();
236 try {
237 for (;;) {
238 E x = q.poll();
239 if (x != null)
240 return x;
241 if (nanos <= 0)
242 return null;
243 try {
244 nanos = notEmpty.awaitNanos(nanos);
245 } catch (InterruptedException ie) {
246 notEmpty.signal(); // propagate to non-interrupted thread
247 throw ie;
248 }
249 }
250 } finally {
251 lock.unlock();
252 }
253 }
254
255 public E peek() {
256 final ReentrantLock lock = this.lock;
257 lock.lock();
258 try {
259 return q.peek();
260 } finally {
261 lock.unlock();
262 }
263 }
264
265 /**
266 * Returns the comparator used to order the elements in this queue,
267 * or <tt>null</tt> if this queue uses the {@linkplain Comparable
268 * natural ordering} of its elements.
269 *
270 * @return the comparator used to order the elements in this queue,
271 * or <tt>null</tt> if this queue uses the natural
272 * ordering of its elements
273 */
274 public Comparator<? super E> comparator() {
275 return q.comparator();
276 }
277
278 public int size() {
279 final ReentrantLock lock = this.lock;
280 lock.lock();
281 try {
282 return q.size();
283 } finally {
284 lock.unlock();
285 }
286 }
287
288 /**
289 * Always returns <tt>Integer.MAX_VALUE</tt> because
290 * a <tt>PriorityBlockingQueue</tt> is not capacity constrained.
291 * @return <tt>Integer.MAX_VALUE</tt>
292 */
293 public int remainingCapacity() {
294 return Integer.MAX_VALUE;
295 }
296
297 /**
298 * Removes a single instance of the specified element from this queue,
299 * if it is present. More formally, removes an element {@code e} such
300 * that {@code o.equals(e)}, if this queue contains one or more such
301 * elements. Returns {@code true} if and only if this queue contained
302 * the specified element (or equivalently, if this queue changed as a
303 * result of the call).
304 *
305 * @param o element to be removed from this queue, if present
306 * @return <tt>true</tt> if this queue changed as a result of the call
307 */
308 public boolean remove(Object o) {
309 final ReentrantLock lock = this.lock;
310 lock.lock();
311 try {
312 return q.remove(o);
313 } finally {
314 lock.unlock();
315 }
316 }
317
318 /**
319 * Returns {@code true} if this queue contains the specified element.
320 * More formally, returns {@code true} if and only if this queue contains
321 * at least one element {@code e} such that {@code o.equals(e)}.
322 *
323 * @param o object to be checked for containment in this queue
324 * @return <tt>true</tt> if this queue contains the specified element
325 */
326 public boolean contains(Object o) {
327 final ReentrantLock lock = this.lock;
328 lock.lock();
329 try {
330 return q.contains(o);
331 } finally {
332 lock.unlock();
333 }
334 }
335
336 /**
337 * Returns an array containing all of the elements in this queue.
338 * The returned array elements are in no particular order.
339 *
340 * <p>The returned array will be "safe" in that no references to it are
341 * maintained by this queue. (In other words, this method must allocate
342 * a new array). The caller is thus free to modify the returned array.
343 *
344 * <p>This method acts as bridge between array-based and collection-based
345 * APIs.
346 *
347 * @return an array containing all of the elements in this queue
348 */
349 public Object[] toArray() {
350 final ReentrantLock lock = this.lock;
351 lock.lock();
352 try {
353 return q.toArray();
354 } finally {
355 lock.unlock();
356 }
357 }
358
359
360 public String toString() {
361 final ReentrantLock lock = this.lock;
362 lock.lock();
363 try {
364 return q.toString();
365 } finally {
366 lock.unlock();
367 }
368 }
369
370 /**
371 * @throws UnsupportedOperationException {@inheritDoc}
372 * @throws ClassCastException {@inheritDoc}
373 * @throws NullPointerException {@inheritDoc}
374 * @throws IllegalArgumentException {@inheritDoc}
375 */
376 public int drainTo(Collection<? super E> c) {
377 if (c == null)
378 throw new NullPointerException();
379 if (c == this)
380 throw new IllegalArgumentException();
381 final ReentrantLock lock = this.lock;
382 lock.lock();
383 try {
384 int n = 0;
385 E e;
386 while ( (e = q.poll()) != null) {
387 c.add(e);
388 ++n;
389 }
390 return n;
391 } finally {
392 lock.unlock();
393 }
394 }
395
396 /**
397 * @throws UnsupportedOperationException {@inheritDoc}
398 * @throws ClassCastException {@inheritDoc}
399 * @throws NullPointerException {@inheritDoc}
400 * @throws IllegalArgumentException {@inheritDoc}
401 */
402 public int drainTo(Collection<? super E> c, int maxElements) {
403 if (c == null)
404 throw new NullPointerException();
405 if (c == this)
406 throw new IllegalArgumentException();
407 if (maxElements <= 0)
408 return 0;
409 final ReentrantLock lock = this.lock;
410 lock.lock();
411 try {
412 int n = 0;
413 E e;
414 while (n < maxElements && (e = q.poll()) != null) {
415 c.add(e);
416 ++n;
417 }
418 return n;
419 } finally {
420 lock.unlock();
421 }
422 }
423
424 /**
425 * Atomically removes all of the elements from this queue.
426 * The queue will be empty after this call returns.
427 */
428 public void clear() {
429 final ReentrantLock lock = this.lock;
430 lock.lock();
431 try {
432 q.clear();
433 } finally {
434 lock.unlock();
435 }
436 }
437
438 /**
439 * Returns an array containing all of the elements in this queue; the
440 * runtime type of the returned array is that of the specified array.
441 * The returned array elements are in no particular order.
442 * If the queue fits in the specified array, it is returned therein.
443 * Otherwise, a new array is allocated with the runtime type of the
444 * specified array and the size of this queue.
445 *
446 * <p>If this queue fits in the specified array with room to spare
447 * (i.e., the array has more elements than this queue), the element in
448 * the array immediately following the end of the queue is set to
449 * <tt>null</tt>.
450 *
451 * <p>Like the {@link #toArray()} method, this method acts as bridge between
452 * array-based and collection-based APIs. Further, this method allows
453 * precise control over the runtime type of the output array, and may,
454 * under certain circumstances, be used to save allocation costs.
455 *
456 * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
457 * The following code can be used to dump the queue into a newly
458 * allocated array of <tt>String</tt>:
459 *
460 * <pre>
461 * String[] y = x.toArray(new String[0]);</pre>
462 *
463 * Note that <tt>toArray(new Object[0])</tt> is identical in function to
464 * <tt>toArray()</tt>.
465 *
466 * @param a the array into which the elements of the queue are to
467 * be stored, if it is big enough; otherwise, a new array of the
468 * same runtime type is allocated for this purpose
469 * @return an array containing all of the elements in this queue
470 * @throws ArrayStoreException if the runtime type of the specified array
471 * is not a supertype of the runtime type of every element in
472 * this queue
473 * @throws NullPointerException if the specified array is null
474 */
475 public <T> T[] toArray(T[] a) {
476 final ReentrantLock lock = this.lock;
477 lock.lock();
478 try {
479 return q.toArray(a);
480 } finally {
481 lock.unlock();
482 }
483 }
484
485 /**
486 * Returns an iterator over the elements in this queue. The
487 * iterator does not return the elements in any particular order.
488 * The returned <tt>Iterator</tt> is a "weakly consistent"
489 * iterator that will never throw {@link
490 * ConcurrentModificationException}, and guarantees to traverse
491 * elements as they existed upon construction of the iterator, and
492 * may (but is not guaranteed to) reflect any modifications
493 * subsequent to construction.
494 *
495 * @return an iterator over the elements in this queue
496 */
497 public Iterator<E> iterator() {
498 return new Itr(toArray());
499 }
500
501 /**
502 * Snapshot iterator that works off copy of underlying q array.
503 */
504 private class Itr implements Iterator<E> {
505 final Object[] array; // Array of all elements
506 int cursor; // index of next element to return;
507 int lastRet; // index of last element, or -1 if no such
508
509 Itr(Object[] array) {
510 lastRet = -1;
511 this.array = array;
512 }
513
514 public boolean hasNext() {
515 return cursor < array.length;
516 }
517
518 public E next() {
519 if (cursor >= array.length)
520 throw new NoSuchElementException();
521 lastRet = cursor;
522 return (E)array[cursor++];
523 }
524
525 public void remove() {
526 if (lastRet < 0)
527 throw new IllegalStateException();
528 Object x = array[lastRet];
529 lastRet = -1;
530 // Traverse underlying queue to find == element,
531 // not just a .equals element.
532 lock.lock();
533 try {
534 for (Iterator it = q.iterator(); it.hasNext(); ) {
535 if (it.next() == x) {
536 it.remove();
537 return;
538 }
539 }
540 } finally {
541 lock.unlock();
542 }
543 }
544 }
545
546 /**
547 * Saves the state to a stream (that is, serializes it). This
548 * merely wraps default serialization within lock. The
549 * serialization strategy for items is left to underlying
550 * Queue. Note that locking is not needed on deserialization, so
551 * readObject is not defined, just relying on default.
552 */
553 private void writeObject(java.io.ObjectOutputStream s)
554 throws java.io.IOException {
555 lock.lock();
556 try {
557 s.defaultWriteObject();
558 } finally {
559 lock.unlock();
560 }
561 }
562
563 }