ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.8
Committed: Tue Jul 8 00:46:34 2003 UTC (20 years, 11 months ago) by dl
Branch: MAIN
Changes since 1.7: +1 -1 lines
Log Message:
Locks in subpackage; fairness params added

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
/**
 * An unbounded blocking queue based on a {@link PriorityQueue},
 * obeying its ordering rules and implementation characteristics.
 *
 * <p>Reads and modifications of the backing queue's contents are
 * performed while holding a single {@link ReentrantLock} (the one
 * deliberate exception is the iterator's <tt>hasNext</tt>, see the
 * comment there). Consumers that find the queue empty wait on a
 * condition that producers signal after each insertion. Because the
 * queue is unbounded, insertion never blocks.
 *
 * @since 1.5
 * @author Doug Lea
 **/
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** Backing queue; holds the elements and defines their ordering. */
    private final PriorityQueue<E> q;

    /** Guards access to the contents of {@link #q}. */
    private final ReentrantLock lock = new ReentrantLock();

    /** Signalled after an insertion so that a waiting consumer can retry. */
    private final Condition notEmpty = lock.newCondition();

    /**
     * Create a new priority queue with the default initial capacity
     * of the underlying {@link PriorityQueue} that orders its elements
     * according to their natural ordering.
     */
    public PriorityBlockingQueue() {
        q = new PriorityQueue<E>();
    }

    /**
     * Create a new priority queue with the specified initial capacity
     * that orders its elements according to their natural ordering.
     *
     * @param initialCapacity the initial capacity for this priority queue.
     */
    public PriorityBlockingQueue(int initialCapacity) {
        q = new PriorityQueue<E>(initialCapacity, null);
    }

    /**
     * Create a new priority queue with the specified initial capacity
     * that orders its elements according to the specified comparator.
     *
     * @param initialCapacity the initial capacity for this priority queue.
     * @param comparator the comparator used to order this priority queue;
     * it is passed unchanged to the underlying {@link PriorityQueue}.
     */
    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        q = new PriorityQueue<E>(initialCapacity, comparator);
    }

    /**
     * Create a new priority queue containing the elements in the specified
     * collection. Initial capacity and ordering are whatever the
     * {@link PriorityQueue} constructor taking a collection chooses for
     * that collection; this class adds no rules of its own.
     *
     * @param initialElements the collection whose elements are to be placed
     * into this priority queue.
     * @throws ClassCastException if elements of the specified collection
     * cannot be compared to one another according to the priority
     * queue's ordering.
     * @throws NullPointerException if the specified collection or an
     * element of the specified collection is <tt>null</tt>.
     */
    public PriorityBlockingQueue(Collection<? extends E> initialElements) {
        q = new PriorityQueue<E>(initialElements);
    }

    /**
     * Returns the comparator associated with this priority queue, or
     * <tt>null</tt> if it uses its elements' natural ordering.
     *
     * @return the comparator associated with this priority queue, or
     * <tt>null</tt> if it uses its elements' natural ordering.
     */
    // The raw return type is kept on purpose so that existing callers
    // continue to compile unchanged.
    public Comparator comparator() {
        return q.comparator();
    }

    /**
     * Inserts the specified element and wakes one waiting consumer.
     *
     * @param x the element to add
     * @return <tt>true</tt> always
     * @throws NullPointerException if <tt>x</tt> is <tt>null</tt>
     */
    public boolean offer(E x) {
        if (x == null) {
            throw new NullPointerException();
        }
        lock.lock();
        try {
            boolean ok = q.offer(x);
            assert ok;
            notEmpty.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element. Equivalent to {@link #offer(Object)}:
     * the queue is unbounded, so this never waits.
     *
     * @param x the element to add
     * @throws NullPointerException if <tt>x</tt> is <tt>null</tt>
     */
    public void put(E x) throws InterruptedException {
        offer(x); // never need to block
    }

    /**
     * Inserts the specified element. The timeout arguments are ignored
     * because the queue is unbounded and insertion never waits.
     *
     * @param x the element to add
     * @param timeout ignored
     * @param unit ignored
     * @return <tt>true</tt> always
     * @throws NullPointerException if <tt>x</tt> is <tt>null</tt>
     */
    public boolean offer(E x, long timeout, TimeUnit unit)
            throws InterruptedException {
        return offer(x); // never need to block
    }

    /**
     * Removes and returns the head of the queue, waiting for as long as
     * the queue is empty.
     *
     * @return the head of the queue
     * @throws InterruptedException if interrupted while acquiring the
     * lock or while waiting for an element
     */
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            try {
                while (q.size() == 0) {
                    notEmpty.await();
                }
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = q.poll();
            assert x != null;
            return x;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the head of the queue without waiting.
     *
     * @return the head of the queue, or <tt>null</tt> if it is empty
     */
    public E poll() {
        lock.lock();
        try {
            return q.poll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the head of the queue, waiting up to the given
     * time for an element to become available.
     *
     * @param timeout how long to wait, in units of <tt>unit</tt>
     * @param unit the time unit of <tt>timeout</tt>
     * @return the head of the queue, or <tt>null</tt> if the wait time
     * elapsed with the queue still empty
     * @throws InterruptedException if interrupted while acquiring the
     * lock or while waiting
     * @throws NullPointerException if <tt>unit</tt> is <tt>null</tt>
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        // Convert before taking the lock: if unit is null this throws, and
        // it must do so while the lock is NOT held, otherwise the lock
        // would never be released.
        long nanos = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            for (;;) {
                E x = q.poll();
                if (x != null) {
                    return x;
                }
                if (nanos <= 0) {
                    return null;
                }
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns, without removing, the head of the queue.
     *
     * @return the head of the queue, or <tt>null</tt> if it is empty
     */
    public E peek() {
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements currently in the queue.
     *
     * @return the number of elements
     */
    public int size() {
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * PriorityBlockingQueues are not capacity constrained.
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Removes elements from this queue, in the order <tt>poll</tt> would
     * return them, and adds them to the given collection.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most the given number of elements from this queue, in
     * the order <tt>poll</tt> would return them, and adds them to the
     * given collection.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        if (maxElements <= 0) {
            return 0;
        }
        lock.lock();
        try {
            int moved = 0;
            while (moved < maxElements) {
                E head = q.poll();
                if (head == null) {
                    break;
                }
                c.add(head);
                ++moved;
            }
            return moved;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes an occurrence of the specified element, if present.
     *
     * @param x the element to remove
     * @return whatever the underlying queue's <tt>remove</tt> reports
     */
    public boolean remove(Object x) {
        lock.lock();
        try {
            return q.remove(x);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Tests whether the queue contains the specified element.
     *
     * @param x the element to look for
     * @return whatever the underlying queue's <tt>contains</tt> reports
     */
    public boolean contains(Object x) {
        lock.lock();
        try {
            return q.contains(x);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array of the queue's elements, produced by the
     * underlying queue while the lock is held.
     *
     * @return an array containing the elements of this queue
     */
    public Object[] toArray() {
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the underlying queue's string form, produced while the
     * lock is held.
     *
     * @return a string representation of this queue
     */
    public String toString() {
        lock.lock();
        try {
            return q.toString();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array of the queue's elements, produced by the
     * underlying queue's <tt>toArray(T[])</tt> while the lock is held.
     *
     * @param a the array handed to the underlying queue
     * @return the array returned by the underlying queue
     */
    public <T> T[] toArray(T[] a) {
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements of this queue. The iterator
     * wraps the underlying queue's own iterator, running <tt>next</tt>
     * and <tt>remove</tt> under this queue's lock.
     *
     * @return an iterator over the elements of this queue
     */
    public Iterator<E> iterator() {
        lock.lock();
        try {
            return new Itr(q.iterator());
        } finally {
            lock.unlock();
        }
    }

    /**
     * Iterator wrapper that takes the enclosing queue's lock around the
     * underlying iterator's <tt>next</tt> and <tt>remove</tt>. It is an
     * inner (non-static) class because it needs the enclosing instance's
     * lock, and it uses the enclosing class's element type directly
     * instead of declaring a type parameter of its own.
     */
    private class Itr implements Iterator<E> {
        private final Iterator<E> iter;

        Itr(Iterator<E> i) {
            iter = i;
        }

        public boolean hasNext() {
            /*
             * No sync -- we rely on underlying hasNext to be
             * stateless, in which case we can return true by mistake
             * only when next() will subsequently throw
             * ConcurrentModificationException.
             */
            return iter.hasNext();
        }

        public E next() {
            lock.lock();
            try {
                return iter.next();
            } finally {
                lock.unlock();
            }
        }

        public void remove() {
            lock.lock();
            try {
                iter.remove();
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it). This
     * merely wraps default serialization within lock. The
     * serialization strategy for items is left to underlying
     * Queue. Note that locking is not needed on deserialization, so
     * readObject is not defined, just relying on default.
     */
    private void writeObject(java.io.ObjectOutputStream s)
            throws java.io.IOException {
        lock.lock();
        try {
            s.defaultWriteObject();
        } finally {
            lock.unlock();
        }
    }

}