ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.11
Committed: Mon Jul 28 09:40:14 2003 UTC (20 years, 10 months ago) by dl
Branch: MAIN
Changes since 1.10: +6 -6 lines
Log Message:
Commented out BlockingQueue.addAll; changed a few signatures to please 2.2 compiler

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.8 import java.util.concurrent.locks.*;
9 tim 1.1 import java.util.*;
10    
11     /**
12 dholmes 1.10 * An unbounded {@link BlockingQueue blocking queue} based on a
13     * {@link PriorityQueue},
14 dl 1.5 * obeying its ordering rules and implementation characteristics.
15 dl 1.6 * @since 1.5
16     * @author Doug Lea
17     **/
18 dl 1.5 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
19 dholmes 1.10 implements BlockingQueue<E>, Sorted, java.io.Serializable {
20 tim 1.1
21 dl 1.5 private final PriorityQueue<E> q;
22 dl 1.9 private final ReentrantLock lock = new ReentrantLock(true);
23 dl 1.5 private final Condition notEmpty = lock.newCondition();
24    
25 dl 1.2 /**
26 dholmes 1.10 * Create a <tt>PriorityBlockingQueue</tt> with the default initial capacity
27     * (11) that orders its elements according to their natural
28     * ordering (using <tt>Comparable</tt>.)
29 dl 1.2 */
30     public PriorityBlockingQueue() {
31 dl 1.5 q = new PriorityQueue<E>();
32 dl 1.2 }
33    
34     /**
35 dholmes 1.10 * Create a <tt>PriorityBlockingQueue</tt> with the specified initial
36     * capacity
37     * that orders its elements according to their natural ordering
38     * (using <tt>Comparable</tt>.)
39 dl 1.2 *
40     * @param initialCapacity the initial capacity for this priority queue.
41     */
42     public PriorityBlockingQueue(int initialCapacity) {
43 dl 1.5 q = new PriorityQueue<E>(initialCapacity, null);
44 dl 1.2 }
45    
46     /**
47 dholmes 1.10 * Create a <tt>PriorityBlockingQueue</tt> with the specified initial
48     * capacity
49 dl 1.2 * that orders its elements according to the specified comparator.
50     *
51     * @param initialCapacity the initial capacity for this priority queue.
52     * @param comparator the comparator used to order this priority queue.
53 dholmes 1.10 * If <tt>null</tt> then the order depends on the elements' natural
54     * ordering.
55 dl 1.2 */
56 dholmes 1.10 public PriorityBlockingQueue(int initialCapacity,
57     Comparator<E> comparator) {
58 dl 1.5 q = new PriorityQueue<E>(initialCapacity, comparator);
59 dl 1.2 }
60    
61     /**
62 dholmes 1.10 * Create a <tt>PriorityBlockingQueue</tt> containing the elements
63     * in the specified
64 dl 1.2 * collection. The priority queue has an initial capacity of 110% of the
65     * size of the specified collection. If the specified collection
66     * implements the {@link Sorted} interface, the priority queue will be
67     * sorted according to the same comparator, or according to its elements'
68     * natural order if the collection is sorted according to its elements'
69 dholmes 1.10 * natural order. If the specified collection does not implement
70     * <tt>Sorted</tt>, the priority queue is ordered according to
71 dl 1.2 * its elements' natural order.
72     *
73     * @param initialElements the collection whose elements are to be placed
74     * into this priority queue.
75     * @throws ClassCastException if elements of the specified collection
76     * cannot be compared to one another according to the priority
77     * queue's ordering.
78     * @throws NullPointerException if the specified collection or an
79     * element of the specified collection is <tt>null</tt>.
80     */
81     public PriorityBlockingQueue(Collection<E> initialElements) {
82 dl 1.5 q = new PriorityQueue<E>(initialElements);
83 dl 1.7 }
84    
85 dholmes 1.10
86     // these first two override just to get the throws docs
87    
88     /**
89     * @throws NullPointerException if the specified element is <tt>null</tt>.
90     */
91     public boolean add(E element) {
92     return super.add(element);
93     }
94    
95 dl 1.11 // /**
96     // * @throws NullPointerException if any element is <tt>null</tt>.
97     // */
98     // public boolean addAll(Collection c) {
99     // return super.addAll(c);
100     // }
101 dholmes 1.10
102 dl 1.7 public Comparator comparator() {
103     return q.comparator();
104 dl 1.5 }
105    
106 dholmes 1.10 /** @throws NullPointerException if <tt>x</tt> is <tt>null</tt> */
107 dl 1.5 public boolean offer(E x) {
108 dl 1.6 if (x == null) throw new NullPointerException();
109 dl 1.5 lock.lock();
110     try {
111     boolean ok = q.offer(x);
112     assert ok;
113     notEmpty.signal();
114     return true;
115     }
116     finally {
117     lock.unlock();
118     }
119     }
120    
121     public void put(E x) throws InterruptedException {
122     offer(x); // never need to block
123     }
124    
125 dholmes 1.10 public boolean offer(E x, long timeout, TimeUnit unit)
126     throws InterruptedException {
127 dl 1.5 return offer(x); // never need to block
128     }
129    
130     public E take() throws InterruptedException {
131     lock.lockInterruptibly();
132     try {
133     try {
134     while (q.size() == 0)
135     notEmpty.await();
136     }
137     catch (InterruptedException ie) {
138     notEmpty.signal(); // propagate to non-interrupted thread
139     throw ie;
140     }
141     E x = q.poll();
142     assert x != null;
143     return x;
144     }
145     finally {
146     lock.unlock();
147     }
148     }
149    
150    
151     public E poll() {
152     lock.lock();
153     try {
154     return q.poll();
155     }
156     finally {
157     lock.unlock();
158     }
159     }
160    
161     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
162 dholmes 1.10 long nanos = unit.toNanos(timeout);
163 dl 1.5 lock.lockInterruptibly();
164     try {
165     for (;;) {
166     E x = q.poll();
167     if (x != null)
168     return x;
169     if (nanos <= 0)
170     return null;
171     try {
172     nanos = notEmpty.awaitNanos(nanos);
173     }
174     catch (InterruptedException ie) {
175     notEmpty.signal(); // propagate to non-interrupted thread
176     throw ie;
177     }
178     }
179     }
180     finally {
181     lock.unlock();
182     }
183     }
184    
185     public E peek() {
186     lock.lock();
187     try {
188     return q.peek();
189     }
190     finally {
191     lock.unlock();
192     }
193     }
194    
195     public int size() {
196     lock.lock();
197     try {
198     return q.size();
199     }
200     finally {
201     lock.unlock();
202     }
203     }
204    
205     /**
206     * Always returns <tt>Integer.MAX_VALUE</tt> because
207     * PriorityBlockingQueues are not capacity constrained.
208     * @return <tt>Integer.MAX_VALUE</tt>
209     */
210     public int remainingCapacity() {
211     return Integer.MAX_VALUE;
212     }
213    
214     public boolean remove(Object x) {
215     lock.lock();
216     try {
217     return q.remove(x);
218     }
219     finally {
220     lock.unlock();
221     }
222     }
223    
224     public boolean contains(Object x) {
225     lock.lock();
226     try {
227     return q.contains(x);
228     }
229     finally {
230     lock.unlock();
231     }
232     }
233    
234     public Object[] toArray() {
235     lock.lock();
236     try {
237     return q.toArray();
238     }
239     finally {
240     lock.unlock();
241     }
242     }
243    
244    
245     public String toString() {
246     lock.lock();
247     try {
248     return q.toString();
249     }
250     finally {
251     lock.unlock();
252     }
253     }
254    
255     public <T> T[] toArray(T[] a) {
256     lock.lock();
257     try {
258     return q.toArray(a);
259     }
260     finally {
261     lock.unlock();
262     }
263     }
264    
265     public Iterator<E> iterator() {
266     lock.lock();
267     try {
268     return new Itr(q.iterator());
269     }
270     finally {
271     lock.unlock();
272     }
273     }
274    
275     private class Itr<E> implements Iterator<E> {
276     private final Iterator<E> iter;
277     Itr(Iterator<E> i) {
278     iter = i;
279     }
280    
281     public boolean hasNext() {
282     /*
283     * No sync -- we rely on underlying hasNext to be
284     * stateless, in which case we can return true by mistake
285     * only when next() willl subsequently throw
286     * ConcurrentModificationException.
287     */
288     return iter.hasNext();
289     }
290    
291     public E next() {
292     lock.lock();
293     try {
294     return iter.next();
295     }
296     finally {
297     lock.unlock();
298     }
299     }
300    
301     public void remove() {
302     lock.lock();
303     try {
304     iter.remove();
305     }
306     finally {
307     lock.unlock();
308     }
309     }
310     }
311    
312     /**
313     * Save the state to a stream (that is, serialize it). This
314     * merely wraps default serialization within lock. The
315     * serialization strategy for items is left to underlying
316     * Queue. Note that locking is not needed on deserialization, so
317     * readObject is not defined, just relying on default.
318     */
319     private void writeObject(java.io.ObjectOutputStream s)
320     throws java.io.IOException {
321     lock.lock();
322     try {
323     s.defaultWriteObject();
324     }
325     finally {
326     lock.unlock();
327     }
328 tim 1.1 }
329    
330     }