ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.38
Committed: Wed Jun 2 23:49:07 2004 UTC (20 years ago) by dl
Branch: MAIN
Changes since 1.37: +5 -8 lines
Log Message:
CopyOnWriteArraySet and ConcurrentHashMap no longer implement Cloneable
Improve javadoc wording in other classes

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13 * An unbounded {@linkplain BlockingQueue blocking queue} that uses
14 * the same ordering rules as class {@link PriorityQueue} and supplies
15 * blocking retrieval operations. While this queue is logically
16 * unbounded, attempted additions may fail due to resource exhaustion
17 * (causing <tt>OutOfMemoryError</tt>). This class does not permit
18 * <tt>null</tt> elements. A priority queue relying on natural
19 * ordering also does not permit insertion of non-comparable objects
20 * (doing so results in <tt>ClassCastException</tt>).
21 *
22 * <p>This class and its iterator implement all of the
23 * <em>optional</em> methods of the {@link Collection} and {@link
24 * Iterator} interfaces.
25 * The Iterator provided in method {@link #iterator()} is
26 * <em>not</em> guaranteed to traverse the elements of the
27 * PriorityBlockingQueue in any particular order. If you need ordered
28 * traversal, consider using <tt>Arrays.sort(pq.toArray())</tt>.
29 *
30 * <p>This class is a member of the
31 * <a href="{@docRoot}/../guide/collections/index.html">
32 * Java Collections Framework</a>.
33 *
34 * @since 1.5
35 * @author Doug Lea
36 * @param <E> the type of elements held in this collection
37 */
38 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
39 implements BlockingQueue<E>, java.io.Serializable {
40 private static final long serialVersionUID = 5595510919245408276L;
41
42 private final PriorityQueue<E> q;
43 private final ReentrantLock lock = new ReentrantLock(true);
44 private final Condition notEmpty = lock.newCondition();
45
46 /**
47 * Creates a <tt>PriorityBlockingQueue</tt> with the default initial
48 * capacity
49 * (11) that orders its elements according to their natural
50 * ordering (using <tt>Comparable</tt>).
51 */
52 public PriorityBlockingQueue() {
53 q = new PriorityQueue<E>();
54 }
55
56 /**
57 * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
58 * capacity
59 * that orders its elements according to their natural ordering
60 * (using <tt>Comparable</tt>).
61 *
62 * @param initialCapacity the initial capacity for this priority queue.
63 * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
64 * than 1
65 */
66 public PriorityBlockingQueue(int initialCapacity) {
67 q = new PriorityQueue<E>(initialCapacity, null);
68 }
69
70 /**
71 * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
72 * capacity
73 * that orders its elements according to the specified comparator.
74 *
75 * @param initialCapacity the initial capacity for this priority queue.
76 * @param comparator the comparator used to order this priority queue.
77 * If <tt>null</tt> then the order depends on the elements' natural
78 * ordering.
79 * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
80 * than 1
81 */
82 public PriorityBlockingQueue(int initialCapacity,
83 Comparator<? super E> comparator) {
84 q = new PriorityQueue<E>(initialCapacity, comparator);
85 }
86
87 /**
88 * Creates a <tt>PriorityBlockingQueue</tt> containing the elements
89 * in the specified collection. The priority queue has an initial
90 * capacity of 110% of the size of the specified collection. If
91 * the specified collection is a {@link SortedSet} or a {@link
92 * PriorityQueue}, this priority queue will be sorted according to
93 * the same comparator, or according to its elements' natural
94 * order if the collection is sorted according to its elements'
95 * natural order. Otherwise, this priority queue is ordered
96 * according to its elements' natural order.
97 *
98 * @param c the collection whose elements are to be placed
99 * into this priority queue.
100 * @throws ClassCastException if elements of the specified collection
101 * cannot be compared to one another according to the priority
102 * queue's ordering.
103 * @throws NullPointerException if <tt>c</tt> or any element within it
104 * is <tt>null</tt>
105 */
106 public PriorityBlockingQueue(Collection<? extends E> c) {
107 q = new PriorityQueue<E>(c);
108 }
109
110
111 // these first few override just to update doc comments
112
113 /**
114 * Adds the specified element to this queue.
115 * @param o the element to add
116 * @return <tt>true</tt> (as per the general contract of
117 * <tt>Collection.add</tt>).
118 *
119 * @throws NullPointerException if the specified element is <tt>null</tt>.
120 * @throws ClassCastException if the specified element cannot be compared
121 * with elements currently in the priority queue according
122 * to the priority queue's ordering.
123 */
124 public boolean add(E o) {
125 return super.add(o);
126 }
127
128 /**
129 * Returns the comparator used to order this collection, or <tt>null</tt>
130 * if this collection is sorted according to its elements natural ordering
131 * (using <tt>Comparable</tt>).
132 *
133 * @return the comparator used to order this collection, or <tt>null</tt>
134 * if this collection is sorted according to its elements natural ordering.
135 */
136 public Comparator<? super E> comparator() {
137 return q.comparator();
138 }
139
140 /**
141 * Inserts the specified element into this priority queue.
142 *
143 * @param o the element to add
144 * @return <tt>true</tt>
145 * @throws ClassCastException if the specified element cannot be compared
146 * with elements currently in the priority queue according
147 * to the priority queue's ordering.
148 * @throws NullPointerException if the specified element is <tt>null</tt>.
149 */
150 public boolean offer(E o) {
151 if (o == null) throw new NullPointerException();
152 final ReentrantLock lock = this.lock;
153 lock.lock();
154 try {
155 boolean ok = q.offer(o);
156 assert ok;
157 notEmpty.signal();
158 return true;
159 } finally {
160 lock.unlock();
161 }
162 }
163
164 /**
165 * Adds the specified element to this priority queue. As the queue is
166 * unbounded this method will never block.
167 * @param o the element to add
168 * @throws ClassCastException if the element cannot be compared
169 * with elements currently in the priority queue according
170 * to the priority queue's ordering.
171 * @throws NullPointerException if the specified element is <tt>null</tt>.
172 */
173 public void put(E o) {
174 offer(o); // never need to block
175 }
176
177 /**
178 * Inserts the specified element into this priority queue. As the queue is
179 * unbounded this method will never block.
180 * @param o the element to add
181 * @param timeout This parameter is ignored as the method never blocks
182 * @param unit This parameter is ignored as the method never blocks
183 * @return <tt>true</tt>
184 * @throws ClassCastException if the element cannot be compared
185 * with elements currently in the priority queue according
186 * to the priority queue's ordering.
187 * @throws NullPointerException if the specified element is <tt>null</tt>.
188 */
189 public boolean offer(E o, long timeout, TimeUnit unit) {
190 return offer(o); // never need to block
191 }
192
193 public E take() throws InterruptedException {
194 final ReentrantLock lock = this.lock;
195 lock.lockInterruptibly();
196 try {
197 try {
198 while (q.size() == 0)
199 notEmpty.await();
200 } catch (InterruptedException ie) {
201 notEmpty.signal(); // propagate to non-interrupted thread
202 throw ie;
203 }
204 E x = q.poll();
205 assert x != null;
206 return x;
207 } finally {
208 lock.unlock();
209 }
210 }
211
212
213 public E poll() {
214 final ReentrantLock lock = this.lock;
215 lock.lock();
216 try {
217 return q.poll();
218 } finally {
219 lock.unlock();
220 }
221 }
222
223 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
224 long nanos = unit.toNanos(timeout);
225 final ReentrantLock lock = this.lock;
226 lock.lockInterruptibly();
227 try {
228 for (;;) {
229 E x = q.poll();
230 if (x != null)
231 return x;
232 if (nanos <= 0)
233 return null;
234 try {
235 nanos = notEmpty.awaitNanos(nanos);
236 } catch (InterruptedException ie) {
237 notEmpty.signal(); // propagate to non-interrupted thread
238 throw ie;
239 }
240 }
241 } finally {
242 lock.unlock();
243 }
244 }
245
246 public E peek() {
247 final ReentrantLock lock = this.lock;
248 lock.lock();
249 try {
250 return q.peek();
251 } finally {
252 lock.unlock();
253 }
254 }
255
256 public int size() {
257 final ReentrantLock lock = this.lock;
258 lock.lock();
259 try {
260 return q.size();
261 } finally {
262 lock.unlock();
263 }
264 }
265
266 /**
267 * Always returns <tt>Integer.MAX_VALUE</tt> because
268 * a <tt>PriorityBlockingQueue</tt> is not capacity constrained.
269 * @return <tt>Integer.MAX_VALUE</tt>
270 */
271 public int remainingCapacity() {
272 return Integer.MAX_VALUE;
273 }
274
275 /**
276 * Removes a single instance of the specified element from this
277 * queue, if it is present.
278 */
279 public boolean remove(Object o) {
280 final ReentrantLock lock = this.lock;
281 lock.lock();
282 try {
283 return q.remove(o);
284 } finally {
285 lock.unlock();
286 }
287 }
288
289 public boolean contains(Object o) {
290 final ReentrantLock lock = this.lock;
291 lock.lock();
292 try {
293 return q.contains(o);
294 } finally {
295 lock.unlock();
296 }
297 }
298
299 public Object[] toArray() {
300 final ReentrantLock lock = this.lock;
301 lock.lock();
302 try {
303 return q.toArray();
304 } finally {
305 lock.unlock();
306 }
307 }
308
309
310 public String toString() {
311 final ReentrantLock lock = this.lock;
312 lock.lock();
313 try {
314 return q.toString();
315 } finally {
316 lock.unlock();
317 }
318 }
319
320 public int drainTo(Collection<? super E> c) {
321 if (c == null)
322 throw new NullPointerException();
323 if (c == this)
324 throw new IllegalArgumentException();
325 final ReentrantLock lock = this.lock;
326 lock.lock();
327 try {
328 int n = 0;
329 E e;
330 while ( (e = q.poll()) != null) {
331 c.add(e);
332 ++n;
333 }
334 return n;
335 } finally {
336 lock.unlock();
337 }
338 }
339
340 public int drainTo(Collection<? super E> c, int maxElements) {
341 if (c == null)
342 throw new NullPointerException();
343 if (c == this)
344 throw new IllegalArgumentException();
345 if (maxElements <= 0)
346 return 0;
347 final ReentrantLock lock = this.lock;
348 lock.lock();
349 try {
350 int n = 0;
351 E e;
352 while (n < maxElements && (e = q.poll()) != null) {
353 c.add(e);
354 ++n;
355 }
356 return n;
357 } finally {
358 lock.unlock();
359 }
360 }
361
362 /**
363 * Atomically removes all of the elements from this queue.
364 * The queue will be empty after this call returns.
365 */
366 public void clear() {
367 final ReentrantLock lock = this.lock;
368 lock.lock();
369 try {
370 q.clear();
371 } finally {
372 lock.unlock();
373 }
374 }
375
376 public <T> T[] toArray(T[] a) {
377 final ReentrantLock lock = this.lock;
378 lock.lock();
379 try {
380 return q.toArray(a);
381 } finally {
382 lock.unlock();
383 }
384 }
385
386 /**
387 * Returns an iterator over the elements in this queue. The
388 * iterator does not return the elements in any particular order.
389 * The returned iterator is a thread-safe "fast-fail" iterator
390 * that will throw {@link
391 * java.util.ConcurrentModificationException} upon detected
392 * interference.
393 *
394 * @return an iterator over the elements in this queue.
395 */
396 public Iterator<E> iterator() {
397 final ReentrantLock lock = this.lock;
398 lock.lock();
399 try {
400 return new Itr(q.iterator());
401 } finally {
402 lock.unlock();
403 }
404 }
405
406 private class Itr<E> implements Iterator<E> {
407 private final Iterator<E> iter;
408 Itr(Iterator<E> i) {
409 iter = i;
410 }
411
412 public boolean hasNext() {
413 /*
414 * No sync -- we rely on underlying hasNext to be
415 * stateless, in which case we can return true by mistake
416 * only when next() will subsequently throw
417 * ConcurrentModificationException.
418 */
419 return iter.hasNext();
420 }
421
422 public E next() {
423 ReentrantLock lock = PriorityBlockingQueue.this.lock;
424 lock.lock();
425 try {
426 return iter.next();
427 } finally {
428 lock.unlock();
429 }
430 }
431
432 public void remove() {
433 ReentrantLock lock = PriorityBlockingQueue.this.lock;
434 lock.lock();
435 try {
436 iter.remove();
437 } finally {
438 lock.unlock();
439 }
440 }
441 }
442
443 /**
444 * Save the state to a stream (that is, serialize it). This
445 * merely wraps default serialization within lock. The
446 * serialization strategy for items is left to underlying
447 * Queue. Note that locking is not needed on deserialization, so
448 * readObject is not defined, just relying on default.
449 */
450 private void writeObject(java.io.ObjectOutputStream s)
451 throws java.io.IOException {
452 lock.lock();
453 try {
454 s.defaultWriteObject();
455 } finally {
456 lock.unlock();
457 }
458 }
459
460 }