ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.6
Committed: Tue Jun 24 14:34:48 2003 UTC (20 years, 11 months ago) by dl
Branch: MAIN
Changes since 1.5: +4 -2 lines
Log Message:
Added missing javadoc tags; minor reformatting

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.*;
9
10 /**
11 * An unbounded blocking queue based on a {@link PriorityQueue},
12 * obeying its ordering rules and implementation characteristics.
13 * @since 1.5
14 * @author Doug Lea
15 **/
16 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
17 implements BlockingQueue<E>, java.io.Serializable {
18
19 private final PriorityQueue<E> q;
20 private final FairReentrantLock lock = new FairReentrantLock();
21 private final Condition notEmpty = lock.newCondition();
22
23 /**
24 * Create a new priority queue with the default initial capacity (11)
25 * that orders its elements according to their natural ordering.
26 */
27 public PriorityBlockingQueue() {
28 q = new PriorityQueue<E>();
29 }
30
31 /**
32 * Create a new priority queue with the specified initial capacity
33 * that orders its elements according to their natural ordering.
34 *
35 * @param initialCapacity the initial capacity for this priority queue.
36 */
37 public PriorityBlockingQueue(int initialCapacity) {
38 q = new PriorityQueue<E>(initialCapacity, null);
39 }
40
41 /**
42 * Create a new priority queue with the specified initial capacity (11)
43 * that orders its elements according to the specified comparator.
44 *
45 * @param initialCapacity the initial capacity for this priority queue.
46 * @param comparator the comparator used to order this priority queue.
47 */
48 public PriorityBlockingQueue(int initialCapacity, Comparator<E> comparator) {
49 q = new PriorityQueue<E>(initialCapacity, comparator);
50 }
51
52 /**
53 * Create a new priority queue containing the elements in the specified
54 * collection. The priority queue has an initial capacity of 110% of the
55 * size of the specified collection. If the specified collection
56 * implements the {@link Sorted} interface, the priority queue will be
57 * sorted according to the same comparator, or according to its elements'
58 * natural order if the collection is sorted according to its elements'
59 * natural order. If the specified collection does not implement the
60 * <tt>Sorted</tt> interface, the priority queue is ordered according to
61 * its elements' natural order.
62 *
63 * @param initialElements the collection whose elements are to be placed
64 * into this priority queue.
65 * @throws ClassCastException if elements of the specified collection
66 * cannot be compared to one another according to the priority
67 * queue's ordering.
68 * @throws NullPointerException if the specified collection or an
69 * element of the specified collection is <tt>null</tt>.
70 */
71 public PriorityBlockingQueue(Collection<E> initialElements) {
72 q = new PriorityQueue<E>(initialElements);
73 }
74
75 public boolean offer(E x) {
76 if (x == null) throw new NullPointerException();
77 lock.lock();
78 try {
79 boolean ok = q.offer(x);
80 assert ok;
81 notEmpty.signal();
82 return true;
83 }
84 finally {
85 lock.unlock();
86 }
87 }
88
89 public void put(E x) throws InterruptedException {
90 offer(x); // never need to block
91 }
92
93 public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
94 return offer(x); // never need to block
95 }
96
97 public E take() throws InterruptedException {
98 lock.lockInterruptibly();
99 try {
100 try {
101 while (q.size() == 0)
102 notEmpty.await();
103 }
104 catch (InterruptedException ie) {
105 notEmpty.signal(); // propagate to non-interrupted thread
106 throw ie;
107 }
108 E x = q.poll();
109 assert x != null;
110 return x;
111 }
112 finally {
113 lock.unlock();
114 }
115 }
116
117
118 public E poll() {
119 lock.lock();
120 try {
121 return q.poll();
122 }
123 finally {
124 lock.unlock();
125 }
126 }
127
128 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
129 lock.lockInterruptibly();
130 long nanos = unit.toNanos(timeout);
131 try {
132 for (;;) {
133 E x = q.poll();
134 if (x != null)
135 return x;
136 if (nanos <= 0)
137 return null;
138 try {
139 nanos = notEmpty.awaitNanos(nanos);
140 }
141 catch (InterruptedException ie) {
142 notEmpty.signal(); // propagate to non-interrupted thread
143 throw ie;
144 }
145
146 }
147 }
148 finally {
149 lock.unlock();
150 }
151 }
152
153 public E peek() {
154 lock.lock();
155 try {
156 return q.peek();
157 }
158 finally {
159 lock.unlock();
160 }
161 }
162
163 public int size() {
164 lock.lock();
165 try {
166 return q.size();
167 }
168 finally {
169 lock.unlock();
170 }
171 }
172
173 /**
174 * Always returns <tt>Integer.MAX_VALUE</tt> because
175 * PriorityBlockingQueues are not capacity constrained.
176 * @return <tt>Integer.MAX_VALUE</tt>
177 */
178 public int remainingCapacity() {
179 return Integer.MAX_VALUE;
180 }
181
182 public boolean remove(Object x) {
183 lock.lock();
184 try {
185 return q.remove(x);
186 }
187 finally {
188 lock.unlock();
189 }
190 }
191
192 public boolean contains(Object x) {
193 lock.lock();
194 try {
195 return q.contains(x);
196 }
197 finally {
198 lock.unlock();
199 }
200 }
201
202 public Object[] toArray() {
203 lock.lock();
204 try {
205 return q.toArray();
206 }
207 finally {
208 lock.unlock();
209 }
210 }
211
212
213 public String toString() {
214 lock.lock();
215 try {
216 return q.toString();
217 }
218 finally {
219 lock.unlock();
220 }
221 }
222
223 public <T> T[] toArray(T[] a) {
224 lock.lock();
225 try {
226 return q.toArray(a);
227 }
228 finally {
229 lock.unlock();
230 }
231 }
232
233 public Iterator<E> iterator() {
234 lock.lock();
235 try {
236 return new Itr(q.iterator());
237 }
238 finally {
239 lock.unlock();
240 }
241 }
242
243 private class Itr<E> implements Iterator<E> {
244 private final Iterator<E> iter;
245 Itr(Iterator<E> i) {
246 iter = i;
247 }
248
249 public boolean hasNext() {
250 /*
251 * No sync -- we rely on underlying hasNext to be
252 * stateless, in which case we can return true by mistake
253 * only when next() willl subsequently throw
254 * ConcurrentModificationException.
255 */
256 return iter.hasNext();
257 }
258
259 public E next() {
260 lock.lock();
261 try {
262 return iter.next();
263 }
264 finally {
265 lock.unlock();
266 }
267 }
268
269 public void remove() {
270 lock.lock();
271 try {
272 iter.remove();
273 }
274 finally {
275 lock.unlock();
276 }
277 }
278 }
279
280 /**
281 * Save the state to a stream (that is, serialize it). This
282 * merely wraps default serialization within lock. The
283 * serialization strategy for items is left to underlying
284 * Queue. Note that locking is not needed on deserialization, so
285 * readObject is not defined, just relying on default.
286 */
287 private void writeObject(java.io.ObjectOutputStream s)
288 throws java.io.IOException {
289 lock.lock();
290 try {
291 s.defaultWriteObject();
292 }
293 finally {
294 lock.unlock();
295 }
296 }
297
298 }