ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.34
Committed: Tue Jan 20 04:35:02 2004 UTC (20 years, 4 months ago) by dl
Branch: MAIN
Changes since 1.33: +1 -0 lines
Log Message:
javadoc lint; Thread.interrupt shouldn't throw exception if thread dead

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
14 * the same ordering rules as class {@link PriorityQueue} and supplies
15 * blocking retrieval operations. While this queue is logically
16 * unbounded, attempted additions may fail due to resource exhaustion
17 * (causing <tt>OutOfMemoryError</tt>). This class does not permit
18 * <tt>null</tt> elements. A priority queue relying on natural
19 * ordering also does not permit insertion of non-comparable objects
20 * (doing so results in <tt>ClassCastException</tt>).
21 *
22 * <p>This class implements all of the <em>optional</em> methods
23 * of the {@link Collection} and {@link Iterator} interfaces.
24 * <p>The Iterator provided in method {@link #iterator()} is
25 * <em>not</em> guaranteed to traverse the elements of the
26 * PriorityBlockingQueue in any particular order. If you need ordered
27 * traversal, consider using <tt>Arrays.sort(pq.toArray())</tt>.
28 *
29 * @since 1.5
30 * @author Doug Lea
31 * @param <E> the type of elements held in this collection
32 */
33 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
34 implements BlockingQueue<E>, java.io.Serializable {
35 private static final long serialVersionUID = 5595510919245408276L;
36
37 private final PriorityQueue<E> q;
38 private final ReentrantLock lock = new ReentrantLock(true);
39 private final Condition notEmpty = lock.newCondition();
40
41 /**
42 * Creates a <tt>PriorityBlockingQueue</tt> with the default initial
43 * capacity
44 * (11) that orders its elements according to their natural
45 * ordering (using <tt>Comparable</tt>).
46 */
47 public PriorityBlockingQueue() {
48 q = new PriorityQueue<E>();
49 }
50
51 /**
52 * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
53 * capacity
54 * that orders its elements according to their natural ordering
55 * (using <tt>Comparable</tt>).
56 *
57 * @param initialCapacity the initial capacity for this priority queue.
58 * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
59 * than 1
60 */
61 public PriorityBlockingQueue(int initialCapacity) {
62 q = new PriorityQueue<E>(initialCapacity, null);
63 }
64
65 /**
66 * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
67 * capacity
68 * that orders its elements according to the specified comparator.
69 *
70 * @param initialCapacity the initial capacity for this priority queue.
71 * @param comparator the comparator used to order this priority queue.
72 * If <tt>null</tt> then the order depends on the elements' natural
73 * ordering.
74 * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
75 * than 1
76 */
77 public PriorityBlockingQueue(int initialCapacity,
78 Comparator<? super E> comparator) {
79 q = new PriorityQueue<E>(initialCapacity, comparator);
80 }
81
82 /**
83 * Creates a <tt>PriorityBlockingQueue</tt> containing the elements
84 * in the specified collection. The priority queue has an initial
85 * capacity of 110% of the size of the specified collection. If
86 * the specified collection is a {@link SortedSet} or a {@link
87 * PriorityQueue}, this priority queue will be sorted according to
88 * the same comparator, or according to its elements' natural
89 * order if the collection is sorted according to its elements'
90 * natural order. Otherwise, this priority queue is ordered
91 * according to its elements' natural order.
92 *
93 * @param c the collection whose elements are to be placed
94 * into this priority queue.
95 * @throws ClassCastException if elements of the specified collection
96 * cannot be compared to one another according to the priority
97 * queue's ordering.
98 * @throws NullPointerException if <tt>c</tt> or any element within it
99 * is <tt>null</tt>
100 */
101 public PriorityBlockingQueue(Collection<? extends E> c) {
102 q = new PriorityQueue<E>(c);
103 }
104
105
106 // these first few override just to update doc comments
107
108 /**
109 * Adds the specified element to this queue.
110 * @param o the element to add
111 * @return <tt>true</tt> (as per the general contract of
112 * <tt>Collection.add</tt>).
113 *
114 * @throws NullPointerException if the specified element is <tt>null</tt>.
115 * @throws ClassCastException if the specified element cannot be compared
116 * with elements currently in the priority queue according
117 * to the priority queue's ordering.
118 */
119 public boolean add(E o) {
120 return super.add(o);
121 }
122
123 /**
124 * Returns the comparator used to order this collection, or <tt>null</tt>
125 * if this collection is sorted according to its elements natural ordering
126 * (using <tt>Comparable</tt>).
127 *
128 * @return the comparator used to order this collection, or <tt>null</tt>
129 * if this collection is sorted according to its elements natural ordering.
130 */
131 public Comparator comparator() {
132 return q.comparator();
133 }
134
135 /**
136 * Inserts the specified element into this priority queue.
137 *
138 * @param o the element to add
139 * @return <tt>true</tt>
140 * @throws ClassCastException if the specified element cannot be compared
141 * with elements currently in the priority queue according
142 * to the priority queue's ordering.
143 * @throws NullPointerException if the specified element is <tt>null</tt>.
144 */
145 public boolean offer(E o) {
146 if (o == null) throw new NullPointerException();
147 final ReentrantLock lock = this.lock;
148 lock.lock();
149 try {
150 boolean ok = q.offer(o);
151 assert ok;
152 notEmpty.signal();
153 return true;
154 } finally {
155 lock.unlock();
156 }
157 }
158
159 /**
160 * Adds the specified element to this priority queue. As the queue is
161 * unbounded this method will never block.
162 * @param o the element to add
163 * @throws ClassCastException if the element cannot be compared
164 * with elements currently in the priority queue according
165 * to the priority queue's ordering.
166 * @throws NullPointerException if the specified element is <tt>null</tt>.
167 */
168 public void put(E o) {
169 offer(o); // never need to block
170 }
171
172 /**
173 * Inserts the specified element into this priority queue. As the queue is
174 * unbounded this method will never block.
175 * @param o the element to add
176 * @param timeout This parameter is ignored as the method never blocks
177 * @param unit This parameter is ignored as the method never blocks
178 * @return <tt>true</tt>
179 * @throws ClassCastException if the element cannot be compared
180 * with elements currently in the priority queue according
181 * to the priority queue's ordering.
182 * @throws NullPointerException if the specified element is <tt>null</tt>.
183 */
184 public boolean offer(E o, long timeout, TimeUnit unit) {
185 return offer(o); // never need to block
186 }
187
188 public E take() throws InterruptedException {
189 final ReentrantLock lock = this.lock;
190 lock.lockInterruptibly();
191 try {
192 try {
193 while (q.size() == 0)
194 notEmpty.await();
195 } catch (InterruptedException ie) {
196 notEmpty.signal(); // propagate to non-interrupted thread
197 throw ie;
198 }
199 E x = q.poll();
200 assert x != null;
201 return x;
202 } finally {
203 lock.unlock();
204 }
205 }
206
207
208 public E poll() {
209 final ReentrantLock lock = this.lock;
210 lock.lock();
211 try {
212 return q.poll();
213 } finally {
214 lock.unlock();
215 }
216 }
217
218 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
219 long nanos = unit.toNanos(timeout);
220 final ReentrantLock lock = this.lock;
221 lock.lockInterruptibly();
222 try {
223 for (;;) {
224 E x = q.poll();
225 if (x != null)
226 return x;
227 if (nanos <= 0)
228 return null;
229 try {
230 nanos = notEmpty.awaitNanos(nanos);
231 } catch (InterruptedException ie) {
232 notEmpty.signal(); // propagate to non-interrupted thread
233 throw ie;
234 }
235 }
236 } finally {
237 lock.unlock();
238 }
239 }
240
241 public E peek() {
242 final ReentrantLock lock = this.lock;
243 lock.lock();
244 try {
245 return q.peek();
246 } finally {
247 lock.unlock();
248 }
249 }
250
251 public int size() {
252 final ReentrantLock lock = this.lock;
253 lock.lock();
254 try {
255 return q.size();
256 } finally {
257 lock.unlock();
258 }
259 }
260
261 /**
262 * Always returns <tt>Integer.MAX_VALUE</tt> because
263 * a <tt>PriorityBlockingQueue</tt> is not capacity constrained.
264 * @return <tt>Integer.MAX_VALUE</tt>
265 */
266 public int remainingCapacity() {
267 return Integer.MAX_VALUE;
268 }
269
270 public boolean remove(Object o) {
271 final ReentrantLock lock = this.lock;
272 lock.lock();
273 try {
274 return q.remove(o);
275 } finally {
276 lock.unlock();
277 }
278 }
279
280 public boolean contains(Object o) {
281 final ReentrantLock lock = this.lock;
282 lock.lock();
283 try {
284 return q.contains(o);
285 } finally {
286 lock.unlock();
287 }
288 }
289
290 public Object[] toArray() {
291 final ReentrantLock lock = this.lock;
292 lock.lock();
293 try {
294 return q.toArray();
295 } finally {
296 lock.unlock();
297 }
298 }
299
300
301 public String toString() {
302 final ReentrantLock lock = this.lock;
303 lock.lock();
304 try {
305 return q.toString();
306 } finally {
307 lock.unlock();
308 }
309 }
310
311 public int drainTo(Collection<? super E> c) {
312 if (c == null)
313 throw new NullPointerException();
314 if (c == this)
315 throw new IllegalArgumentException();
316 final ReentrantLock lock = this.lock;
317 lock.lock();
318 try {
319 int n = 0;
320 E e;
321 while ( (e = q.poll()) != null) {
322 c.add(e);
323 ++n;
324 }
325 return n;
326 } finally {
327 lock.unlock();
328 }
329 }
330
331 public int drainTo(Collection<? super E> c, int maxElements) {
332 if (c == null)
333 throw new NullPointerException();
334 if (c == this)
335 throw new IllegalArgumentException();
336 if (maxElements <= 0)
337 return 0;
338 final ReentrantLock lock = this.lock;
339 lock.lock();
340 try {
341 int n = 0;
342 E e;
343 while (n < maxElements && (e = q.poll()) != null) {
344 c.add(e);
345 ++n;
346 }
347 return n;
348 } finally {
349 lock.unlock();
350 }
351 }
352
353 /**
354 * Atomically removes all of the elements from this delay queue.
355 * The queue will be empty after this call returns.
356 */
357 public void clear() {
358 final ReentrantLock lock = this.lock;
359 lock.lock();
360 try {
361 q.clear();
362 } finally {
363 lock.unlock();
364 }
365 }
366
367 public <T> T[] toArray(T[] a) {
368 final ReentrantLock lock = this.lock;
369 lock.lock();
370 try {
371 return q.toArray(a);
372 } finally {
373 lock.unlock();
374 }
375 }
376
377 /**
378 * Returns an iterator over the elements in this queue. The
379 * iterator does not return the elements in any particular order.
380 * The returned iterator is a thread-safe "fast-fail" iterator
381 * that will throw {@link
382 * java.util.ConcurrentModificationException} upon detected
383 * interference.
384 *
385 * @return an iterator over the elements in this queue.
386 */
387 public Iterator<E> iterator() {
388 final ReentrantLock lock = this.lock;
389 lock.lock();
390 try {
391 return new Itr(q.iterator());
392 } finally {
393 lock.unlock();
394 }
395 }
396
397 private class Itr<E> implements Iterator<E> {
398 private final Iterator<E> iter;
399 Itr(Iterator<E> i) {
400 iter = i;
401 }
402
403 public boolean hasNext() {
404 /*
405 * No sync -- we rely on underlying hasNext to be
406 * stateless, in which case we can return true by mistake
407 * only when next() will subsequently throw
408 * ConcurrentModificationException.
409 */
410 return iter.hasNext();
411 }
412
413 public E next() {
414 ReentrantLock lock = PriorityBlockingQueue.this.lock;
415 lock.lock();
416 try {
417 return iter.next();
418 } finally {
419 lock.unlock();
420 }
421 }
422
423 public void remove() {
424 ReentrantLock lock = PriorityBlockingQueue.this.lock;
425 lock.lock();
426 try {
427 iter.remove();
428 } finally {
429 lock.unlock();
430 }
431 }
432 }
433
434 /**
435 * Save the state to a stream (that is, serialize it). This
436 * merely wraps default serialization within lock. The
437 * serialization strategy for items is left to underlying
438 * Queue. Note that locking is not needed on deserialization, so
439 * readObject is not defined, just relying on default.
440 */
441 private void writeObject(java.io.ObjectOutputStream s)
442 throws java.io.IOException {
443 lock.lock();
444 try {
445 s.defaultWriteObject();
446 } finally {
447 lock.unlock();
448 }
449 }
450
451 }