ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.18
Committed: Wed Aug 6 18:22:09 2003 UTC (20 years, 10 months ago) by tim
Branch: MAIN
CVS Tags: JSR166_CR1
Changes since 1.17: +3 -3 lines
Log Message:
Fixes to minor errors found by DocCheck

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
/**
 * An unbounded {@linkplain BlockingQueue blocking queue} based on a
 * {@link PriorityQueue}, obeying its ordering rules and implementation
 * characteristics. While this queue is logically unbounded, attempted
 * additions may fail due to resource exhaustion (causing
 * <tt>OutOfMemoryError</tt>).
 *
 * <p>All element storage is delegated to a private {@link PriorityQueue};
 * every access to it (other than <tt>Iterator.hasNext</tt>) is made while
 * holding a single fair {@link ReentrantLock}. Consumers that find the
 * queue empty wait on a condition of that lock, which is signalled each
 * time an element is inserted.
 *
 * @since 1.5
 * @author Doug Lea
 */
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** Backing heap; only touched while {@link #lock} is held. */
    private final PriorityQueue<E> q;

    /** Fair lock guarding every access to {@link #q}. */
    private final ReentrantLock lock = new ReentrantLock(true);

    /** Condition on which consumers wait while the queue is empty. */
    private final Condition notEmpty = lock.newCondition();

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the default initial
     * capacity of the underlying {@link PriorityQueue} that orders its
     * elements according to their natural ordering (using
     * <tt>Comparable</tt>).
     */
    public PriorityBlockingQueue() {
        q = new PriorityQueue<E>();
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
     * capacity that orders its elements according to their natural ordering
     * (using <tt>Comparable</tt>).
     *
     * @param initialCapacity the initial capacity for this priority queue.
     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
     * than 1
     */
    public PriorityBlockingQueue(int initialCapacity) {
        q = new PriorityQueue<E>(initialCapacity, null);
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
     * capacity that orders its elements according to the specified
     * comparator.
     *
     * @param initialCapacity the initial capacity for this priority queue.
     * @param comparator the comparator used to order this priority queue.
     * If <tt>null</tt> then the order depends on the elements' natural
     * ordering.
     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
     * than 1
     */
    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        q = new PriorityQueue<E>(initialCapacity, comparator);
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> containing the elements
     * in the specified collection. Construction is delegated to
     * {@link PriorityQueue#PriorityQueue(Collection)}: if the specified
     * collection is a {@link SortedSet} or a {@link PriorityQueue}, this
     * priority queue will be sorted according to the same comparator, or
     * according to its elements' natural order if the collection is
     * sorted according to its elements' natural order. Otherwise, this
     * priority queue is ordered according to its elements' natural order.
     *
     * @param c the collection whose elements are to be placed
     * into this priority queue.
     * @throws ClassCastException if elements of the specified collection
     * cannot be compared to one another according to the priority
     * queue's ordering.
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     */
    public PriorityBlockingQueue(Collection<? extends E> c) {
        q = new PriorityQueue<E>(c);
    }

    // these first few override just to update doc comments

    /**
     * Adds the specified element to this queue.
     *
     * @param o the element to add
     * @return <tt>true</tt> (as per the general contract of
     * <tt>Collection.add</tt>).
     * @throws NullPointerException {@inheritDoc}
     * @throws ClassCastException if the specified element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     */
    public boolean add(E o) {
        return super.add(o);
    }

    /**
     * Adds all of the elements in the specified collection to this queue.
     * The behavior of this operation is undefined if
     * the specified collection is modified while the operation is in
     * progress. (This implies that the behavior of this call is undefined if
     * the specified collection is this queue, and this queue is nonempty.)
     * <p>
     * This implementation defers to the superclass, which iterates over the
     * specified collection and adds each object returned by the iterator to
     * this collection, in turn.
     *
     * @param c the collection whose elements are to be added
     * @return the result of the superclass implementation
     * @throws NullPointerException {@inheritDoc}
     * @throws ClassCastException if any element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     */
    public boolean addAll(Collection<? extends E> c) {
        return super.addAll(c);
    }

    /**
     * Returns the comparator used to order this collection, or <tt>null</tt>
     * if this collection is sorted according to its elements' natural
     * ordering (using <tt>Comparable</tt>).
     *
     * @return the comparator used to order this collection, or <tt>null</tt>
     * if this collection is sorted according to its elements' natural
     * ordering.
     */
    public Comparator<? super E> comparator() {
        return q.comparator();
    }

    /**
     * Adds the specified element to this priority queue and signals one
     * thread waiting for an element, if any.
     *
     * @param o the element to add
     * @return <tt>true</tt>
     * @throws ClassCastException if the specified element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     * @throws NullPointerException if the specified element is <tt>null</tt>
     */
    public boolean offer(E o) {
        if (o == null) {
            throw new NullPointerException();
        }
        lock.lock();
        try {
            boolean ok = q.offer(o);
            assert ok;
            notEmpty.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds the specified element to this priority queue. As the queue is
     * unbounded this method will never block.
     *
     * @param o the element to add
     * @throws ClassCastException if the element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E o) {
        offer(o); // never need to block
    }

    /**
     * Adds the specified element to this priority queue. As the queue is
     * unbounded this method will never block.
     *
     * @param o {@inheritDoc}
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @throws ClassCastException if the element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     * @throws NullPointerException {@inheritDoc}
     * @return <tt>true</tt>
     */
    public boolean offer(E o, long timeout, TimeUnit unit) {
        return offer(o); // never need to block
    }

    /**
     * Retrieves and removes the head of this queue, waiting while the
     * queue is empty.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while acquiring the lock
     * or while waiting for an element
     */
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            try {
                while (q.size() == 0) {
                    notEmpty.await();
                }
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = q.poll();
            assert x != null;
            return x;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue without waiting.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue is
     * empty
     */
    public E poll() {
        lock.lock();
        try {
            return q.poll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified time for an element to become available.
     *
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit the time unit of the <tt>timeout</tt> argument
     * @return the head of this queue, or <tt>null</tt> if the wait time
     * elapsed while the queue was still empty
     * @throws InterruptedException if interrupted while acquiring the lock
     * or while waiting for an element
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            for (;;) {
                E x = q.poll();
                if (x != null) {
                    return x;
                }
                if (nanos <= 0) {
                    return null;
                }
                try {
                    // awaitNanos reports the time still left to wait
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue is
     * empty
     */
    public E peek() {
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * a <tt>PriorityBlockingQueue</tt> is not capacity constrained.
     *
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present. Returns <tt>true</tt> if the queue
     * contained the specified element (or equivalently, if the queue
     * changed as a result of the call).
     *
     * <p>This implementation delegates to the underlying
     * {@link PriorityQueue#remove(Object)} while holding the lock.
     *
     * @param o the element to remove, if present
     * @return <tt>true</tt> if an element was removed
     */
    public boolean remove(Object o) {
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns <tt>true</tt> if this queue contains the specified element.
     *
     * @param o the element whose presence is to be tested
     * @return <tt>true</tt> if this queue contains the specified element
     */
    public boolean contains(Object o) {
        lock.lock();
        try {
            return q.contains(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * captured while holding the lock.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the string representation of the underlying queue,
     * computed while holding the lock.
     *
     * @return a string representation of this queue
     */
    public String toString() {
        lock.lock();
        try {
            return q.toString();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes all available elements from this queue and adds them to the
     * given collection. Equivalent to
     * <tt>drainTo(c, Integer.MAX_VALUE)</tt>.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most the given number of available elements from this
     * queue, in priority order, and adds them to the given collection.
     * Each element is removed from this queue only after it has been
     * added to <tt>c</tt>, so an exception thrown by <tt>c.add</tt> leaves
     * the element in this queue.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        if (maxElements <= 0) {
            return 0;
        }
        lock.lock();
        try {
            int transferred = 0;
            while (transferred < maxElements) {
                E head = q.peek();
                if (head == null) {
                    break;
                }
                c.add(head); // add first so a failure here loses nothing
                q.poll();
                ++transferred;
            }
            return transferred;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * captured while holding the lock; the runtime type of the returned
     * array is that of the specified array.
     *
     * @param a the array into which the elements are to be stored, if it
     * is big enough; otherwise a new array of the same runtime type is
     * allocated
     * @return an array containing all of the elements in this queue
     */
    public <T> T[] toArray(T[] a) {
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue. The iterator
     * does not return the elements in any particular order.
     * The returned iterator is a "fail-fast" iterator that will
     * throw {@link java.util.ConcurrentModificationException}
     * upon detected interference.
     *
     * @return an iterator over the elements in this queue.
     */
    public Iterator<E> iterator() {
        lock.lock();
        try {
            return new Itr(q.iterator());
        } finally {
            lock.unlock();
        }
    }

    /**
     * Wraps an iterator of the underlying queue so that <tt>next</tt> and
     * <tt>remove</tt> run while holding the enclosing queue's lock.
     */
    private class Itr implements Iterator<E> {
        private final Iterator<E> iter;

        Itr(Iterator<E> i) {
            iter = i;
        }

        public boolean hasNext() {
            /*
             * No sync -- we rely on underlying hasNext to be
             * stateless, in which case we can return true by mistake
             * only when next() will subsequently throw
             * ConcurrentModificationException.
             */
            return iter.hasNext();
        }

        public E next() {
            lock.lock();
            try {
                return iter.next();
            } finally {
                lock.unlock();
            }
        }

        public void remove() {
            lock.lock();
            try {
                iter.remove();
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it). This
     * merely wraps default serialization within lock. The
     * serialization strategy for items is left to underlying
     * Queue. Note that locking is not needed on deserialization, so
     * readObject is not defined, just relying on default.
     *
     * @param s the stream to write to
     * @throws java.io.IOException if default serialization fails
     */
    private void writeObject(java.io.ObjectOutputStream s)
            throws java.io.IOException {
        lock.lock();
        try {
            s.defaultWriteObject();
        } finally {
            lock.unlock();
        }
    }

}