ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.37
Committed: Thu May 27 11:06:11 2004 UTC (20 years ago) by dl
Branch: MAIN
Changes since 1.36: +9 -1 lines
Log Message:
Override javadoc specs when overriding AbstractQueue implementations
Clarify atomicity in BlockingQueue

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
/**
 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
 * the same ordering rules as class {@link PriorityQueue} and supplies
 * blocking retrieval operations.  While this queue is logically
 * unbounded, attempted additions may fail due to resource exhaustion
 * (causing <tt>OutOfMemoryError</tt>). This class does not permit
 * <tt>null</tt> elements.  A priority queue relying on natural
 * ordering also does not permit insertion of non-comparable objects
 * (doing so results in <tt>ClassCastException</tt>).
 *
 * <p>The Iterator provided in method {@link #iterator()} is
 * <em>not</em> guaranteed to traverse the elements of the
 * PriorityBlockingQueue in any particular order. If you need ordered
 * traversal, consider using <tt>Arrays.sort(pq.toArray())</tt>.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>Implementation note: every operation delegates to a private
 * {@link PriorityQueue} while holding a single fair
 * {@link ReentrantLock}; consumers blocked in <tt>take</tt> or timed
 * <tt>poll</tt> wait on a condition of that lock.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../guide/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = 5595510919245408276L;

    /** Backing heap; only ever accessed while holding {@link #lock}. */
    private final PriorityQueue<E> q;
    /** Fair lock guarding every access to {@link #q}. */
    private final ReentrantLock lock = new ReentrantLock(true);
    /** Signalled on each successful insertion to wake one waiting consumer. */
    private final Condition notEmpty = lock.newCondition();

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the default initial
     * capacity (11) that orders its elements according to their natural
     * ordering (using <tt>Comparable</tt>).
     */
    public PriorityBlockingQueue() {
        q = new PriorityQueue<E>();
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
     * capacity that orders its elements according to their natural
     * ordering (using <tt>Comparable</tt>).
     *
     * @param initialCapacity the initial capacity for this priority queue.
     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
     * than 1
     */
    public PriorityBlockingQueue(int initialCapacity) {
        q = new PriorityQueue<E>(initialCapacity, null);
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
     * capacity that orders its elements according to the specified
     * comparator.
     *
     * @param initialCapacity the initial capacity for this priority queue.
     * @param comparator the comparator used to order this priority queue.
     * If <tt>null</tt> then the order depends on the elements' natural
     * ordering.
     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
     * than 1
     */
    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        q = new PriorityQueue<E>(initialCapacity, comparator);
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> containing the elements
     * in the specified collection.  Sizing and ordering are delegated to
     * the {@link PriorityQueue#PriorityQueue(Collection)} constructor:
     * if the specified collection is a {@link java.util.SortedSet} or a
     * {@link PriorityQueue}, this priority queue will be sorted according
     * to the same comparator, or according to its elements' natural
     * order if the collection is sorted according to its elements'
     * natural order.  Otherwise, this priority queue is ordered
     * according to its elements' natural order.
     *
     * @param c the collection whose elements are to be placed
     *        into this priority queue.
     * @throws ClassCastException if elements of the specified collection
     *         cannot be compared to one another according to the priority
     *         queue's ordering.
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     */
    public PriorityBlockingQueue(Collection<? extends E> c) {
        q = new PriorityQueue<E>(c);
    }


    // these first few override just to update doc comments

    /**
     * Adds the specified element to this queue.
     * @param o the element to add
     * @return <tt>true</tt> (as per the general contract of
     * <tt>Collection.add</tt>).
     *
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     * @throws ClassCastException if the specified element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     */
    public boolean add(E o) {
        return super.add(o);
    }

    /**
     * Returns the comparator used to order this collection, or <tt>null</tt>
     * if this collection is sorted according to its elements' natural
     * ordering (using <tt>Comparable</tt>).
     *
     * @return the comparator used to order this collection, or <tt>null</tt>
     * if this collection is sorted according to its elements' natural
     * ordering.
     */
    public Comparator<? super E> comparator() {
        return q.comparator();
    }

    /**
     * Inserts the specified element into this priority queue and wakes
     * one thread waiting for an element, if any.
     *
     * @param o the element to add
     * @return <tt>true</tt>
     * @throws ClassCastException if the specified element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o) {
        if (o == null) throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            boolean ok = q.offer(o);
            assert ok;
            notEmpty.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds the specified element to this priority queue. As the queue is
     * unbounded this method will never block.
     * @param o the element to add
     * @throws ClassCastException if the element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public void put(E o) {
        offer(o); // never need to block
    }

    /**
     * Inserts the specified element into this priority queue. As the queue is
     * unbounded this method will never block.
     * @param o the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return <tt>true</tt>
     * @throws ClassCastException if the element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o, long timeout, TimeUnit unit) {
        return offer(o); // never need to block
    }

    /**
     * Retrieves and removes the head of this queue, waiting
     * if no elements are present.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while acquiring the
     * lock or while waiting for an element.
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (q.size() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = q.poll();
            assert x != null;
            return x;
        } finally {
            lock.unlock();
        }
    }


    /**
     * Retrieves and removes the head of this queue without waiting.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue
     * is empty.
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.poll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified time if no elements are present.
     *
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit the time unit of the <tt>timeout</tt> argument
     * @return the head of this queue, or <tt>null</tt> if the
     * specified waiting time elapses before an element is present.
     * @throws InterruptedException if interrupted while acquiring the
     * lock or while waiting for an element.
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E x = q.poll();
                if (x != null)
                    return x;
                if (nanos <= 0)
                    return null;
                try {
                    // awaitNanos returns the time still remaining
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue
     * is empty.
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue.
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * a <tt>PriorityBlockingQueue</tt> is not capacity constrained.
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Removes a single instance of the specified element from this
     * collection, if it is present.
     *
     * @param o the element to remove
     * @return <tt>true</tt> if an element was removed.
     */
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns <tt>true</tt> if this queue contains the specified element.
     *
     * @param o the element whose presence is to be tested
     * @return <tt>true</tt> if this queue contains the specified element.
     */
    public boolean contains(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.contains(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * captured while holding the queue's lock.  The elements are in
     * no particular order.
     *
     * @return an array containing all of the elements in this queue.
     */
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }


    /**
     * Returns the string representation of the underlying queue,
     * computed while holding the queue's lock.
     *
     * @return a string representation of this queue.
     */
    public String toString() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toString();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes all available elements from this queue, in priority
     * order, and adds them to the given collection.  Each element is
     * removed from this queue only after it has been successfully added
     * to <tt>c</tt>, so if <tt>c.add</tt> throws, the rejected element
     * and all elements after it remain in this queue.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred.
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            E e;
            while ( (e = q.peek()) != null) {
                c.add(e);   // may throw; e is still queued at this point
                q.poll();   // commit the removal only after a successful add
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes at most the given number of available elements from this
     * queue, in priority order, and adds them to the given collection.
     * Each element is removed from this queue only after it has been
     * successfully added to <tt>c</tt>, so if <tt>c.add</tt> throws, the
     * rejected element and all elements after it remain in this queue.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer;
     * values less than or equal to zero transfer nothing
     * @return the number of elements transferred.
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            E e;
            while (n < maxElements && (e = q.peek()) != null) {
                c.add(e);   // may throw; e is still queued at this point
                q.poll();   // commit the removal only after a successful add
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * captured while holding the queue's lock; the runtime type of the
     * returned array is that of the specified array.  The elements are
     * in no particular order.
     *
     * @param a the array into which the elements are to be stored, if
     * it is big enough; otherwise a new array of the same runtime type
     * is allocated
     * @return an array containing the elements of this queue.
     */
    public <T> T[] toArray(T[] a) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue. The
     * iterator does not return the elements in any particular order.
     * The returned iterator is a thread-safe "fail-fast" iterator
     * that will throw {@link
     * java.util.ConcurrentModificationException} upon detected
     * interference.
     *
     * @return an iterator over the elements in this queue.
     */
    public Iterator<E> iterator() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return new Itr(q.iterator());
        } finally {
            lock.unlock();
        }
    }

    /**
     * Wraps the backing queue's iterator so that <tt>next</tt> and
     * <tt>remove</tt> run while holding the enclosing queue's lock.
     * Deliberately not generic: it uses the enclosing class's <tt>E</tt>.
     */
    private class Itr implements Iterator<E> {
        private final Iterator<E> iter;
        Itr(Iterator<E> i) {
            iter = i;
        }

        public boolean hasNext() {
            /*
             * No sync -- we rely on underlying hasNext to be
             * stateless, in which case we can return true by mistake
             * only when next() will subsequently throw
             * ConcurrentModificationException.
             */
            return iter.hasNext();
        }

        public E next() {
            ReentrantLock lock = PriorityBlockingQueue.this.lock;
            lock.lock();
            try {
                return iter.next();
            } finally {
                lock.unlock();
            }
        }

        public void remove() {
            ReentrantLock lock = PriorityBlockingQueue.this.lock;
            lock.lock();
            try {
                iter.remove();
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).  This
     * merely wraps default serialization within lock.  The
     * serialization strategy for items is left to underlying
     * Queue. Note that locking is not needed on deserialization, so
     * readObject is not defined, just relying on default.
     *
     * @param s the stream to write to
     * @throws java.io.IOException if default serialization fails.
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {
        lock.lock();
        try {
            s.defaultWriteObject();
        } finally {
            lock.unlock();
        }
    }

}