ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.11
Committed: Mon Jul 28 09:40:14 2003 UTC (20 years, 10 months ago) by dl
Branch: MAIN
Changes since 1.10: +6 -6 lines
Log Message:
Commented out BlockingQueue.addAll; changed a few signatures to please 2.2 compiler

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
11 /**
12 * An unbounded {@link BlockingQueue blocking queue} based on a
13 * {@link PriorityQueue},
14 * obeying its ordering rules and implementation characteristics.
15 * @since 1.5
16 * @author Doug Lea
17 **/
18 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
19 implements BlockingQueue<E>, Sorted, java.io.Serializable {
20
21 private final PriorityQueue<E> q;
22 private final ReentrantLock lock = new ReentrantLock(true);
23 private final Condition notEmpty = lock.newCondition();
24
25 /**
26 * Create a <tt>PriorityBlockingQueue</tt> with the default initial capacity
27 * (11) that orders its elements according to their natural
28 * ordering (using <tt>Comparable</tt>.)
29 */
30 public PriorityBlockingQueue() {
31 q = new PriorityQueue<E>();
32 }
33
34 /**
35 * Create a <tt>PriorityBlockingQueue</tt> with the specified initial
36 * capacity
37 * that orders its elements according to their natural ordering
38 * (using <tt>Comparable</tt>.)
39 *
40 * @param initialCapacity the initial capacity for this priority queue.
41 */
42 public PriorityBlockingQueue(int initialCapacity) {
43 q = new PriorityQueue<E>(initialCapacity, null);
44 }
45
46 /**
47 * Create a <tt>PriorityBlockingQueue</tt> with the specified initial
48 * capacity
49 * that orders its elements according to the specified comparator.
50 *
51 * @param initialCapacity the initial capacity for this priority queue.
52 * @param comparator the comparator used to order this priority queue.
53 * If <tt>null</tt> then the order depends on the elements' natural
54 * ordering.
55 */
56 public PriorityBlockingQueue(int initialCapacity,
57 Comparator<E> comparator) {
58 q = new PriorityQueue<E>(initialCapacity, comparator);
59 }
60
61 /**
62 * Create a <tt>PriorityBlockingQueue</tt> containing the elements
63 * in the specified
64 * collection. The priority queue has an initial capacity of 110% of the
65 * size of the specified collection. If the specified collection
66 * implements the {@link Sorted} interface, the priority queue will be
67 * sorted according to the same comparator, or according to its elements'
68 * natural order if the collection is sorted according to its elements'
69 * natural order. If the specified collection does not implement
70 * <tt>Sorted</tt>, the priority queue is ordered according to
71 * its elements' natural order.
72 *
73 * @param initialElements the collection whose elements are to be placed
74 * into this priority queue.
75 * @throws ClassCastException if elements of the specified collection
76 * cannot be compared to one another according to the priority
77 * queue's ordering.
78 * @throws NullPointerException if the specified collection or an
79 * element of the specified collection is <tt>null</tt>.
80 */
81 public PriorityBlockingQueue(Collection<E> initialElements) {
82 q = new PriorityQueue<E>(initialElements);
83 }
84
85
86 // these first two override just to get the throws docs
87
88 /**
89 * @throws NullPointerException if the specified element is <tt>null</tt>.
90 */
91 public boolean add(E element) {
92 return super.add(element);
93 }
94
95 // /**
96 // * @throws NullPointerException if any element is <tt>null</tt>.
97 // */
98 // public boolean addAll(Collection c) {
99 // return super.addAll(c);
100 // }
101
102 public Comparator comparator() {
103 return q.comparator();
104 }
105
106 /** @throws NullPointerException if <tt>x</tt> is <tt>null</tt> */
107 public boolean offer(E x) {
108 if (x == null) throw new NullPointerException();
109 lock.lock();
110 try {
111 boolean ok = q.offer(x);
112 assert ok;
113 notEmpty.signal();
114 return true;
115 }
116 finally {
117 lock.unlock();
118 }
119 }
120
121 public void put(E x) throws InterruptedException {
122 offer(x); // never need to block
123 }
124
125 public boolean offer(E x, long timeout, TimeUnit unit)
126 throws InterruptedException {
127 return offer(x); // never need to block
128 }
129
130 public E take() throws InterruptedException {
131 lock.lockInterruptibly();
132 try {
133 try {
134 while (q.size() == 0)
135 notEmpty.await();
136 }
137 catch (InterruptedException ie) {
138 notEmpty.signal(); // propagate to non-interrupted thread
139 throw ie;
140 }
141 E x = q.poll();
142 assert x != null;
143 return x;
144 }
145 finally {
146 lock.unlock();
147 }
148 }
149
150
151 public E poll() {
152 lock.lock();
153 try {
154 return q.poll();
155 }
156 finally {
157 lock.unlock();
158 }
159 }
160
161 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
162 long nanos = unit.toNanos(timeout);
163 lock.lockInterruptibly();
164 try {
165 for (;;) {
166 E x = q.poll();
167 if (x != null)
168 return x;
169 if (nanos <= 0)
170 return null;
171 try {
172 nanos = notEmpty.awaitNanos(nanos);
173 }
174 catch (InterruptedException ie) {
175 notEmpty.signal(); // propagate to non-interrupted thread
176 throw ie;
177 }
178 }
179 }
180 finally {
181 lock.unlock();
182 }
183 }
184
185 public E peek() {
186 lock.lock();
187 try {
188 return q.peek();
189 }
190 finally {
191 lock.unlock();
192 }
193 }
194
195 public int size() {
196 lock.lock();
197 try {
198 return q.size();
199 }
200 finally {
201 lock.unlock();
202 }
203 }
204
205 /**
206 * Always returns <tt>Integer.MAX_VALUE</tt> because
207 * PriorityBlockingQueues are not capacity constrained.
208 * @return <tt>Integer.MAX_VALUE</tt>
209 */
210 public int remainingCapacity() {
211 return Integer.MAX_VALUE;
212 }
213
214 public boolean remove(Object x) {
215 lock.lock();
216 try {
217 return q.remove(x);
218 }
219 finally {
220 lock.unlock();
221 }
222 }
223
224 public boolean contains(Object x) {
225 lock.lock();
226 try {
227 return q.contains(x);
228 }
229 finally {
230 lock.unlock();
231 }
232 }
233
234 public Object[] toArray() {
235 lock.lock();
236 try {
237 return q.toArray();
238 }
239 finally {
240 lock.unlock();
241 }
242 }
243
244
245 public String toString() {
246 lock.lock();
247 try {
248 return q.toString();
249 }
250 finally {
251 lock.unlock();
252 }
253 }
254
255 public <T> T[] toArray(T[] a) {
256 lock.lock();
257 try {
258 return q.toArray(a);
259 }
260 finally {
261 lock.unlock();
262 }
263 }
264
265 public Iterator<E> iterator() {
266 lock.lock();
267 try {
268 return new Itr(q.iterator());
269 }
270 finally {
271 lock.unlock();
272 }
273 }
274
275 private class Itr<E> implements Iterator<E> {
276 private final Iterator<E> iter;
277 Itr(Iterator<E> i) {
278 iter = i;
279 }
280
281 public boolean hasNext() {
282 /*
283 * No sync -- we rely on underlying hasNext to be
284 * stateless, in which case we can return true by mistake
285 * only when next() willl subsequently throw
286 * ConcurrentModificationException.
287 */
288 return iter.hasNext();
289 }
290
291 public E next() {
292 lock.lock();
293 try {
294 return iter.next();
295 }
296 finally {
297 lock.unlock();
298 }
299 }
300
301 public void remove() {
302 lock.lock();
303 try {
304 iter.remove();
305 }
306 finally {
307 lock.unlock();
308 }
309 }
310 }
311
312 /**
313 * Save the state to a stream (that is, serialize it). This
314 * merely wraps default serialization within lock. The
315 * serialization strategy for items is left to underlying
316 * Queue. Note that locking is not needed on deserialization, so
317 * readObject is not defined, just relying on default.
318 */
319 private void writeObject(java.io.ObjectOutputStream s)
320 throws java.io.IOException {
321 lock.lock();
322 try {
323 s.defaultWriteObject();
324 }
325 finally {
326 lock.unlock();
327 }
328 }
329
330 }