ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.13
Committed: Mon Jul 28 16:00:19 2003 UTC (20 years, 10 months ago) by tim
Branch: MAIN
Changes since 1.12: +27 -26 lines
Log Message:
Added addAll() back in.

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
/**
 * An unbounded {@link BlockingQueue blocking queue} based on a
 * {@link PriorityQueue}, obeying its ordering rules and implementation
 * characteristics.
 *
 * <p>All access to the backing <tt>PriorityQueue</tt> is performed while
 * holding a single {@link ReentrantLock} created in fair mode. Because the
 * queue is unbounded, insertion methods never block; only the retrieval
 * methods {@link #take} and {@link #poll(long, TimeUnit)} can wait, and they
 * wait for an element to become available. <tt>null</tt> elements are
 * rejected by {@link #offer(Object)}.
 *
 * @param <E> the type of elements held in this queue
 * @since 1.5
 * @author Doug Lea
 */
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** The backing heap; only touched while {@link #lock} is held. */
    private final PriorityQueue<E> q;

    /** Fair lock guarding every access to {@link #q}. */
    private final ReentrantLock lock = new ReentrantLock(true);

    /** Signalled by {@link #offer(Object)} each time an element is inserted. */
    private final Condition notEmpty = lock.newCondition();

    /**
     * Create a <tt>PriorityBlockingQueue</tt> with the default initial capacity
     * (11) that orders its elements according to their natural
     * ordering (using <tt>Comparable</tt>.)
     */
    public PriorityBlockingQueue() {
        q = new PriorityQueue<E>();
    }

    /**
     * Create a <tt>PriorityBlockingQueue</tt> with the specified initial
     * capacity that orders its elements according to their natural ordering
     * (using <tt>Comparable</tt>.)
     *
     * @param initialCapacity the initial capacity for this priority queue.
     */
    public PriorityBlockingQueue(int initialCapacity) {
        // A null comparator selects natural ordering in the backing queue.
        q = new PriorityQueue<E>(initialCapacity, null);
    }

    /**
     * Create a <tt>PriorityBlockingQueue</tt> with the specified initial
     * capacity that orders its elements according to the specified comparator.
     *
     * @param initialCapacity the initial capacity for this priority queue.
     * @param comparator the comparator used to order this priority queue.
     * If <tt>null</tt> then the order depends on the elements' natural
     * ordering. Any comparator able to compare <tt>E</tt> (including one
     * declared for a supertype of <tt>E</tt>) is accepted.
     */
    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        q = new PriorityQueue<E>(initialCapacity, comparator);
    }

    /**
     * Create a <tt>PriorityBlockingQueue</tt> containing the elements
     * in the specified collection. Construction is delegated to
     * {@link PriorityQueue#PriorityQueue(Collection)}, so the initial
     * capacity and the ordering (an ordering inherited from a sorted source
     * collection, or otherwise the elements' natural order) are whatever
     * that constructor chooses.
     *
     * @param initialElements the collection whose elements are to be placed
     * into this priority queue.
     * @throws ClassCastException if elements of the specified collection
     * cannot be compared to one another according to the priority
     * queue's ordering.
     * @throws NullPointerException if the specified collection or an
     * element of the specified collection is <tt>null</tt>.
     */
    public PriorityBlockingQueue(Collection<? extends E> initialElements) {
        q = new PriorityQueue<E>(initialElements);
    }

    // add and addAll are overridden only so the throws clauses are documented

    /**
     * Adds the specified element to this queue by delegating to the
     * inherited <tt>add</tt>.
     *
     * @param element the element to add
     * @return the result of the inherited <tt>add</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean add(E element) {
        return super.add(element);
    }

    /**
     * Adds all elements of the specified collection to this queue by
     * delegating to the inherited <tt>addAll</tt>.
     *
     * @param c the elements to add
     * @return the result of the inherited <tt>addAll</tt>
     * @throws NullPointerException if any element is <tt>null</tt>.
     */
    public boolean addAll(Collection<? extends E> c) {
        return super.addAll(c);
    }

    /**
     * Returns the comparator of the backing priority queue.
     *
     * @return the comparator reported by the backing queue; <tt>null</tt>
     * when the queue was constructed with a <tt>null</tt> comparator
     */
    public Comparator<? super E> comparator() {
        return q.comparator();
    }

    /**
     * Inserts the specified element and signals one thread waiting for an
     * element. Never blocks for space because the queue is unbounded.
     *
     * @param x the element to insert
     * @return <tt>true</tt>
     * @throws NullPointerException if <tt>x</tt> is <tt>null</tt>
     */
    public boolean offer(E x) {
        if (x == null) throw new NullPointerException();
        lock.lock();
        try {
            boolean ok = q.offer(x);
            assert ok;
            notEmpty.signal();
            return true;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element; equivalent to {@link #offer(Object)}.
     * The <tt>throws</tt> clause is kept for interface compatibility; this
     * implementation never waits.
     *
     * @param x the element to insert
     * @throws NullPointerException if <tt>x</tt> is <tt>null</tt>
     */
    public void put(E x) throws InterruptedException {
        offer(x); // never need to block
    }

    /**
     * Inserts the specified element; equivalent to {@link #offer(Object)}.
     * The timeout arguments are ignored because insertion never waits.
     *
     * @param x the element to insert
     * @param timeout ignored
     * @param unit ignored
     * @return <tt>true</tt>
     * @throws NullPointerException if <tt>x</tt> is <tt>null</tt>
     */
    public boolean offer(E x, long timeout, TimeUnit unit)
        throws InterruptedException {
        return offer(x); // never need to block
    }

    /**
     * Retrieves and removes the head of this queue, waiting while the
     * queue is empty.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while acquiring the lock
     * or while waiting for an element
     */
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            try {
                while (q.size() == 0)
                    notEmpty.await();
            }
            catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = q.poll();
            assert x != null;
            return x;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue without waiting.
     *
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    public E poll() {
        lock.lock();
        try {
            return q.poll();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * given time for an element to become available.
     *
     * @param timeout how long to wait, in units of <tt>unit</tt>
     * @param unit the unit in which <tt>timeout</tt> is expressed
     * @return the head of this queue, or <tt>null</tt> if the wait time
     * runs out while the queue is still empty
     * @throws InterruptedException if interrupted while acquiring the lock
     * or while waiting for an element
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            for (;;) {
                E x = q.poll();
                if (x != null)
                    return x;
                if (nanos <= 0)
                    return null;
                try {
                    // awaitNanos returns the time still remaining, so the
                    // budget shrinks across repeated wake-ups.
                    nanos = notEmpty.awaitNanos(nanos);
                }
                catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue.
     *
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    public E peek() {
        lock.lock();
        try {
            return q.peek();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements currently in this queue.
     *
     * @return the size of the backing queue, read under the lock
     */
    public int size() {
        lock.lock();
        try {
            return q.size();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * PriorityBlockingQueues are not capacity constrained.
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Removes all available elements from this queue and adds them to the
     * given collection, in priority order. This method is part of the
     * <tt>BlockingQueue</tt> contract in the released
     * <tt>java.util.concurrent</tt> API.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most the given number of available elements from this
     * queue and adds them to the given collection, in priority order.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) throw new NullPointerException();
        if (c == this) throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        lock.lock();
        try {
            int n = 0;
            while (n < maxElements) {
                E head = q.peek();
                if (head == null)
                    break;
                // Add before removing so a failing c.add leaves the
                // element in this queue instead of dropping it.
                c.add(head);
                q.poll();
                ++n;
            }
            return n;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Removes an instance of the specified element from this queue, if
     * the backing queue contains one.
     *
     * @param x the element to remove
     * @return the result of the backing queue's <tt>remove</tt>
     */
    public boolean remove(Object x) {
        lock.lock();
        try {
            return q.remove(x);
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Reports whether this queue contains the specified element.
     *
     * @param x the element to look for
     * @return the result of the backing queue's <tt>contains</tt>
     */
    public boolean contains(Object x) {
        lock.lock();
        try {
            return q.contains(x);
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array holding the elements of this queue, copied from the
     * backing queue under the lock.
     *
     * @return the backing queue's <tt>toArray()</tt> result
     */
    public Object[] toArray() {
        lock.lock();
        try {
            return q.toArray();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Returns the backing queue's string form, computed under the lock.
     *
     * @return the backing queue's <tt>toString()</tt> result
     */
    public String toString() {
        lock.lock();
        try {
            return q.toString();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array holding the elements of this queue, using the
     * supplied array's runtime type; copied from the backing queue under
     * the lock.
     *
     * @param a the array that determines the runtime type of the result
     * @return the backing queue's <tt>toArray(a)</tt> result
     */
    public <T> T[] toArray(T[] a) {
        lock.lock();
        try {
            return q.toArray(a);
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements of this queue. The iterator
     * wraps the backing queue's own iterator: <tt>next</tt> and
     * <tt>remove</tt> run under this queue's lock, <tt>hasNext</tt> does not.
     *
     * @return an iterator over the elements in this queue
     */
    public Iterator<E> iterator() {
        lock.lock();
        try {
            return new Itr(q.iterator());
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Iterator that forwards to the backing queue's iterator. It uses the
     * enclosing queue's element type and lock, so it deliberately declares
     * no type parameter of its own.
     */
    private class Itr implements Iterator<E> {
        private final Iterator<E> iter;

        Itr(Iterator<E> i) {
            iter = i;
        }

        public boolean hasNext() {
            /*
             * No sync -- we rely on underlying hasNext to be
             * stateless, in which case we can return true by mistake
             * only when next() will subsequently throw
             * ConcurrentModificationException.
             */
            return iter.hasNext();
        }

        public E next() {
            lock.lock();
            try {
                return iter.next();
            }
            finally {
                lock.unlock();
            }
        }

        public void remove() {
            lock.lock();
            try {
                iter.remove();
            }
            finally {
                lock.unlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it). This
     * merely wraps default serialization within lock. The
     * serialization strategy for items is left to underlying
     * Queue. Note that locking is not needed on deserialization, so
     * readObject is not defined, just relying on default.
     *
     * @param s the stream to write to
     * @throws java.io.IOException if the default serialization fails
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {
        lock.lock();
        try {
            s.defaultWriteObject();
        }
        finally {
            lock.unlock();
        }
    }

}