ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.22
Committed: Fri Sep 12 15:40:10 2003 UTC (20 years, 8 months ago) by dl
Branch: MAIN
Changes since 1.21: +14 -22 lines
Log Message:
Adapt AbstractQueue changes; Conditionalize CancellableTask.reset; new TimeUnit methods

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
/**
 * An unbounded {@linkplain BlockingQueue blocking queue} based on a
 * {@link PriorityQueue}, obeying its ordering rules and implementation
 * characteristics.  While this queue is logically unbounded,
 * attempted additions may fail due to resource exhaustion (causing
 * <tt>OutOfMemoryError</tt>).
 *
 * <p>Every operation that touches the backing {@link PriorityQueue}
 * runs while holding a single fair {@link ReentrantLock}; the only
 * exception is the iterator's <tt>hasNext</tt> (see {@link #iterator()}).
 * <tt>null</tt> elements are rejected with a
 * <tt>NullPointerException</tt>.
 *
 * <p>The Iterator provided in method {@link #iterator()} is
 * <em>not</em> guaranteed to traverse the elements of the
 * PriorityBlockingQueue in any particular order. If you need ordered
 * traversal, consider using <tt>Arrays.sort(pq.toArray())</tt>.
 *
 * @since 1.5
 * @author Doug Lea
 */
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = 5595510919245408276L;

    /** Backing heap; accessed only while holding {@link #lock}. */
    private final PriorityQueue<E> q;
    /** Fair lock guarding all access to {@link #q}. */
    private final ReentrantLock lock = new ReentrantLock(true);
    /** Signalled whenever an element is inserted. */
    private final Condition notEmpty = lock.newCondition();

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the default initial
     * capacity (11) that orders its elements according to their natural
     * ordering (using <tt>Comparable</tt>).
     */
    public PriorityBlockingQueue() {
        q = new PriorityQueue<E>();
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
     * capacity that orders its elements according to their natural
     * ordering (using <tt>Comparable</tt>).
     *
     * @param initialCapacity the initial capacity for this priority queue.
     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
     * than 1
     */
    public PriorityBlockingQueue(int initialCapacity) {
        q = new PriorityQueue<E>(initialCapacity, null);
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> with the specified initial
     * capacity that orders its elements according to the specified
     * comparator.
     *
     * @param initialCapacity the initial capacity for this priority queue.
     * @param comparator the comparator used to order this priority queue.
     * If <tt>null</tt> then the order depends on the elements' natural
     * ordering.
     * @throws IllegalArgumentException if <tt>initialCapacity</tt> is less
     * than 1
     */
    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        q = new PriorityQueue<E>(initialCapacity, comparator);
    }

    /**
     * Creates a <tt>PriorityBlockingQueue</tt> containing the elements
     * in the specified collection.  The priority queue has an initial
     * capacity of 110% of the size of the specified collection. If
     * the specified collection is a {@link SortedSet} or a {@link
     * PriorityQueue}, this priority queue will be sorted according to
     * the same comparator, or according to its elements' natural
     * order if the collection is sorted according to its elements'
     * natural order.  Otherwise, this priority queue is ordered
     * according to its elements' natural order.
     *
     * @param c the collection whose elements are to be placed
     *        into this priority queue.
     * @throws ClassCastException if elements of the specified collection
     *         cannot be compared to one another according to the priority
     *         queue's ordering.
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     */
    public PriorityBlockingQueue(Collection<? extends E> c) {
        q = new PriorityQueue<E>(c);
    }


    // these first few override just to update doc comments

    /**
     * Adds the specified element to this queue.
     * @return <tt>true</tt> (as per the general contract of
     * <tt>Collection.add</tt>).
     *
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     * @throws ClassCastException if the specified element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     */
    public boolean add(E o) {
        return super.add(o);
    }

    /**
     * Adds all of the elements in the specified collection to this queue.
     * The behavior of this operation is undefined if
     * the specified collection is modified while the operation is in
     * progress.  (This implies that the behavior of this call is undefined if
     * the specified collection is this queue, and this queue is nonempty.)
     * <p>
     * This implementation iterates over the specified collection, and adds
     * each object returned by the iterator to this collection, in turn.
     * @param c collection whose elements are to be added to this queue
     * @return <tt>true</tt> if this queue changed as a result of the
     *         call.
     * @throws NullPointerException if <tt>c</tt> or any element in <tt>c</tt>
     * is <tt>null</tt>
     * @throws ClassCastException if any element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     */
    public boolean addAll(Collection<? extends E> c) {
        return super.addAll(c);
    }

    /**
     * Returns the comparator used to order this collection, or <tt>null</tt>
     * if this collection is sorted according to its elements' natural
     * ordering (using <tt>Comparable</tt>).
     *
     * @return the comparator used to order this collection, or <tt>null</tt>
     * if this collection is sorted according to its elements' natural
     * ordering.
     */
    public Comparator<? super E> comparator() {
        // The comparator is fixed when q is constructed, so no lock is taken.
        return q.comparator();
    }

    /**
     * Inserts the specified element into this priority queue.
     *
     * @param o the element to add
     * @return <tt>true</tt>
     * @throws ClassCastException if the specified element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o) {
        if (o == null) throw new NullPointerException();
        lock.lock();
        try {
            boolean ok = q.offer(o);
            assert ok;
            // Wake one waiting taker, if any, now that an element exists.
            notEmpty.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds the specified element to this priority queue. As the queue is
     * unbounded this method will never block.
     * @param o the element to add
     * @throws ClassCastException if the element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public void put(E o) {
        offer(o); // never need to block
    }

    /**
     * Adds the specified element to this priority queue. As the queue is
     * unbounded this method will never block.
     * @param o the element to add
     * @param timeout This parameter is ignored as the method never blocks
     * @param unit This parameter is ignored as the method never blocks
     * @return <tt>true</tt>
     * @throws ClassCastException if the element cannot be compared
     * with elements currently in the priority queue according
     * to the priority queue's ordering.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o, long timeout, TimeUnit unit) {
        return offer(o); // never need to block
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element becomes available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while acquiring the
     * lock or while waiting for an element
     */
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            try {
                while (q.size() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = q.poll();
            assert x != null;
            return x;
        } finally {
            lock.unlock();
        }
    }


    /**
     * Retrieves and removes the head of this queue without waiting.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue
     * is empty
     */
    public E poll() {
        lock.lock();
        try {
            return q.poll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified time if no element is immediately available.
     *
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit the time unit of the <tt>timeout</tt> argument
     * @return the head of this queue, or <tt>null</tt> if the wait time
     * elapses before an element is available
     * @throws InterruptedException if interrupted while acquiring the
     * lock or while waiting for an element
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            for (;;) {
                E x = q.poll();
                if (x != null)
                    return x;
                if (nanos <= 0)
                    return null;
                try {
                    // awaitNanos returns the remaining wait budget.
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue
     * is empty
     */
    public E peek() {
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * a <tt>PriorityBlockingQueue</tt> is not capacity constrained.
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Removes all available elements from this queue and adds them to
     * the given collection.  Each element is added to <tt>c</tt> before
     * it is removed from this queue, so an exception thrown by
     * <tt>c.add</tt> leaves the remaining elements in this queue.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most the given number of available elements from this
     * queue and adds them to the given collection.  Each element is added
     * to <tt>c</tt> before it is removed from this queue, so an exception
     * thrown by <tt>c.add</tt> leaves the remaining elements in this queue.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) throw new NullPointerException();
        if (c == this) throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        lock.lock();
        try {
            int n = 0;
            E e;
            while (n < maxElements && (e = q.peek()) != null) {
                c.add(e);  // add first: a failing add must not lose e
                q.poll();
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present.
     *
     * @param o element to be removed from this queue, if present
     * @return <tt>true</tt> if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns <tt>true</tt> if this queue contains the specified element.
     *
     * @param o element whose presence in this queue is to be tested
     * @return <tt>true</tt> if this queue contains the specified element
     */
    public boolean contains(Object o) {
        lock.lock();
        try {
            return q.contains(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * in no particular order.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }


    /**
     * Returns the string representation of the underlying queue,
     * computed while holding the lock.
     *
     * @return a string representation of this queue
     */
    public String toString() {
        lock.lock();
        try {
            return q.toString();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this priority queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * in no particular order, using the supplied array if it is large
     * enough (as per the contract of <tt>Collection.toArray(T[])</tt>).
     *
     * @param a the array into which the elements are to be stored, if it
     * is big enough; otherwise a new array of the same runtime type is
     * allocated
     * @return an array containing all of the elements in this queue
     */
    public <T> T[] toArray(T[] a) {
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue. The iterator
     * does not return the elements in any particular order.
     * The returned iterator is a "fail-fast" iterator that will
     * throw {@link java.util.ConcurrentModificationException}
     * upon detected interference.
     *
     * @return an iterator over the elements in this queue.
     */
    public Iterator<E> iterator() {
        lock.lock();
        try {
            return new Itr(q.iterator());
        } finally {
            lock.unlock();
        }
    }

    /**
     * Wraps the backing queue's iterator so that <tt>next</tt> and
     * <tt>remove</tt> execute under the queue lock.  Deliberately not
     * generic: it iterates the enclosing queue's element type.
     */
    private class Itr implements Iterator<E> {
        private final Iterator<E> iter;

        Itr(Iterator<E> i) {
            iter = i;
        }

        public boolean hasNext() {
            /*
             * No sync -- we rely on underlying hasNext to be
             * stateless, in which case we can return true by mistake
             * only when next() will subsequently throw
             * ConcurrentModificationException.
             */
            return iter.hasNext();
        }

        public E next() {
            lock.lock();
            try {
                return iter.next();
            } finally {
                lock.unlock();
            }
        }

        public void remove() {
            lock.lock();
            try {
                iter.remove();
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).  This
     * merely wraps default serialization within lock.  The
     * serialization strategy for items is left to underlying
     * Queue. Note that locking is not needed on deserialization, so
     * readObject is not defined, just relying on default.
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {
        lock.lock();
        try {
            s.defaultWriteObject();
        } finally {
            lock.unlock();
        }
    }

}