ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.43
Committed: Wed May 18 01:41:16 2005 UTC (19 years ago) by jsr166
Branch: MAIN
Changes since 1.42: +1 -1 lines
Log Message:
whitespace

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
14 * the same ordering rules as class {@link PriorityQueue} and supplies
15 * blocking retrieval operations. While this queue is logically
16 * unbounded, attempted additions may fail due to resource exhaustion
17 * (causing <tt>OutOfMemoryError</tt>). This class does not permit
18 * <tt>null</tt> elements. A priority queue relying on {@linkplain
19 * Comparable natural ordering} also does not permit insertion of
20 * non-comparable objects (doing so results in
21 * <tt>ClassCastException</tt>).
22 *
23 * <p>This class and its iterator implement all of the
24 * <em>optional</em> methods of the {@link Collection} and {@link
25 * Iterator} interfaces. The Iterator provided in method {@link
26 * #iterator()} is <em>not</em> guaranteed to traverse the elements of
27 * the PriorityBlockingQueue in any particular order. If you need
28 * ordered traversal, consider using
29 * <tt>Arrays.sort(pq.toArray())</tt>. Also, method <tt>drainTo</tt>
30 * can be used to <em>remove</em> some or all elements in priority
31 * order and place them in another collection.
32 *
33 * <p>Operations on this class make no guarantees about the ordering
34 * of elements with equal priority. If you need to enforce an
35 * ordering, you can define custom classes or comparators that use a
36 * secondary key to break ties in primary priority values. For
37 * example, here is a class that applies first-in-first-out
38 * tie-breaking to comparable elements. To use it, you would insert a
39 * <tt>new FIFOEntry(anEntry)</tt> instead of a plain entry object.
40 *
41 * <pre>
42 * class FIFOEntry&lt;E extends Comparable&lt;? super E&gt;&gt;
43 * implements Comparable&lt;FIFOEntry&lt;E&gt;&gt; {
44 * static AtomicLong seq = new AtomicLong();
45 * final long seqNum;
46 * final E entry;
47 * public FIFOEntry(E entry) {
48 * seqNum = seq.getAndIncrement();
49 * this.entry = entry;
50 * }
51 * public E getEntry() { return entry; }
52 * public int compareTo(FIFOEntry&lt;E&gt; other) {
53 * int res = entry.compareTo(other.entry);
54 * if (res == 0 &amp;&amp; other.entry != this.entry)
55 * res = (seqNum &lt; other.seqNum ? -1 : 1);
56 * return res;
57 * }
58 * }</pre>
59 *
60 * <p>This class is a member of the
61 * <a href="{@docRoot}/../guide/collections/index.html">
62 * Java Collections Framework</a>.
63 *
64 * @since 1.5
65 * @author Doug Lea
66 * @param <E> the type of elements held in this collection
67 */
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = 5595510919245408276L;

    /**
     * The backing priority queue. It is accessed while holding
     * {@link #lock}, except in {@link #comparator()} and in the
     * iterator's <tt>hasNext</tt>, which deliberately skip locking.
     */
    private final PriorityQueue<E> q;

    /** Lock guarding {@link #q}; created in fair mode. */
    private final ReentrantLock lock = new ReentrantLock(true);

    /** Condition on which <tt>take</tt> and timed <tt>poll</tt> wait for elements. */
    private final Condition notEmpty = lock.newCondition();

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the default
     * initial capacity (11) that orders its elements according to
     * their {@linkplain Comparable natural ordering}.
     */
    public PriorityBlockingQueue() {
        q = new PriorityQueue<E>();
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the specified
     * initial capacity that orders its elements according to their
     * {@linkplain Comparable natural ordering}.
     *
     * @param initialCapacity the initial capacity for this priority queue
     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
     *         than 1
     */
    public PriorityBlockingQueue(int initialCapacity) {
        q = new PriorityQueue<E>(initialCapacity, null);
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
     * capacity that orders its elements according to the specified
     * comparator.
     *
     * @param initialCapacity the initial capacity for this priority queue
     * @param comparator the comparator used to order this priority queue.
     *        If <tt>null</tt> then the order depends on the elements'
     *        natural ordering.
     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
     *         than 1
     */
    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        q = new PriorityQueue<E>(initialCapacity, comparator);
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> containing the elements
     * in the specified collection.  The priority queue has an initial
     * capacity of 110% of the size of the specified collection.  If
     * the specified collection is a {@link SortedSet} or a {@link
     * PriorityQueue}, this priority queue will be sorted according to
     * the same comparator, or according to the natural ordering of its
     * elements if the collection is sorted according to the natural
     * ordering of its elements.  Otherwise, this priority queue is
     * ordered according to the natural ordering of its elements.
     *
     * @param c the collection whose elements are to be placed
     *        into this priority queue.
     * @throws ClassCastException if elements of the specified collection
     *         cannot be compared to one another according to the priority
     *         queue's ordering.
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public PriorityBlockingQueue(Collection<? extends E> c) {
        q = new PriorityQueue<E>(c);
    }

    /**
     * Inserts the specified element into this priority queue.
     *
     * @param e the element to add
     * @return <tt>true</tt> (as per the spec for {@link Collection#add})
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
        return offer(e);
    }

    /**
     * Inserts the specified element into this priority queue and
     * signals one thread waiting for an element, if any.
     *
     * @param e the element to add
     * @return <tt>true</tt>
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            boolean ok = q.offer(e);
            assert ok;
            notEmpty.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element into this priority queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public void put(E e) {
        offer(e); // never need to block
    }

    /**
     * Inserts the specified element into this priority queue. As the queue is
     * unbounded this method will never block.
     *
     * @param e the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return <tt>true</tt>
     * @throws ClassCastException if the specified element cannot be compared
     *         with elements currently in the priority queue according to the
     *         priority queue's ordering
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e); // never need to block
    }

    /**
     * Retrieves and removes the head of this queue without waiting.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue
     *         is empty
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.poll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting on the
     * <tt>notEmpty</tt> condition for as long as the queue is empty.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while acquiring the
     *         lock or while waiting
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (q.size() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = q.poll();
            assert x != null;
            return x;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified time for an element to become available.
     *
     * @param timeout how long to wait before giving up, in units of
     *        <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     *        <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the wait time
     *         elapses while the queue is still empty
     * @throws InterruptedException if interrupted while acquiring the
     *         lock or while waiting
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E x = q.poll();
                if (x != null)
                    return x;
                if (nanos <= 0)
                    return null;
                try {
                    // awaitNanos reports the time still remaining, so
                    // repeated wake-ups never extend the total wait
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue
     *         is empty
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the comparator used to order the elements in this queue,
     * or <tt>null</tt> if this queue uses the {@linkplain Comparable
     * natural ordering} of its elements.
     *
     * @return the comparator used to order the elements in this queue,
     *         or <tt>null</tt> if this queue uses the natural
     *         ordering of its elements.
     */
    public Comparator<? super E> comparator() {
        return q.comparator();
    }

    /**
     * Returns the number of elements in this queue, read under the lock.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * a <tt>PriorityBlockingQueue</tt> is not capacity constrained.
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element <tt>e</tt> such
     * that <tt>o.equals(e)</tt>, if this queue contains one or more such
     * elements.
     * Returns <tt>true</tt> if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return <tt>true</tt> if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns <tt>true</tt> if this queue contains the specified element.
     * More formally, returns <tt>true</tt> if and only if this queue contains
     * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
     *
     * @param o object to be checked for containment in this queue
     * @return <tt>true</tt> if this queue contains the specified element
     */
    public boolean contains(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.contains(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue.
     * The returned array elements are in no particular order.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the string representation of the backing queue, computed
     * while holding the lock.
     *
     * @return a string representation of this queue
     */
    public String toString() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toString();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes all available elements from this queue, in the order
     * produced by repeated <tt>poll</tt>, and adds them to the given
     * collection.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            E e;
            while ( (e = q.poll()) != null) {
                c.add(e);
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes at most the given number of available elements from this
     * queue, in the order produced by repeated <tt>poll</tt>, and adds
     * them to the given collection.  Returns 0 without locking when
     * <tt>maxElements</tt> is not positive.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            E e;
            // test the bound first so no element is polled and then dropped
            while (n < maxElements && (e = q.poll()) != null) {
                c.add(e);
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue; the
     * runtime type of the returned array is that of the specified array.
     * The returned array elements are in no particular order.
     * If the queue fits in the specified array, it is returned therein.
     * Otherwise, a new array is allocated with the runtime type of the
     * specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * <tt>null</tt>.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
     * The following code can be used to dump the queue into a newly
     * allocated array of <tt>String</tt>:
     *
     * <pre>
     *     String[] y = x.toArray(new String[0]);</pre>
     *
     * Note that <tt>toArray(new Object[0])</tt> is identical in function to
     * <tt>toArray()</tt>.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    public <T> T[] toArray(T[] a) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue. The
     * iterator does not return the elements in any particular order.
     * The returned iterator is a thread-safe "fast-fail" iterator
     * that will throw {@link ConcurrentModificationException} upon
     * detected interference.
     *
     * @return an iterator over the elements in this queue
     */
    public Iterator<E> iterator() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return new Itr(q.iterator());
        } finally {
            lock.unlock();
        }
    }

    /**
     * Wrapper that runs the backing iterator's <tt>next</tt> and
     * <tt>remove</tt> under the enclosing queue's lock.
     *
     * <p>This class intentionally declares no type parameter of its
     * own: it iterates over the enclosing queue's element type
     * <tt>E</tt>.  Declaring a fresh parameter here would shadow that
     * type and force callers to construct the wrapper as a raw type.
     */
    private class Itr implements Iterator<E> {
        private final Iterator<E> iter;

        Itr(Iterator<E> i) {
            iter = i;
        }

        public boolean hasNext() {
            /*
             * No sync -- we rely on underlying hasNext to be
             * stateless, in which case we can return true by mistake
             * only when next() will subsequently throw
             * ConcurrentModificationException.
             */
            return iter.hasNext();
        }

        public E next() {
            ReentrantLock lock = PriorityBlockingQueue.this.lock;
            lock.lock();
            try {
                return iter.next();
            } finally {
                lock.unlock();
            }
        }

        public void remove() {
            ReentrantLock lock = PriorityBlockingQueue.this.lock;
            lock.lock();
            try {
                iter.remove();
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).  This
     * merely wraps default serialization within lock.  The
     * serialization strategy for items is left to underlying
     * Queue. Note that locking is not needed on deserialization, so
     * readObject is not defined, just relying on default.
     *
     * @param s the stream to write to
     * @throws java.io.IOException if default serialization fails
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {
        lock.lock();
        try {
            s.defaultWriteObject();
        } finally {
            lock.unlock();
        }
    }

}