ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.14
Committed: Tue Aug 5 06:32:02 2003 UTC (20 years, 10 months ago) by dholmes
Branch: MAIN
Changes since 1.13: +26 -23 lines
Log Message:
Regressed to the unbounded form

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13 * An unbounded {@link BlockingQueue blocking queue} based on a
14 * {@link PriorityQueue},
15 * obeying its ordering rules and implementation characteristics.
16 * @since 1.5
17 * @author Doug Lea
18 */
19 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
20 implements Sorted, BlockingQueue<E>, java.io.Serializable {
21
22 private final PriorityQueue<E> q;
23 private final ReentrantLock lock = new ReentrantLock(true);
24 private final Condition notEmpty = lock.newCondition();
25
26 /**
27 * Create a <tt>PriorityBlockingQueue</tt> with the default initial
28 * capacity
29 * (11) that orders its elements according to their natural
30 * ordering (using <tt>Comparable</tt>.)
31 */
32 public PriorityBlockingQueue() {
33 q = new PriorityQueue<E>();
34 }
35
36 /**
37 * Create a <tt>PriorityBlockingQueue</tt> with the specified initial
38 * capacity
39 * that orders its elements according to their natural ordering
40 * (using <tt>Comparable</tt>.)
41 *
42 * @param initialCapacity the initial capacity for this priority queue.
43 */
44 public PriorityBlockingQueue(int initialCapacity) {
45 q = new PriorityQueue<E>(initialCapacity, null);
46 }
47
48 /**
49 * Create a <tt>PriorityBlockingQueue</tt> with the specified initial
50 * capacity
51 * that orders its elements according to the specified comparator.
52 *
53 * @param initialCapacity the initial capacity for this priority queue.
54 * @param comparator the comparator used to order this priority queue.
55 * If <tt>null</tt> then the order depends on the elements' natural
56 * ordering.
57 */
58 public PriorityBlockingQueue(int initialCapacity,
59 Comparator<? super E> comparator) {
60 q = new PriorityQueue<E>(initialCapacity, comparator);
61 }
62
63 /**
64 * Create a <tt>PriorityBlockingQueue</tt> containing the elements
65 * in the specified
66 * collection. The priority queue has an initial capacity of 110% of the
67 * size of the specified collection. If the specified collection
68 * implements the {@link Sorted} interface, the priority queue will be
69 * sorted according to the same comparator, or according to its elements'
70 * natural order if the collection is sorted according to its elements'
71 * natural order. If the specified collection does not implement
72 * <tt>Sorted</tt>, the priority queue is ordered according to
73 * its elements' natural order.
74 *
75 * @param c the collection whose elements are to be placed
76 * into this priority queue.
77 * @throws ClassCastException if elements of the specified collection
78 * cannot be compared to one another according to the priority
79 * queue's ordering.
80 * @throws NullPointerException if <tt>c</tt> or any element within it
81 * is <tt>null</tt>
82 */
83 public PriorityBlockingQueue(Collection<? extends E> c) {
84 q = new PriorityQueue<E>(c);
85 }
86
87
88 // these first two override just to get the throws docs
89
90 /**
91 * @throws NullPointerException {@inheritDoc}
92 */
93 public boolean add(E element) {
94 return super.add(element);
95 }
96
97 /**
98 * @throws NullPointerException {@inheritDoc}
99 */
100 public boolean addAll(Collection<? extends E> c) {
101 return super.addAll(c);
102 }
103
104 public Comparator comparator() {
105 return q.comparator();
106 }
107
108 /**
109 * @throws NullPointerException if the specified element is <tt>null</tt>
110 **/
111 public boolean offer(E o) {
112 if (o == null) throw new NullPointerException();
113 lock.lock();
114 try {
115 boolean ok = q.offer(o);
116 assert ok;
117 notEmpty.signal();
118 return true;
119 }
120 finally {
121 lock.unlock();
122 }
123 }
124
125 public void put(E o) throws InterruptedException {
126 offer(o); // never need to block
127 }
128
129 public boolean offer(E o, long timeout, TimeUnit unit)
130 throws InterruptedException {
131 return offer(o); // never need to block
132 }
133
134 public E take() throws InterruptedException {
135 lock.lockInterruptibly();
136 try {
137 try {
138 while (q.size() == 0)
139 notEmpty.await();
140 }
141 catch (InterruptedException ie) {
142 notEmpty.signal(); // propagate to non-interrupted thread
143 throw ie;
144 }
145 E x = q.poll();
146 assert x != null;
147 return x;
148 }
149 finally {
150 lock.unlock();
151 }
152 }
153
154
155 public E poll() {
156 lock.lock();
157 try {
158 return q.poll();
159 }
160 finally {
161 lock.unlock();
162 }
163 }
164
165 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
166 long nanos = unit.toNanos(timeout);
167 lock.lockInterruptibly();
168 try {
169 for (;;) {
170 E x = q.poll();
171 if (x != null)
172 return x;
173 if (nanos <= 0)
174 return null;
175 try {
176 nanos = notEmpty.awaitNanos(nanos);
177 }
178 catch (InterruptedException ie) {
179 notEmpty.signal(); // propagate to non-interrupted thread
180 throw ie;
181 }
182 }
183 }
184 finally {
185 lock.unlock();
186 }
187 }
188
189 public E peek() {
190 lock.lock();
191 try {
192 return q.peek();
193 }
194 finally {
195 lock.unlock();
196 }
197 }
198
199 public int size() {
200 lock.lock();
201 try {
202 return q.size();
203 }
204 finally {
205 lock.unlock();
206 }
207 }
208
209 /**
210 * Always returns <tt>Integer.MAX_VALUE</tt> because
211 * PriorityBlockingQueues are not capacity constrained.
212 * @return <tt>Integer.MAX_VALUE</tt>
213 */
214 public int remainingCapacity() {
215 return Integer.MAX_VALUE;
216 }
217
218 public boolean remove(Object o) {
219 lock.lock();
220 try {
221 return q.remove(o);
222 }
223 finally {
224 lock.unlock();
225 }
226 }
227
228 public boolean contains(Object o) {
229 lock.lock();
230 try {
231 return q.contains(o);
232 }
233 finally {
234 lock.unlock();
235 }
236 }
237
238 public Object[] toArray() {
239 lock.lock();
240 try {
241 return q.toArray();
242 }
243 finally {
244 lock.unlock();
245 }
246 }
247
248
249 public String toString() {
250 lock.lock();
251 try {
252 return q.toString();
253 }
254 finally {
255 lock.unlock();
256 }
257 }
258
259 public <T> T[] toArray(T[] a) {
260 lock.lock();
261 try {
262 return q.toArray(a);
263 }
264 finally {
265 lock.unlock();
266 }
267 }
268
269 public Iterator<E> iterator() {
270 lock.lock();
271 try {
272 return new Itr(q.iterator());
273 }
274 finally {
275 lock.unlock();
276 }
277 }
278
279 private class Itr<E> implements Iterator<E> {
280 private final Iterator<E> iter;
281 Itr(Iterator<E> i) {
282 iter = i;
283 }
284
285 public boolean hasNext() {
286 /*
287 * No sync -- we rely on underlying hasNext to be
288 * stateless, in which case we can return true by mistake
289 * only when next() willl subsequently throw
290 * ConcurrentModificationException.
291 */
292 return iter.hasNext();
293 }
294
295 public E next() {
296 lock.lock();
297 try {
298 return iter.next();
299 }
300 finally {
301 lock.unlock();
302 }
303 }
304
305 public void remove() {
306 lock.lock();
307 try {
308 iter.remove();
309 }
310 finally {
311 lock.unlock();
312 }
313 }
314 }
315
316 /**
317 * Save the state to a stream (that is, serialize it). This
318 * merely wraps default serialization within lock. The
319 * serialization strategy for items is left to underlying
320 * Queue. Note that locking is not needed on deserialization, so
321 * readObject is not defined, just relying on default.
322 */
323 private void writeObject(java.io.ObjectOutputStream s)
324 throws java.io.IOException {
325 lock.lock();
326 try {
327 s.defaultWriteObject();
328 }
329 finally {
330 lock.unlock();
331 }
332 }
333
334 }