ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.5
Committed: Sun Jun 22 21:47:17 2003 UTC (20 years, 11 months ago) by dl
Branch: MAIN
Changes since 1.4: +234 -9 lines
Log Message:
Split ArrayBlockingQueue and PriorityBlockingQueue to no longer subclass AbstractBlockingQueueFromQueue

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8     import java.util.*;
9    
10     /**
11 dl 1.5 * An unbounded blocking queue based on a {@link PriorityQueue},
12     * obeying its ordering rules and implementation characteristics.
13 tim 1.1 **/
14 dl 1.5 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
15 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
16    
17 dl 1.5 private final PriorityQueue<E> q;
18     private final FairReentrantLock lock = new FairReentrantLock();
19     private final Condition notEmpty = lock.newCondition();
20    
21 dl 1.2 /**
22     * Create a new priority queue with the default initial capacity (11)
23     * that orders its elements according to their natural ordering.
24     */
25     public PriorityBlockingQueue() {
26 dl 1.5 q = new PriorityQueue<E>();
27 dl 1.2 }
28    
29     /**
30     * Create a new priority queue with the specified initial capacity
31     * that orders its elements according to their natural ordering.
32     *
33     * @param initialCapacity the initial capacity for this priority queue.
34     */
35     public PriorityBlockingQueue(int initialCapacity) {
36 dl 1.5 q = new PriorityQueue<E>(initialCapacity, null);
37 dl 1.2 }
38    
39     /**
40     * Create a new priority queue with the specified initial capacity (11)
41     * that orders its elements according to the specified comparator.
42     *
43     * @param initialCapacity the initial capacity for this priority queue.
44     * @param comparator the comparator used to order this priority queue.
45     */
46 dl 1.3 public PriorityBlockingQueue(int initialCapacity, Comparator<E> comparator) {
47 dl 1.5 q = new PriorityQueue<E>(initialCapacity, comparator);
48 dl 1.2 }
49    
50     /**
51     * Create a new priority queue containing the elements in the specified
52     * collection. The priority queue has an initial capacity of 110% of the
53     * size of the specified collection. If the specified collection
54     * implements the {@link Sorted} interface, the priority queue will be
55     * sorted according to the same comparator, or according to its elements'
56     * natural order if the collection is sorted according to its elements'
57     * natural order. If the specified collection does not implement the
58     * <tt>Sorted</tt> interface, the priority queue is ordered according to
59     * its elements' natural order.
60     *
61     * @param initialElements the collection whose elements are to be placed
62     * into this priority queue.
63     * @throws ClassCastException if elements of the specified collection
64     * cannot be compared to one another according to the priority
65     * queue's ordering.
66     * @throws NullPointerException if the specified collection or an
67     * element of the specified collection is <tt>null</tt>.
68     */
69     public PriorityBlockingQueue(Collection<E> initialElements) {
70 dl 1.5 q = new PriorityQueue<E>(initialElements);
71     }
72    
73     public boolean offer(E x) {
74     if (x == null) throw new IllegalArgumentException();
75     lock.lock();
76     try {
77     boolean ok = q.offer(x);
78     assert ok;
79     notEmpty.signal();
80     return true;
81     }
82     finally {
83     lock.unlock();
84     }
85     }
86    
87     public void put(E x) throws InterruptedException {
88     offer(x); // never need to block
89     }
90    
91     public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
92     return offer(x); // never need to block
93     }
94    
95     public E take() throws InterruptedException {
96     lock.lockInterruptibly();
97     try {
98     try {
99     while (q.size() == 0)
100     notEmpty.await();
101     }
102     catch (InterruptedException ie) {
103     notEmpty.signal(); // propagate to non-interrupted thread
104     throw ie;
105     }
106     E x = q.poll();
107     assert x != null;
108     return x;
109     }
110     finally {
111     lock.unlock();
112     }
113     }
114    
115    
116     public E poll() {
117     lock.lock();
118     try {
119     return q.poll();
120     }
121     finally {
122     lock.unlock();
123     }
124     }
125    
126     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
127     lock.lockInterruptibly();
128     long nanos = unit.toNanos(timeout);
129     try {
130     for (;;) {
131     E x = q.poll();
132     if (x != null)
133     return x;
134     if (nanos <= 0)
135     return null;
136     try {
137     nanos = notEmpty.awaitNanos(nanos);
138     }
139     catch (InterruptedException ie) {
140     notEmpty.signal(); // propagate to non-interrupted thread
141     throw ie;
142     }
143    
144     }
145     }
146     finally {
147     lock.unlock();
148     }
149     }
150    
151     public E peek() {
152     lock.lock();
153     try {
154     return q.peek();
155     }
156     finally {
157     lock.unlock();
158     }
159     }
160    
161     public int size() {
162     lock.lock();
163     try {
164     return q.size();
165     }
166     finally {
167     lock.unlock();
168     }
169     }
170    
171     /**
172     * Always returns <tt>Integer.MAX_VALUE</tt> because
173     * PriorityBlockingQueues are not capacity constrained.
174     * @return <tt>Integer.MAX_VALUE</tt>
175     */
176     public int remainingCapacity() {
177     return Integer.MAX_VALUE;
178     }
179    
180     public boolean remove(Object x) {
181     lock.lock();
182     try {
183     return q.remove(x);
184     }
185     finally {
186     lock.unlock();
187     }
188     }
189    
190     public boolean contains(Object x) {
191     lock.lock();
192     try {
193     return q.contains(x);
194     }
195     finally {
196     lock.unlock();
197     }
198     }
199    
200     public Object[] toArray() {
201     lock.lock();
202     try {
203     return q.toArray();
204     }
205     finally {
206     lock.unlock();
207     }
208     }
209    
210    
211     public String toString() {
212     lock.lock();
213     try {
214     return q.toString();
215     }
216     finally {
217     lock.unlock();
218     }
219     }
220    
221     public <T> T[] toArray(T[] a) {
222     lock.lock();
223     try {
224     return q.toArray(a);
225     }
226     finally {
227     lock.unlock();
228     }
229     }
230    
231     public Iterator<E> iterator() {
232     lock.lock();
233     try {
234     return new Itr(q.iterator());
235     }
236     finally {
237     lock.unlock();
238     }
239     }
240    
241     private class Itr<E> implements Iterator<E> {
242     private final Iterator<E> iter;
243     Itr(Iterator<E> i) {
244     iter = i;
245     }
246    
247     public boolean hasNext() {
248     /*
249     * No sync -- we rely on underlying hasNext to be
250     * stateless, in which case we can return true by mistake
251     * only when next() willl subsequently throw
252     * ConcurrentModificationException.
253     */
254     return iter.hasNext();
255     }
256    
257     public E next() {
258     lock.lock();
259     try {
260     return iter.next();
261     }
262     finally {
263     lock.unlock();
264     }
265     }
266    
267     public void remove() {
268     lock.lock();
269     try {
270     iter.remove();
271     }
272     finally {
273     lock.unlock();
274     }
275     }
276     }
277    
278     /**
279     * Save the state to a stream (that is, serialize it). This
280     * merely wraps default serialization within lock. The
281     * serialization strategy for items is left to underlying
282     * Queue. Note that locking is not needed on deserialization, so
283     * readObject is not defined, just relying on default.
284     */
285     private void writeObject(java.io.ObjectOutputStream s)
286     throws java.io.IOException {
287     lock.lock();
288     try {
289     s.defaultWriteObject();
290     }
291     finally {
292     lock.unlock();
293     }
294 tim 1.1 }
295    
296     }