ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.14
Committed: Tue Aug 5 06:32:02 2003 UTC (20 years, 10 months ago) by dholmes
Branch: MAIN
Changes since 1.13: +26 -23 lines
Log Message:
Regressed to the unbounded form

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8 tim 1.13
9 dl 1.8 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
12     /**
13 tim 1.13 * An unbounded {@link BlockingQueue blocking queue} based on a
14 dholmes 1.10 * {@link PriorityQueue},
15 dl 1.5 * obeying its ordering rules and implementation characteristics.
16 dl 1.6 * @since 1.5
17     * @author Doug Lea
18 dholmes 1.14 */
19 dl 1.5 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
20 dholmes 1.14 implements Sorted, BlockingQueue<E>, java.io.Serializable {
21 tim 1.1
22 dl 1.5 private final PriorityQueue<E> q;
23 dl 1.9 private final ReentrantLock lock = new ReentrantLock(true);
24 dl 1.5 private final Condition notEmpty = lock.newCondition();
25    
26 dl 1.2 /**
27 dholmes 1.14 * Create a <tt>PriorityBlockingQueue</tt> with the default initial
28     * capacity
29 dholmes 1.10 * (11) that orders its elements according to their natural
30     * ordering (using <tt>Comparable</tt>.)
31 dl 1.2 */
32     public PriorityBlockingQueue() {
33 dl 1.5 q = new PriorityQueue<E>();
34 dl 1.2 }
35    
36     /**
37 tim 1.13 * Create a <tt>PriorityBlockingQueue</tt> with the specified initial
38 dholmes 1.10 * capacity
39     * that orders its elements according to their natural ordering
40     * (using <tt>Comparable</tt>.)
41 dl 1.2 *
42     * @param initialCapacity the initial capacity for this priority queue.
43     */
44     public PriorityBlockingQueue(int initialCapacity) {
45 dl 1.5 q = new PriorityQueue<E>(initialCapacity, null);
46 dl 1.2 }
47    
48     /**
49 tim 1.13 * Create a <tt>PriorityBlockingQueue</tt> with the specified initial
50 dholmes 1.10 * capacity
51 dl 1.2 * that orders its elements according to the specified comparator.
52     *
53     * @param initialCapacity the initial capacity for this priority queue.
54     * @param comparator the comparator used to order this priority queue.
55 dholmes 1.10 * If <tt>null</tt> then the order depends on the elements' natural
56     * ordering.
57 dl 1.2 */
58 tim 1.13 public PriorityBlockingQueue(int initialCapacity,
59 dholmes 1.14 Comparator<? super E> comparator) {
60 dl 1.5 q = new PriorityQueue<E>(initialCapacity, comparator);
61 dl 1.2 }
62    
63     /**
64 tim 1.13 * Create a <tt>PriorityBlockingQueue</tt> containing the elements
65 dholmes 1.10 * in the specified
66 dl 1.2 * collection. The priority queue has an initial capacity of 110% of the
67     * size of the specified collection. If the specified collection
68     * implements the {@link Sorted} interface, the priority queue will be
69     * sorted according to the same comparator, or according to its elements'
70     * natural order if the collection is sorted according to its elements'
71 dholmes 1.10 * natural order. If the specified collection does not implement
72     * <tt>Sorted</tt>, the priority queue is ordered according to
73 dl 1.2 * its elements' natural order.
74     *
75 dholmes 1.14 * @param c the collection whose elements are to be placed
76 dl 1.2 * into this priority queue.
77     * @throws ClassCastException if elements of the specified collection
78     * cannot be compared to one another according to the priority
79     * queue's ordering.
80 dholmes 1.14 * @throws NullPointerException if <tt>c</tt> or any element within it
81     * is <tt>null</tt>
82 dl 1.2 */
83 dholmes 1.14 public PriorityBlockingQueue(Collection<? extends E> c) {
84     q = new PriorityQueue<E>(c);
85 dl 1.7 }
86    
87 dholmes 1.10
88     // these first two override just to get the throws docs
89    
90     /**
91 dholmes 1.14 * @throws NullPointerException {@inheritDoc}
92 dholmes 1.10 */
93     public boolean add(E element) {
94     return super.add(element);
95     }
96    
97 tim 1.13 /**
98 dholmes 1.14 * @throws NullPointerException {@inheritDoc}
99 tim 1.13 */
100     public boolean addAll(Collection<? extends E> c) {
101     return super.addAll(c);
102     }
103 dholmes 1.10
104 dl 1.7 public Comparator comparator() {
105     return q.comparator();
106 dl 1.5 }
107    
108 dholmes 1.14 /**
109     * @throws NullPointerException if the specified element is <tt>null</tt>
110     **/
111     public boolean offer(E o) {
112     if (o == null) throw new NullPointerException();
113 dl 1.5 lock.lock();
114     try {
115 dholmes 1.14 boolean ok = q.offer(o);
116 dl 1.5 assert ok;
117     notEmpty.signal();
118     return true;
119     }
120     finally {
121 tim 1.13 lock.unlock();
122 dl 1.5 }
123     }
124    
125 dholmes 1.14 public void put(E o) throws InterruptedException {
126     offer(o); // never need to block
127 dl 1.5 }
128    
129 dholmes 1.14 public boolean offer(E o, long timeout, TimeUnit unit)
130 dholmes 1.10 throws InterruptedException {
131 dholmes 1.14 return offer(o); // never need to block
132 dl 1.5 }
133    
134     public E take() throws InterruptedException {
135     lock.lockInterruptibly();
136     try {
137     try {
138     while (q.size() == 0)
139     notEmpty.await();
140     }
141     catch (InterruptedException ie) {
142     notEmpty.signal(); // propagate to non-interrupted thread
143     throw ie;
144     }
145     E x = q.poll();
146     assert x != null;
147     return x;
148     }
149     finally {
150     lock.unlock();
151     }
152     }
153    
154    
155     public E poll() {
156     lock.lock();
157     try {
158     return q.poll();
159     }
160     finally {
161 tim 1.13 lock.unlock();
162 dl 1.5 }
163     }
164    
165     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
166 dholmes 1.10 long nanos = unit.toNanos(timeout);
167 dl 1.5 lock.lockInterruptibly();
168     try {
169     for (;;) {
170     E x = q.poll();
171 tim 1.13 if (x != null)
172 dl 1.5 return x;
173     if (nanos <= 0)
174     return null;
175     try {
176     nanos = notEmpty.awaitNanos(nanos);
177     }
178     catch (InterruptedException ie) {
179     notEmpty.signal(); // propagate to non-interrupted thread
180     throw ie;
181     }
182     }
183     }
184     finally {
185     lock.unlock();
186     }
187     }
188    
189     public E peek() {
190     lock.lock();
191     try {
192     return q.peek();
193     }
194     finally {
195 tim 1.13 lock.unlock();
196 dl 1.5 }
197     }
198    
199     public int size() {
200     lock.lock();
201     try {
202     return q.size();
203     }
204     finally {
205     lock.unlock();
206     }
207     }
208    
209     /**
210     * Always returns <tt>Integer.MAX_VALUE</tt> because
211     * PriorityBlockingQueues are not capacity constrained.
212     * @return <tt>Integer.MAX_VALUE</tt>
213     */
214     public int remainingCapacity() {
215     return Integer.MAX_VALUE;
216     }
217    
218 dholmes 1.14 public boolean remove(Object o) {
219 dl 1.5 lock.lock();
220     try {
221 dholmes 1.14 return q.remove(o);
222 dl 1.5 }
223     finally {
224     lock.unlock();
225     }
226     }
227    
228 dholmes 1.14 public boolean contains(Object o) {
229 dl 1.5 lock.lock();
230     try {
231 dholmes 1.14 return q.contains(o);
232 dl 1.5 }
233     finally {
234     lock.unlock();
235     }
236     }
237    
238     public Object[] toArray() {
239     lock.lock();
240     try {
241     return q.toArray();
242     }
243     finally {
244     lock.unlock();
245     }
246     }
247    
248    
249     public String toString() {
250     lock.lock();
251     try {
252     return q.toString();
253     }
254     finally {
255     lock.unlock();
256     }
257     }
258    
259     public <T> T[] toArray(T[] a) {
260     lock.lock();
261     try {
262     return q.toArray(a);
263     }
264     finally {
265     lock.unlock();
266     }
267     }
268    
269     public Iterator<E> iterator() {
270     lock.lock();
271     try {
272     return new Itr(q.iterator());
273     }
274     finally {
275     lock.unlock();
276     }
277     }
278    
279     private class Itr<E> implements Iterator<E> {
280     private final Iterator<E> iter;
281 tim 1.13 Itr(Iterator<E> i) {
282     iter = i;
283 dl 1.5 }
284    
285 tim 1.13 public boolean hasNext() {
286 dl 1.5 /*
287     * No sync -- we rely on underlying hasNext to be
288     * stateless, in which case we can return true by mistake
289     * only when next() willl subsequently throw
290     * ConcurrentModificationException.
291     */
292     return iter.hasNext();
293 tim 1.13 }
294    
295     public E next() {
296 dl 1.5 lock.lock();
297     try {
298     return iter.next();
299     }
300     finally {
301     lock.unlock();
302     }
303 tim 1.13 }
304    
305     public void remove() {
306 dl 1.5 lock.lock();
307     try {
308     iter.remove();
309     }
310     finally {
311     lock.unlock();
312     }
313 tim 1.13 }
314 dl 1.5 }
315    
316     /**
317     * Save the state to a stream (that is, serialize it). This
318     * merely wraps default serialization within lock. The
319     * serialization strategy for items is left to underlying
320     * Queue. Note that locking is not needed on deserialization, so
321     * readObject is not defined, just relying on default.
322     */
323     private void writeObject(java.io.ObjectOutputStream s)
324     throws java.io.IOException {
325     lock.lock();
326     try {
327     s.defaultWriteObject();
328     }
329     finally {
330     lock.unlock();
331     }
332 tim 1.1 }
333    
334     }