ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.5
Committed: Sun Jun 22 21:47:17 2003 UTC (21 years ago) by dl
Branch: MAIN
Changes since 1.4: +234 -9 lines
Log Message:
Split ArrayBlockingQueue and PriorityBlockingQueue to no longer subclass AbstractBlockingQueueFromQueue

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.*;
9
10 /**
11 * An unbounded blocking queue based on a {@link PriorityQueue},
12 * obeying its ordering rules and implementation characteristics.
13 **/
14 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
15 implements BlockingQueue<E>, java.io.Serializable {
16
17 private final PriorityQueue<E> q;
18 private final FairReentrantLock lock = new FairReentrantLock();
19 private final Condition notEmpty = lock.newCondition();
20
21 /**
22 * Create a new priority queue with the default initial capacity (11)
23 * that orders its elements according to their natural ordering.
24 */
25 public PriorityBlockingQueue() {
26 q = new PriorityQueue<E>();
27 }
28
29 /**
30 * Create a new priority queue with the specified initial capacity
31 * that orders its elements according to their natural ordering.
32 *
33 * @param initialCapacity the initial capacity for this priority queue.
34 */
35 public PriorityBlockingQueue(int initialCapacity) {
36 q = new PriorityQueue<E>(initialCapacity, null);
37 }
38
39 /**
40 * Create a new priority queue with the specified initial capacity (11)
41 * that orders its elements according to the specified comparator.
42 *
43 * @param initialCapacity the initial capacity for this priority queue.
44 * @param comparator the comparator used to order this priority queue.
45 */
46 public PriorityBlockingQueue(int initialCapacity, Comparator<E> comparator) {
47 q = new PriorityQueue<E>(initialCapacity, comparator);
48 }
49
50 /**
51 * Create a new priority queue containing the elements in the specified
52 * collection. The priority queue has an initial capacity of 110% of the
53 * size of the specified collection. If the specified collection
54 * implements the {@link Sorted} interface, the priority queue will be
55 * sorted according to the same comparator, or according to its elements'
56 * natural order if the collection is sorted according to its elements'
57 * natural order. If the specified collection does not implement the
58 * <tt>Sorted</tt> interface, the priority queue is ordered according to
59 * its elements' natural order.
60 *
61 * @param initialElements the collection whose elements are to be placed
62 * into this priority queue.
63 * @throws ClassCastException if elements of the specified collection
64 * cannot be compared to one another according to the priority
65 * queue's ordering.
66 * @throws NullPointerException if the specified collection or an
67 * element of the specified collection is <tt>null</tt>.
68 */
69 public PriorityBlockingQueue(Collection<E> initialElements) {
70 q = new PriorityQueue<E>(initialElements);
71 }
72
73 public boolean offer(E x) {
74 if (x == null) throw new IllegalArgumentException();
75 lock.lock();
76 try {
77 boolean ok = q.offer(x);
78 assert ok;
79 notEmpty.signal();
80 return true;
81 }
82 finally {
83 lock.unlock();
84 }
85 }
86
87 public void put(E x) throws InterruptedException {
88 offer(x); // never need to block
89 }
90
91 public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
92 return offer(x); // never need to block
93 }
94
95 public E take() throws InterruptedException {
96 lock.lockInterruptibly();
97 try {
98 try {
99 while (q.size() == 0)
100 notEmpty.await();
101 }
102 catch (InterruptedException ie) {
103 notEmpty.signal(); // propagate to non-interrupted thread
104 throw ie;
105 }
106 E x = q.poll();
107 assert x != null;
108 return x;
109 }
110 finally {
111 lock.unlock();
112 }
113 }
114
115
116 public E poll() {
117 lock.lock();
118 try {
119 return q.poll();
120 }
121 finally {
122 lock.unlock();
123 }
124 }
125
126 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
127 lock.lockInterruptibly();
128 long nanos = unit.toNanos(timeout);
129 try {
130 for (;;) {
131 E x = q.poll();
132 if (x != null)
133 return x;
134 if (nanos <= 0)
135 return null;
136 try {
137 nanos = notEmpty.awaitNanos(nanos);
138 }
139 catch (InterruptedException ie) {
140 notEmpty.signal(); // propagate to non-interrupted thread
141 throw ie;
142 }
143
144 }
145 }
146 finally {
147 lock.unlock();
148 }
149 }
150
151 public E peek() {
152 lock.lock();
153 try {
154 return q.peek();
155 }
156 finally {
157 lock.unlock();
158 }
159 }
160
161 public int size() {
162 lock.lock();
163 try {
164 return q.size();
165 }
166 finally {
167 lock.unlock();
168 }
169 }
170
171 /**
172 * Always returns <tt>Integer.MAX_VALUE</tt> because
173 * PriorityBlockingQueues are not capacity constrained.
174 * @return <tt>Integer.MAX_VALUE</tt>
175 */
176 public int remainingCapacity() {
177 return Integer.MAX_VALUE;
178 }
179
180 public boolean remove(Object x) {
181 lock.lock();
182 try {
183 return q.remove(x);
184 }
185 finally {
186 lock.unlock();
187 }
188 }
189
190 public boolean contains(Object x) {
191 lock.lock();
192 try {
193 return q.contains(x);
194 }
195 finally {
196 lock.unlock();
197 }
198 }
199
200 public Object[] toArray() {
201 lock.lock();
202 try {
203 return q.toArray();
204 }
205 finally {
206 lock.unlock();
207 }
208 }
209
210
211 public String toString() {
212 lock.lock();
213 try {
214 return q.toString();
215 }
216 finally {
217 lock.unlock();
218 }
219 }
220
221 public <T> T[] toArray(T[] a) {
222 lock.lock();
223 try {
224 return q.toArray(a);
225 }
226 finally {
227 lock.unlock();
228 }
229 }
230
231 public Iterator<E> iterator() {
232 lock.lock();
233 try {
234 return new Itr(q.iterator());
235 }
236 finally {
237 lock.unlock();
238 }
239 }
240
241 private class Itr<E> implements Iterator<E> {
242 private final Iterator<E> iter;
243 Itr(Iterator<E> i) {
244 iter = i;
245 }
246
247 public boolean hasNext() {
248 /*
249 * No sync -- we rely on underlying hasNext to be
250 * stateless, in which case we can return true by mistake
251 * only when next() willl subsequently throw
252 * ConcurrentModificationException.
253 */
254 return iter.hasNext();
255 }
256
257 public E next() {
258 lock.lock();
259 try {
260 return iter.next();
261 }
262 finally {
263 lock.unlock();
264 }
265 }
266
267 public void remove() {
268 lock.lock();
269 try {
270 iter.remove();
271 }
272 finally {
273 lock.unlock();
274 }
275 }
276 }
277
278 /**
279 * Save the state to a stream (that is, serialize it). This
280 * merely wraps default serialization within lock. The
281 * serialization strategy for items is left to underlying
282 * Queue. Note that locking is not needed on deserialization, so
283 * readObject is not defined, just relying on default.
284 */
285 private void writeObject(java.io.ObjectOutputStream s)
286 throws java.io.IOException {
287 lock.lock();
288 try {
289 s.defaultWriteObject();
290 }
291 finally {
292 lock.unlock();
293 }
294 }
295
296 }