ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.15
Committed: Tue Aug 5 12:11:14 2003 UTC (20 years, 10 months ago) by dl
Branch: MAIN
Changes since 1.14: +15 -13 lines
Log Message:
Remove Sorted interface, adjust PQ and PBQ

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8 tim 1.13
9 dl 1.8 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
/**
 * An unbounded {@link BlockingQueue blocking queue} based on a {@link
 * PriorityQueue}, obeying its ordering rules and implementation
 * characteristics.  While this queue is logically unbounded,
 * attempted additions may fail due to resource exhaustion (causing
 * <tt>OutOfMemoryError</tt>) when <tt>Integer.MAX_VALUE</tt> elements
 * are held.
 *
 * <p>Every operation that reads or modifies the backing
 * <tt>PriorityQueue</tt> runs while holding a single fair
 * {@link ReentrantLock}, with two deliberate exceptions:
 * {@link #comparator()} and the iterator's <tt>hasNext</tt> (see the
 * comment on that method).
 *
 * @since 1.5
 * @author Doug Lea
 */
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {

    /** The backing priority queue; accessed only while holding {@link #lock}. */
    private final PriorityQueue<E> q;

    /** Fair lock guarding all access to {@link #q}. */
    private final ReentrantLock lock = new ReentrantLock(true);

    /** Condition signalled whenever an element is inserted. */
    private final Condition notEmpty = lock.newCondition();

    /**
     * Create a <tt>PriorityBlockingQueue</tt> with the default initial
     * capacity (11) that orders its elements according to their natural
     * ordering (using <tt>Comparable</tt>.)
     */
    public PriorityBlockingQueue() {
        q = new PriorityQueue<E>();
    }

    /**
     * Create a <tt>PriorityBlockingQueue</tt> with the specified initial
     * capacity that orders its elements according to their natural
     * ordering (using <tt>Comparable</tt>.)
     *
     * @param initialCapacity the initial capacity for this priority queue.
     */
    public PriorityBlockingQueue(int initialCapacity) {
        q = new PriorityQueue<E>(initialCapacity, null);
    }

    /**
     * Create a <tt>PriorityBlockingQueue</tt> with the specified initial
     * capacity that orders its elements according to the specified
     * comparator.
     *
     * @param initialCapacity the initial capacity for this priority queue.
     * @param comparator the comparator used to order this priority queue.
     * If <tt>null</tt> then the order depends on the elements' natural
     * ordering.
     */
    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        q = new PriorityQueue<E>(initialCapacity, comparator);
    }

    /**
     * Create a <tt>PriorityBlockingQueue</tt> containing the elements
     * in the specified collection.  The priority queue has an initial
     * capacity of 110% of the size of the specified collection. If
     * the specified collection is a {@link SortedSet} or a {@link
     * PriorityQueue}, this priority queue will be sorted according to
     * the same comparator, or according to its elements' natural
     * order if the collection is sorted according to its elements'
     * natural order.  Otherwise, this priority queue is ordered
     * according to its elements' natural order.
     *
     * @param c the collection whose elements are to be placed
     *        into this priority queue.
     * @throws ClassCastException if elements of the specified collection
     *         cannot be compared to one another according to the priority
     *         queue's ordering.
     * @throws NullPointerException if <tt>c</tt> or any element within it
     *         is <tt>null</tt>
     */
    public PriorityBlockingQueue(Collection<? extends E> c) {
        q = new PriorityQueue<E>(c);
    }

    // add and addAll are overridden only to attach the @throws docs.

    /**
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean add(E element) {
        return super.add(element);
    }

    /**
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean addAll(Collection<? extends E> c) {
        return super.addAll(c);
    }

    /**
     * Returns the comparator reported by the backing priority queue.
     * The lock is not taken for this call.
     *
     * @return whatever the backing queue's <tt>comparator()</tt> returns
     */
    public Comparator<? super E> comparator() {
        return q.comparator();
    }

    /**
     * Inserts the specified element and signals one thread waiting for
     * an element.  This method always returns <tt>true</tt>.
     *
     * @param o the element to insert
     * @return <tt>true</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>
     */
    public boolean offer(E o) {
        if (o == null)
            throw new NullPointerException();
        lock.lock();
        try {
            boolean ok = q.offer(o);
            assert ok;
            notEmpty.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element by delegating to {@link #offer(Object)}.
     * Apart from acquiring the lock, this method never waits.
     *
     * @param o the element to insert
     * @throws NullPointerException if the specified element is <tt>null</tt>
     */
    public void put(E o) throws InterruptedException {
        offer(o); // never need to block
    }

    /**
     * Inserts the specified element by delegating to {@link #offer(Object)};
     * the timeout arguments are ignored.
     *
     * @param o the element to insert
     * @param timeout ignored
     * @param unit ignored
     * @return <tt>true</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>
     */
    public boolean offer(E o, long timeout, TimeUnit unit)
        throws InterruptedException {
        return offer(o); // never need to block
    }

    /**
     * Retrieves and removes the head of this queue, waiting while the
     * queue is empty.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while acquiring the
     *         lock or while waiting for an element
     */
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            try {
                while (q.size() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                // This thread may have consumed a signal meant for
                // another waiter; pass it on before giving up.
                notEmpty.signal();
                throw ie;
            }
            E x = q.poll();
            assert x != null;
            return x;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue without waiting for
     * an element to arrive.
     *
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    public E poll() {
        lock.lock();
        try {
            return q.poll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * given time for an element to arrive.
     *
     * @param timeout how long to wait, in units of <tt>unit</tt>
     * @param unit the unit in which <tt>timeout</tt> is expressed
     * @return the head of this queue, or <tt>null</tt> if the wait time
     *         elapsed with the queue still empty
     * @throws InterruptedException if interrupted while acquiring the
     *         lock or while waiting for an element
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            for (;;) {
                E x = q.poll();
                if (x != null)
                    return x;
                if (nanos <= 0)
                    return null;
                try {
                    // awaitNanos reports the time still remaining.
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    // Pass a possibly consumed signal on to another waiter.
                    notEmpty.signal();
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue.
     *
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    public E peek() {
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements currently held.
     *
     * @return the size of the backing queue
     */
    public int size() {
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * PriorityBlockingQueues are not capacity constrained.
     *
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Removes the specified element by delegating to the backing queue's
     * <tt>remove</tt> while holding the lock.
     *
     * @param o the element to remove
     * @return the result of the backing queue's <tt>remove</tt>
     */
    public boolean remove(Object o) {
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Tests for the specified element by delegating to the backing
     * queue's <tt>contains</tt> while holding the lock.
     *
     * @param o the element to look for
     * @return the result of the backing queue's <tt>contains</tt>
     */
    public boolean contains(Object o) {
        lock.lock();
        try {
            return q.contains(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the backing queue's <tt>toArray()</tt> result, computed
     * while holding the lock.
     *
     * @return an array of the elements in this queue
     */
    public Object[] toArray() {
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the backing queue's string form, computed while holding
     * the lock.
     *
     * @return the result of the backing queue's <tt>toString</tt>
     */
    public String toString() {
        lock.lock();
        try {
            return q.toString();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the backing queue's <tt>toArray(a)</tt> result, computed
     * while holding the lock.
     *
     * @param a the array passed through to the backing queue
     * @return the result of the backing queue's <tt>toArray(a)</tt>
     */
    public <T> T[] toArray(T[] a) {
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes all available elements from this queue and adds them to
     * the given collection, in priority order.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most the given number of available elements from this
     * queue and adds them to the given collection, in priority order.
     * The whole transfer happens under the lock.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        lock.lock();
        try {
            int n = 0;
            while (n < maxElements) {
                E e = q.peek();
                if (e == null)
                    break;
                // Add before removing so that an element is not lost
                // if the target collection rejects it.
                c.add(e);
                q.poll();
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator that wraps the backing queue's iterator and
     * takes the lock around <tt>next</tt> and <tt>remove</tt>.
     *
     * @return an iterator over the elements in this queue
     */
    public Iterator<E> iterator() {
        lock.lock();
        try {
            return new Itr(q.iterator());
        } finally {
            lock.unlock();
        }
    }

    /**
     * Iterator wrapper that forwards to the backing queue's iterator,
     * locking around the calls that advance or modify.
     */
    private class Itr implements Iterator<E> {
        private final Iterator<E> iter;

        Itr(Iterator<E> i) {
            iter = i;
        }

        public boolean hasNext() {
            /*
             * No sync -- we rely on underlying hasNext to be
             * stateless, in which case we can return true by mistake
             * only when next() will subsequently throw
             * ConcurrentModificationException.
             */
            return iter.hasNext();
        }

        public E next() {
            lock.lock();
            try {
                return iter.next();
            } finally {
                lock.unlock();
            }
        }

        public void remove() {
            lock.lock();
            try {
                iter.remove();
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).  This
     * merely wraps default serialization within lock.  The
     * serialization strategy for items is left to underlying
     * Queue.  Note that locking is not needed on deserialization, so
     * readObject is not defined, just relying on default.
     *
     * @param s the stream to write to
     * @throws java.io.IOException if the stream reports an I/O error
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {
        lock.lock();
        try {
            s.defaultWriteObject();
        } finally {
            lock.unlock();
        }
    }

}