ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.33
Committed: Sat Dec 27 19:26:26 2003 UTC (20 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.32: +2 -2 lines
Log Message:
Headers reference Creative Commons

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
/**
 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
 * the same ordering rules as class {@link PriorityQueue} and supplies
 * blocking retrieval operations. While this queue is logically
 * unbounded, attempted additions may fail due to resource exhaustion
 * (causing <tt>OutOfMemoryError</tt>). This class does not permit
 * <tt>null</tt> elements. A priority queue relying on natural
 * ordering also does not permit insertion of non-comparable objects
 * (doing so results in <tt>ClassCastException</tt>).
 *
 * <p>This class implements all of the <em>optional</em> methods
 * of the {@link Collection} and {@link Iterator} interfaces.
 * <p>The Iterator provided in method {@link #iterator()} is
 * <em>not</em> guaranteed to traverse the elements of the
 * PriorityBlockingQueue in any particular order. If you need ordered
 * traversal, consider using <tt>Arrays.sort(pq.toArray())</tt>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = 5595510919245408276L;

    /** The backing heap; every access (other than Itr.hasNext) is made while holding {@link #lock}. */
    private final PriorityQueue<E> q;
    /** Single fair lock guarding all operations on {@link #q}. */
    private final ReentrantLock lock = new ReentrantLock(true);
    /** Condition that blocked consumers wait on; signalled by each successful insertion. */
    private final Condition notEmpty = lock.newCondition();

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the default initial
     * capacity (11) that orders its elements according to their natural
     * ordering (using <tt>Comparable</tt>).
     */
    public PriorityBlockingQueue() {
        q = new PriorityQueue<E>();
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
     * capacity that orders its elements according to their natural
     * ordering (using <tt>Comparable</tt>).
     *
     * @param initialCapacity the initial capacity for this priority queue.
     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
     * than 1
     */
    public PriorityBlockingQueue(int initialCapacity) {
        q = new PriorityQueue<E>(initialCapacity, null);
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
     * capacity that orders its elements according to the specified
     * comparator.
     *
     * @param initialCapacity the initial capacity for this priority queue.
     * @param comparator the comparator used to order this priority queue.
     * If <tt>null</tt> then the order depends on the elements' natural
     * ordering.
     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
     * than 1
     */
    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        q = new PriorityQueue<E>(initialCapacity, comparator);
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> containing the elements
     * in the specified collection. The priority queue has an initial
     * capacity of 110% of the size of the specified collection. If
     * the specified collection is a {@link SortedSet} or a {@link
     * PriorityQueue}, this priority queue will be sorted according to
     * the same comparator, or according to its elements' natural
     * order if the collection is sorted according to its elements'
     * natural order. Otherwise, this priority queue is ordered
     * according to its elements' natural order.
     *
     * @param c the collection whose elements are to be placed
     * into this priority queue.
     * @throws ClassCastException if elements of the specified collection
     * cannot be compared to one another according to the priority
     * queue's ordering.
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     */
    public PriorityBlockingQueue(Collection<? extends E> c) {
        q = new PriorityQueue<E>(c);
    }


    // these first few override just to update doc comments

    /**
     * Adds the specified element to this queue.
     *
     * @param o the element to add
     * @return <tt>true</tt> (as per the general contract of
     * <tt>Collection.add</tt>).
     *
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     * @throws ClassCastException if the specified element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     */
    public boolean add(E o) {
        return super.add(o);
    }

    /**
     * Returns the comparator used to order this collection, or <tt>null</tt>
     * if this collection is sorted according to its elements' natural
     * ordering (using <tt>Comparable</tt>).
     *
     * @return the comparator used to order this collection, or <tt>null</tt>
     * if this collection is sorted according to its elements' natural
     * ordering.
     */
    public Comparator<? super E> comparator() {
        return q.comparator();
    }

    /**
     * Inserts the specified element into this priority queue.
     *
     * @param o the element to add
     * @return <tt>true</tt>
     * @throws ClassCastException if the specified element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o) {
        if (o == null) throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            boolean ok = q.offer(o);
            assert ok;
            // Wake one waiting consumer, if any, now that an element exists.
            notEmpty.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds the specified element to this priority queue. As the queue is
     * unbounded this method will never block.
     *
     * @param o the element to add
     * @throws ClassCastException if the element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public void put(E o) {
        offer(o); // never need to block
    }

    /**
     * Inserts the specified element into this priority queue. As the queue is
     * unbounded this method will never block.
     *
     * @param o the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return <tt>true</tt>
     * @throws ClassCastException if the element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o, long timeout, TimeUnit unit) {
        return offer(o); // never need to block
    }

    /**
     * Retrieves and removes the head of this queue, waiting on the
     * <tt>notEmpty</tt> condition for as long as the queue is empty.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while acquiring the
     * lock or while waiting for an element
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (q.size() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = q.poll();
            assert x != null;
            return x;
        } finally {
            lock.unlock();
        }
    }


    /**
     * Retrieves and removes the head of this queue without waiting.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue
     * is empty
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.poll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified time for an element to become available.
     *
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit the time unit of the <tt>timeout</tt> argument
     * @return the head of this queue, or <tt>null</tt> if the wait time
     * elapsed before an element was available
     * @throws InterruptedException if interrupted while acquiring the
     * lock or while waiting for an element
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E x = q.poll();
                if (x != null)
                    return x;
                if (nanos <= 0)
                    return null;
                try {
                    // awaitNanos returns the remaining wait time, so each
                    // retry of the loop waits only for what is left.
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue
     * is empty
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue, read under the lock.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * a <tt>PriorityBlockingQueue</tt> is not capacity constrained.
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present.
     *
     * @param o element to be removed from this queue, if present
     * @return <tt>true</tt> if the underlying queue reported a removal
     */
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns <tt>true</tt> if this queue contains the specified element.
     *
     * @param o element whose presence in this queue is to be tested
     * @return <tt>true</tt> if this queue contains the specified element
     */
    public boolean contains(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.contains(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * copied while holding the lock. The elements are in no particular
     * order.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }


    /**
     * Returns the string representation of the underlying queue,
     * computed while holding the lock.
     *
     * @return a string representation of this queue
     */
    public String toString() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toString();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes all available elements from this queue, in priority order,
     * and adds them to the given collection. Equivalent to
     * <tt>drainTo(c, Integer.MAX_VALUE)</tt>.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most the given number of available elements from this
     * queue, in priority order, and adds them to the given collection.
     * An element is removed from this queue only after <tt>c.add</tt>
     * has returned for it, so if <tt>c.add</tt> throws, that element is
     * still in this queue; elements transferred before the failure
     * remain in <tt>c</tt>.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer;
     * values less than or equal to zero transfer nothing
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            E e;
            while (n < maxElements && (e = q.peek()) != null) {
                // Add first, remove second: a throwing add() must not
                // lose the element that was about to be transferred.
                c.add(e);
                q.poll();
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this priority queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * copied while holding the lock; the array handling is delegated
     * to the underlying queue's <tt>toArray(T[])</tt>. The elements are
     * in no particular order.
     *
     * @param a the array passed through to the underlying queue
     * @return an array containing all of the elements in this queue
     */
    public <T> T[] toArray(T[] a) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue. The
     * iterator does not return the elements in any particular order.
     * The returned iterator is a thread-safe "fail-fast" iterator
     * that will throw {@link
     * java.util.ConcurrentModificationException} upon detected
     * interference.
     *
     * @return an iterator over the elements in this queue.
     */
    public Iterator<E> iterator() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return new Itr(q.iterator());
        } finally {
            lock.unlock();
        }
    }

    /**
     * Wrapper that runs the underlying iterator's <tt>next</tt> and
     * <tt>remove</tt> while holding the enclosing queue's lock.
     */
    private class Itr implements Iterator<E> {
        private final Iterator<E> iter;

        Itr(Iterator<E> i) {
            iter = i;
        }

        public boolean hasNext() {
            /*
             * No sync -- we rely on underlying hasNext to be
             * stateless, in which case we can return true by mistake
             * only when next() will subsequently throw
             * ConcurrentModificationException.
             */
            return iter.hasNext();
        }

        public E next() {
            ReentrantLock lock = PriorityBlockingQueue.this.lock;
            lock.lock();
            try {
                return iter.next();
            } finally {
                lock.unlock();
            }
        }

        public void remove() {
            ReentrantLock lock = PriorityBlockingQueue.this.lock;
            lock.lock();
            try {
                iter.remove();
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it). This
     * merely wraps default serialization within lock. The
     * serialization strategy for items is left to underlying
     * Queue. Note that locking is not needed on deserialization, so
     * readObject is not defined, just relying on default.
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {
        lock.lock();
        try {
            s.defaultWriteObject();
        } finally {
            lock.unlock();
        }
    }

}