ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.15
Committed: Tue Aug 5 12:11:14 2003 UTC (20 years, 10 months ago) by dl
Branch: MAIN
Changes since 1.14: +15 -13 lines
Log Message:
Remove Sorted interface, adjust PQ and PBQ

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
/**
 * An unbounded {@link BlockingQueue blocking queue} based on a {@link
 * PriorityQueue}, obeying its ordering rules and implementation
 * characteristics.  While this queue is logically unbounded,
 * attempted additions may fail due to resource exhaustion (causing
 * <tt>OutOfMemoryError</tt>) when <tt>Integer.MAX_VALUE</tt> elements
 * are held.
 *
 * <p>All element storage is delegated to a private
 * <tt>PriorityQueue</tt>; this class adds a single lock around each
 * operation and a condition on which consumers wait while the queue
 * is empty.  <tt>null</tt> elements are rejected.
 *
 * @since 1.5
 * @author Doug Lea
 */
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** Backing priority heap; accessed under {@link #lock} (except in {@code Itr.hasNext}). */
    private final PriorityQueue<E> q;

    /** Guards every access to {@link #q}; constructed with the fairness flag set. */
    private final ReentrantLock lock = new ReentrantLock(true);

    /** Signalled whenever an element is inserted; awaited by blocked consumers. */
    private final Condition notEmpty = lock.newCondition();

    /**
     * Create a <tt>PriorityBlockingQueue</tt> with the default initial
     * capacity of the underlying <tt>PriorityQueue</tt> that orders its
     * elements according to their natural ordering (using
     * <tt>Comparable</tt>.)
     */
    public PriorityBlockingQueue() {
        q = new PriorityQueue<E>();
    }

    /**
     * Create a <tt>PriorityBlockingQueue</tt> with the specified initial
     * capacity that orders its elements according to their natural
     * ordering (using <tt>Comparable</tt>.)
     *
     * @param initialCapacity the initial capacity for this priority queue.
     */
    public PriorityBlockingQueue(int initialCapacity) {
        // A null comparator selects natural ordering in PriorityQueue.
        q = new PriorityQueue<E>(initialCapacity, null);
    }

    /**
     * Create a <tt>PriorityBlockingQueue</tt> with the specified initial
     * capacity that orders its elements according to the specified
     * comparator.
     *
     * @param initialCapacity the initial capacity for this priority queue.
     * @param comparator the comparator used to order this priority queue.
     * If <tt>null</tt> then the order depends on the elements' natural
     * ordering.
     */
    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        q = new PriorityQueue<E>(initialCapacity, comparator);
    }

    /**
     * Create a <tt>PriorityBlockingQueue</tt> containing the elements
     * in the specified collection.  Construction is delegated to
     * {@link PriorityQueue#PriorityQueue(Collection)}: if the specified
     * collection is a {@link SortedSet} or a {@link PriorityQueue},
     * this priority queue will be sorted according to the same
     * comparator, or according to its elements' natural order if the
     * collection is sorted according to its elements' natural order.
     * Otherwise, this priority queue is ordered according to its
     * elements' natural order.
     *
     * @param c the collection whose elements are to be placed
     *        into this priority queue.
     * @throws ClassCastException if elements of the specified collection
     *         cannot be compared to one another according to the priority
     *         queue's ordering.
     * @throws NullPointerException if <tt>c</tt> or any element within it
     *         is <tt>null</tt>
     */
    public PriorityBlockingQueue(Collection<? extends E> c) {
        q = new PriorityQueue<E>(c);
    }


    // these first two override just to get the throws docs

    /**
     * Adds the specified element by delegating to the inherited
     * implementation.
     *
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean add(E element) {
        return super.add(element);
    }

    /**
     * Adds all elements of the specified collection by delegating to
     * the inherited implementation.
     *
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean addAll(Collection<? extends E> c) {
        return super.addAll(c);
    }

    /**
     * Returns the comparator used by the backing priority queue.
     *
     * @return the comparator reported by the underlying
     *         <tt>PriorityQueue</tt>; <tt>null</tt> when none was
     *         supplied at construction.
     */
    public Comparator<? super E> comparator() {
        return q.comparator();
    }

    /**
     * Inserts the specified element and signals one waiting consumer.
     * Never blocks for capacity.
     *
     * @param o the element to insert
     * @return <tt>true</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>
     */
    public boolean offer(E o) {
        if (o == null) throw new NullPointerException();
        lock.lock();
        try {
            boolean ok = q.offer(o);
            assert ok;
            notEmpty.signal();
            return true;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element.  Equivalent to {@link #offer(Object)}.
     *
     * @param o the element to insert
     * @throws NullPointerException if the specified element is <tt>null</tt>
     */
    public void put(E o) throws InterruptedException {
        offer(o); // never need to block
    }

    /**
     * Inserts the specified element.  The timeout arguments are ignored;
     * this is equivalent to {@link #offer(Object)}.
     *
     * @param o the element to insert
     * @param timeout ignored
     * @param unit ignored
     * @return <tt>true</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>
     */
    public boolean offer(E o, long timeout, TimeUnit unit)
            throws InterruptedException {
        return offer(o); // never need to block
    }

    /**
     * Removes and returns the head of the queue, waiting while the
     * queue is empty.
     *
     * @return the head of the queue
     * @throws InterruptedException if interrupted while acquiring the
     *         lock or while waiting
     */
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            try {
                while (q.size() == 0)
                    notEmpty.await();
            }
            catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = q.poll();
            assert x != null;
            return x;
        }
        finally {
            lock.unlock();
        }
    }


    /**
     * Removes and returns the head of the queue without waiting.
     *
     * @return the head of the queue, or <tt>null</tt> if it is empty
     */
    public E poll() {
        lock.lock();
        try {
            return q.poll();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the head of the queue, waiting up to the
     * given time for an element to become available.
     *
     * @param timeout how long to wait, in units of <tt>unit</tt>
     * @param unit the unit of <tt>timeout</tt>
     * @return the head of the queue, or <tt>null</tt> if the wait time
     *         ran out while the queue was still empty
     * @throws InterruptedException if interrupted while acquiring the
     *         lock or while waiting
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            for (;;) {
                E x = q.poll();
                if (x != null)
                    return x;
                if (nanos <= 0)
                    return null;
                try {
                    // awaitNanos hands back the remaining wait budget
                    nanos = notEmpty.awaitNanos(nanos);
                }
                catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Returns, without removing, the head of the queue.
     *
     * @return the head of the queue, or <tt>null</tt> if it is empty
     */
    public E peek() {
        lock.lock();
        try {
            return q.peek();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements currently held.
     *
     * @return the size of the backing queue
     */
    public int size() {
        lock.lock();
        try {
            return q.size();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * PriorityBlockingQueues are not capacity constrained.
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Removes the given object from the backing queue, if present.
     *
     * @param o the object to remove
     * @return the result of the backing queue's <tt>remove</tt>
     */
    public boolean remove(Object o) {
        lock.lock();
        try {
            return q.remove(o);
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Reports whether the backing queue contains the given object.
     *
     * @param o the object to look for
     * @return the result of the backing queue's <tt>contains</tt>
     */
    public boolean contains(Object o) {
        lock.lock();
        try {
            return q.contains(o);
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Removes available elements from this queue and adds them to the
     * given collection, all under one acquisition of the lock.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most the given number of available elements from this
     * queue and adds them to the given collection, all under one
     * acquisition of the lock.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) throw new NullPointerException();
        if (c == this) throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        lock.lock();
        try {
            int n = 0;
            while (n < maxElements) {
                E head = q.peek();
                if (head == null)
                    break;
                // Add before removing so the element stays queued if add throws.
                c.add(head);
                q.poll();
                ++n;
            }
            return n;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array of the elements, produced by the backing queue
     * while the lock is held.
     *
     * @return the backing queue's <tt>toArray()</tt> result
     */
    public Object[] toArray() {
        lock.lock();
        try {
            return q.toArray();
        }
        finally {
            lock.unlock();
        }
    }


    /**
     * Returns the backing queue's string form, produced while the lock
     * is held.
     *
     * @return the backing queue's <tt>toString()</tt> result
     */
    public String toString() {
        lock.lock();
        try {
            return q.toString();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array of the elements with the runtime type of the
     * given array, produced by the backing queue while the lock is held.
     *
     * @param a the array passed through to the backing queue
     * @return the backing queue's <tt>toArray(a)</tt> result
     */
    public <T> T[] toArray(T[] a) {
        lock.lock();
        try {
            return q.toArray(a);
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator that wraps the backing queue's own iterator,
     * taking the lock around <tt>next</tt> and <tt>remove</tt>.
     *
     * @return an iterator over the elements in this queue
     */
    public Iterator<E> iterator() {
        lock.lock();
        try {
            return new Itr(q.iterator());
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Iterator wrapper that forwards to the backing queue's iterator.
     * It is deliberately not generic: it iterates the enclosing queue's
     * element type <tt>E</tt> rather than declaring one of its own.
     */
    private class Itr implements Iterator<E> {
        private final Iterator<E> iter;

        Itr(Iterator<E> i) {
            iter = i;
        }

        public boolean hasNext() {
            /*
             * No sync -- we rely on underlying hasNext to be
             * stateless, in which case we can return true by mistake
             * only when next() will subsequently throw
             * ConcurrentModificationException.
             */
            return iter.hasNext();
        }

        public E next() {
            lock.lock();
            try {
                return iter.next();
            }
            finally {
                lock.unlock();
            }
        }

        public void remove() {
            lock.lock();
            try {
                iter.remove();
            }
            finally {
                lock.unlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).  This
     * merely wraps default serialization within lock.  The
     * serialization strategy for items is left to underlying
     * Queue.  Note that locking is not needed on deserialization, so
     * readObject is not defined, just relying on default.
     */
    private void writeObject(java.io.ObjectOutputStream s)
            throws java.io.IOException {
        lock.lock();
        try {
            s.defaultWriteObject();
        }
        finally {
            lock.unlock();
        }
    }

}