ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.50
Committed: Wed Nov 23 05:33:03 2005 UTC (18 years, 6 months ago) by jsr166
Branch: MAIN
Changes since 1.49: +2 -2 lines
Log Message:
whitespace

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
14 * the same ordering rules as class {@link PriorityQueue} and supplies
15 * blocking retrieval operations. While this queue is logically
16 * unbounded, attempted additions may fail due to resource exhaustion
17 * (causing <tt>OutOfMemoryError</tt>). This class does not permit
18 * <tt>null</tt> elements. A priority queue relying on {@linkplain
19 * Comparable natural ordering} also does not permit insertion of
20 * non-comparable objects (doing so results in
21 * <tt>ClassCastException</tt>).
22 *
23 * <p>This class and its iterator implement all of the
24 * <em>optional</em> methods of the {@link Collection} and {@link
25 * Iterator} interfaces. The Iterator provided in method {@link
26 * #iterator()} is <em>not</em> guaranteed to traverse the elements of
27 * the PriorityBlockingQueue in any particular order. If you need
28 * ordered traversal, consider using
29 * <tt>Arrays.sort(pq.toArray())</tt>. Also, method <tt>drainTo</tt>
30 * can be used to <em>remove</em> some or all elements in priority
31 * order and place them in another collection.
32 *
33 * <p>Operations on this class make no guarantees about the ordering
34 * of elements with equal priority. If you need to enforce an
35 * ordering, you can define custom classes or comparators that use a
36 * secondary key to break ties in primary priority values. For
37 * example, here is a class that applies first-in-first-out
38 * tie-breaking to comparable elements. To use it, you would insert a
39 * <tt>new FIFOEntry(anEntry)</tt> instead of a plain entry object.
40 *
41 * <pre>
42 * class FIFOEntry&lt;E extends Comparable&lt;? super E&gt;&gt;
43 * implements Comparable&lt;FIFOEntry&lt;E&gt;&gt; {
44 * final static AtomicLong seq = new AtomicLong();
45 * final long seqNum;
46 * final E entry;
47 * public FIFOEntry(E entry) {
48 * seqNum = seq.getAndIncrement();
49 * this.entry = entry;
50 * }
51 * public E getEntry() { return entry; }
52 * public int compareTo(FIFOEntry&lt;E&gt; other) {
53 * int res = entry.compareTo(other.entry);
54 * if (res == 0 &amp;&amp; other.entry != this.entry)
55 * res = (seqNum &lt; other.seqNum ? -1 : 1);
56 * return res;
57 * }
58 * }</pre>
59 *
60 * <p>This class is a member of the
61 * <a href="{@docRoot}/../guide/collections/index.html">
62 * Java Collections Framework</a>.
63 *
64 * @since 1.5
65 * @author Doug Lea
66 * @param <E> the type of elements held in this collection
67 */
68 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
69 implements BlockingQueue<E>, java.io.Serializable {
70 private static final long serialVersionUID = 5595510919245408276L;
71
72 private final PriorityQueue<E> q;
73 private final ReentrantLock lock = new ReentrantLock(true);
74 private final Condition notEmpty = lock.newCondition();
75
76 /**
77 * Creates a <tt>PriorityBlockingQueue</tt> with the default
78 * initial capacity (11) that orders its elements according to
79 * their {@linkplain Comparable natural ordering}.
80 */
81 public PriorityBlockingQueue() {
82 q = new PriorityQueue<E>();
83 }
84
85 /**
86 * Creates a <tt>PriorityBlockingQueue</tt> with the specified
87 * initial capacity that orders its elements according to their
88 * {@linkplain Comparable natural ordering}.
89 *
90 * @param initialCapacity the initial capacity for this priority queue
91 * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
92 * than 1
93 */
94 public PriorityBlockingQueue(int initialCapacity) {
95 q = new PriorityQueue<E>(initialCapacity, null);
96 }
97
98 /**
99 * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
100 * capacity that orders its elements according to the specified
101 * comparator.
102 *
103 * @param initialCapacity the initial capacity for this priority queue
104 * @param comparator the comparator used to order this priority queue.
105 * If <tt>null</tt> then the order depends on the elements' natural
106 * ordering.
107 * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
108 * than 1
109 */
110 public PriorityBlockingQueue(int initialCapacity,
111 Comparator<? super E> comparator) {
112 q = new PriorityQueue<E>(initialCapacity, comparator);
113 }
114
115 /**
116 * Creates a <tt>PriorityBlockingQueue</tt> containing the elements
117 * in the specified collection. The priority queue has an initial
118 * capacity of 110% of the size of the specified collection. If
119 * the specified collection is a {@link SortedSet} or a {@link
120 * PriorityQueue}, this priority queue will be sorted according to
121 * the same comparator, or according to the natural ordering of its
122 * elements if the collection is sorted according to the natural
123 * ordering of its elements. Otherwise, this priority queue is
124 * ordered according to the natural ordering of its elements.
125 *
126 * @param c the collection whose elements are to be placed
127 * into this priority queue.
128 * @throws ClassCastException if elements of the specified collection
129 * cannot be compared to one another according to the priority
130 * queue's ordering.
131 * @throws NullPointerException if the specified collection or any
132 * of its elements are null
133 */
134 public PriorityBlockingQueue(Collection<? extends E> c) {
135 q = new PriorityQueue<E>(c);
136 }
137
138 /**
139 * Inserts the specified element into this priority queue.
140 *
141 * @param e the element to add
142 * @return <tt>true</tt> (as specified by {@link Collection#add})
143 * @throws ClassCastException if the specified element cannot be compared
144 * with elements currently in the priority queue according to the
145 * priority queue's ordering
146 * @throws NullPointerException if the specified element is null
147 */
148 public boolean add(E e) {
149 return offer(e);
150 }
151
152 /**
153 * Inserts the specified element into this priority queue.
154 *
155 * @param e the element to add
156 * @return <tt>true</tt> (as specified by {@link Queue#offer})
157 * @throws ClassCastException if the specified element cannot be compared
158 * with elements currently in the priority queue according to the
159 * priority queue's ordering
160 * @throws NullPointerException if the specified element is null
161 */
162 public boolean offer(E e) {
163 final ReentrantLock lock = this.lock;
164 lock.lock();
165 try {
166 boolean ok = q.offer(e);
167 assert ok;
168 notEmpty.signal();
169 return true;
170 } finally {
171 lock.unlock();
172 }
173 }
174
175 /**
176 * Inserts the specified element into this priority queue. As the queue is
177 * unbounded this method will never block.
178 *
179 * @param e the element to add
180 * @throws ClassCastException if the specified element cannot be compared
181 * with elements currently in the priority queue according to the
182 * priority queue's ordering
183 * @throws NullPointerException if the specified element is null
184 */
185 public void put(E e) {
186 offer(e); // never need to block
187 }
188
189 /**
190 * Inserts the specified element into this priority queue. As the queue is
191 * unbounded this method will never block.
192 *
193 * @param e the element to add
194 * @param timeout This parameter is ignored as the method never blocks
195 * @param unit This parameter is ignored as the method never blocks
196 * @return <tt>true</tt>
197 * @throws ClassCastException if the specified element cannot be compared
198 * with elements currently in the priority queue according to the
199 * priority queue's ordering
200 * @throws NullPointerException if the specified element is null
201 */
202 public boolean offer(E e, long timeout, TimeUnit unit) {
203 return offer(e); // never need to block
204 }
205
206 public E poll() {
207 final ReentrantLock lock = this.lock;
208 lock.lock();
209 try {
210 return q.poll();
211 } finally {
212 lock.unlock();
213 }
214 }
215
216 public E take() throws InterruptedException {
217 final ReentrantLock lock = this.lock;
218 lock.lockInterruptibly();
219 try {
220 try {
221 while (q.size() == 0)
222 notEmpty.await();
223 } catch (InterruptedException ie) {
224 notEmpty.signal(); // propagate to non-interrupted thread
225 throw ie;
226 }
227 E x = q.poll();
228 assert x != null;
229 return x;
230 } finally {
231 lock.unlock();
232 }
233 }
234
235 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
236 long nanos = unit.toNanos(timeout);
237 final ReentrantLock lock = this.lock;
238 lock.lockInterruptibly();
239 try {
240 for (;;) {
241 E x = q.poll();
242 if (x != null)
243 return x;
244 if (nanos <= 0)
245 return null;
246 try {
247 nanos = notEmpty.awaitNanos(nanos);
248 } catch (InterruptedException ie) {
249 notEmpty.signal(); // propagate to non-interrupted thread
250 throw ie;
251 }
252 }
253 } finally {
254 lock.unlock();
255 }
256 }
257
258 public E peek() {
259 final ReentrantLock lock = this.lock;
260 lock.lock();
261 try {
262 return q.peek();
263 } finally {
264 lock.unlock();
265 }
266 }
267
268 /**
269 * Returns the comparator used to order the elements in this queue,
270 * or <tt>null</tt> if this queue uses the {@linkplain Comparable
271 * natural ordering} of its elements.
272 *
273 * @return the comparator used to order the elements in this queue,
274 * or <tt>null</tt> if this queue uses the natural
275 * ordering of its elements.
276 */
277 public Comparator<? super E> comparator() {
278 return q.comparator();
279 }
280
281 public int size() {
282 final ReentrantLock lock = this.lock;
283 lock.lock();
284 try {
285 return q.size();
286 } finally {
287 lock.unlock();
288 }
289 }
290
291 /**
292 * Always returns <tt>Integer.MAX_VALUE</tt> because
293 * a <tt>PriorityBlockingQueue</tt> is not capacity constrained.
294 * @return <tt>Integer.MAX_VALUE</tt>
295 */
296 public int remainingCapacity() {
297 return Integer.MAX_VALUE;
298 }
299
300 /**
301 * Removes a single instance of the specified element from this queue,
302 * if it is present. More formally, removes an element <tt>e</tt> such
303 * that <tt>o.equals(e)</tt>, if this queue contains one or more such
304 * elements.
305 * Returns <tt>true</tt> if this queue contained the specified element
306 * (or equivalently, if this queue changed as a result of the call).
307 *
308 * @param o element to be removed from this queue, if present
309 * @return <tt>true</tt> if this queue changed as a result of the call
310 */
311 public boolean remove(Object o) {
312 final ReentrantLock lock = this.lock;
313 lock.lock();
314 try {
315 return q.remove(o);
316 } finally {
317 lock.unlock();
318 }
319 }
320
321 /**
322 * Returns <tt>true</tt> if this queue contains the specified element.
323 * More formally, returns <tt>true</tt> if and only if this queue contains
324 * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
325 *
326 * @param o object to be checked for containment in this queue
327 * @return <tt>true</tt> if this queue contains the specified element
328 */
329 public boolean contains(Object o) {
330 final ReentrantLock lock = this.lock;
331 lock.lock();
332 try {
333 return q.contains(o);
334 } finally {
335 lock.unlock();
336 }
337 }
338
339 /**
340 * Returns an array containing all of the elements in this queue.
341 * The returned array elements are in no particular order.
342 *
343 * <p>The returned array will be "safe" in that no references to it are
344 * maintained by this queue. (In other words, this method must allocate
345 * a new array). The caller is thus free to modify the returned array.
346 *
347 * <p>This method acts as bridge between array-based and collection-based
348 * APIs.
349 *
350 * @return an array containing all of the elements in this queue
351 */
352 public Object[] toArray() {
353 final ReentrantLock lock = this.lock;
354 lock.lock();
355 try {
356 return q.toArray();
357 } finally {
358 lock.unlock();
359 }
360 }
361
362 public String toString() {
363 final ReentrantLock lock = this.lock;
364 lock.lock();
365 try {
366 return q.toString();
367 } finally {
368 lock.unlock();
369 }
370 }
371
372 /**
373 * @throws UnsupportedOperationException {@inheritDoc}
374 * @throws ClassCastException {@inheritDoc}
375 * @throws NullPointerException {@inheritDoc}
376 * @throws IllegalArgumentException {@inheritDoc}
377 */
378 public int drainTo(Collection<? super E> c) {
379 if (c == null)
380 throw new NullPointerException();
381 if (c == this)
382 throw new IllegalArgumentException();
383 final ReentrantLock lock = this.lock;
384 lock.lock();
385 try {
386 int n = 0;
387 E e;
388 while ( (e = q.poll()) != null) {
389 c.add(e);
390 ++n;
391 }
392 return n;
393 } finally {
394 lock.unlock();
395 }
396 }
397
398 /**
399 * @throws UnsupportedOperationException {@inheritDoc}
400 * @throws ClassCastException {@inheritDoc}
401 * @throws NullPointerException {@inheritDoc}
402 * @throws IllegalArgumentException {@inheritDoc}
403 */
404 public int drainTo(Collection<? super E> c, int maxElements) {
405 if (c == null)
406 throw new NullPointerException();
407 if (c == this)
408 throw new IllegalArgumentException();
409 if (maxElements <= 0)
410 return 0;
411 final ReentrantLock lock = this.lock;
412 lock.lock();
413 try {
414 int n = 0;
415 E e;
416 while (n < maxElements && (e = q.poll()) != null) {
417 c.add(e);
418 ++n;
419 }
420 return n;
421 } finally {
422 lock.unlock();
423 }
424 }
425
426 /**
427 * Atomically removes all of the elements from this queue.
428 * The queue will be empty after this call returns.
429 */
430 public void clear() {
431 final ReentrantLock lock = this.lock;
432 lock.lock();
433 try {
434 q.clear();
435 } finally {
436 lock.unlock();
437 }
438 }
439
440 /**
441 * Returns an array containing all of the elements in this queue; the
442 * runtime type of the returned array is that of the specified array.
443 * The returned array elements are in no particular order.
444 * If the queue fits in the specified array, it is returned therein.
445 * Otherwise, a new array is allocated with the runtime type of the
446 * specified array and the size of this queue.
447 *
448 * <p>If this queue fits in the specified array with room to spare
449 * (i.e., the array has more elements than this queue), the element in
450 * the array immediately following the end of the queue is set to
451 * <tt>null</tt>.
452 *
453 * <p>Like the {@link #toArray()} method, this method acts as bridge between
454 * array-based and collection-based APIs. Further, this method allows
455 * precise control over the runtime type of the output array, and may,
456 * under certain circumstances, be used to save allocation costs.
457 *
458 * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
459 * The following code can be used to dump the queue into a newly
460 * allocated array of <tt>String</tt>:
461 *
462 * <pre>
463 * String[] y = x.toArray(new String[0]);</pre>
464 *
465 * Note that <tt>toArray(new Object[0])</tt> is identical in function to
466 * <tt>toArray()</tt>.
467 *
468 * @param a the array into which the elements of the queue are to
469 * be stored, if it is big enough; otherwise, a new array of the
470 * same runtime type is allocated for this purpose
471 * @return an array containing all of the elements in this queue
472 * @throws ArrayStoreException if the runtime type of the specified array
473 * is not a supertype of the runtime type of every element in
474 * this queue
475 * @throws NullPointerException if the specified array is null
476 */
477 public <T> T[] toArray(T[] a) {
478 final ReentrantLock lock = this.lock;
479 lock.lock();
480 try {
481 return q.toArray(a);
482 } finally {
483 lock.unlock();
484 }
485 }
486
487 /**
488 * Returns an iterator over the elements in this queue. The
489 * iterator does not return the elements in any particular order.
490 *
491 * @return an iterator over the elements in this queue
492 */
493 public Iterator<E> iterator() {
494 return new Itr<E>(toArray());
495 }
496
497 /**
498 * Snapshot iterator that works off copy of underlying q array.
499 */
500 private class Itr<E> implements Iterator<E> {
501 final Object[] array; // Array of all elements
502 int cursor; // index of next element to return;
503 int lastRet; // index of last element, or -1 if no such
504
505 Itr(Object[] array) {
506 lastRet = -1;
507 this.array = array;
508 }
509
510 public boolean hasNext() {
511 return cursor < array.length;
512 }
513
514 public E next() {
515 if (cursor >= array.length)
516 throw new NoSuchElementException();
517 lastRet = cursor;
518 return (E)array[cursor++];
519 }
520
521 public void remove() {
522 if (lastRet < 0)
523 throw new IllegalStateException();
524 Object x = array[lastRet];
525 lastRet = -1;
526 // Traverse underlying queue to find == element,
527 // not just a .equals element.
528 lock.lock();
529 try {
530 for (Iterator it = q.iterator(); it.hasNext(); ) {
531 if (it.next() == x) {
532 it.remove();
533 return;
534 }
535 }
536 } finally {
537 lock.unlock();
538 }
539 }
540 }
541
542 /**
543 * Save the state to a stream (that is, serialize it). This
544 * merely wraps default serialization within lock. The
545 * serialization strategy for items is left to underlying
546 * Queue. Note that locking is not needed on deserialization, so
547 * readObject is not defined, just relying on default.
548 */
549 private void writeObject(java.io.ObjectOutputStream s)
550 throws java.io.IOException {
551 lock.lock();
552 try {
553 s.defaultWriteObject();
554 } finally {
555 lock.unlock();
556 }
557 }
558
559 }