ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.6
Committed: Tue Jun 24 14:34:48 2003 UTC (20 years, 11 months ago) by dl
Branch: MAIN
Changes since 1.5: +4 -2 lines
Log Message:
Added missing javadoc tags; minor reformatting

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8     import java.util.*;
9    
10     /**
11 dl 1.5 * An unbounded blocking queue based on a {@link PriorityQueue},
12     * obeying its ordering rules and implementation characteristics.
13 dl 1.6 * @since 1.5
14     * @author Doug Lea
15     **/
16 dl 1.5 public class PriorityBlockingQueue<E> extends AbstractQueue<E>
17 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
18    
19 dl 1.5 private final PriorityQueue<E> q;
20     private final FairReentrantLock lock = new FairReentrantLock();
21     private final Condition notEmpty = lock.newCondition();
22    
23 dl 1.2 /**
24     * Create a new priority queue with the default initial capacity (11)
25     * that orders its elements according to their natural ordering.
26     */
27     public PriorityBlockingQueue() {
28 dl 1.5 q = new PriorityQueue<E>();
29 dl 1.2 }
30    
31     /**
32     * Create a new priority queue with the specified initial capacity
33     * that orders its elements according to their natural ordering.
34     *
35     * @param initialCapacity the initial capacity for this priority queue.
36     */
37     public PriorityBlockingQueue(int initialCapacity) {
38 dl 1.5 q = new PriorityQueue<E>(initialCapacity, null);
39 dl 1.2 }
40    
41     /**
42     * Create a new priority queue with the specified initial capacity (11)
43     * that orders its elements according to the specified comparator.
44     *
45     * @param initialCapacity the initial capacity for this priority queue.
46     * @param comparator the comparator used to order this priority queue.
47     */
48 dl 1.3 public PriorityBlockingQueue(int initialCapacity, Comparator<E> comparator) {
49 dl 1.5 q = new PriorityQueue<E>(initialCapacity, comparator);
50 dl 1.2 }
51    
52     /**
53     * Create a new priority queue containing the elements in the specified
54     * collection. The priority queue has an initial capacity of 110% of the
55     * size of the specified collection. If the specified collection
56     * implements the {@link Sorted} interface, the priority queue will be
57     * sorted according to the same comparator, or according to its elements'
58     * natural order if the collection is sorted according to its elements'
59     * natural order. If the specified collection does not implement the
60     * <tt>Sorted</tt> interface, the priority queue is ordered according to
61     * its elements' natural order.
62     *
63     * @param initialElements the collection whose elements are to be placed
64     * into this priority queue.
65     * @throws ClassCastException if elements of the specified collection
66     * cannot be compared to one another according to the priority
67     * queue's ordering.
68     * @throws NullPointerException if the specified collection or an
69     * element of the specified collection is <tt>null</tt>.
70     */
71     public PriorityBlockingQueue(Collection<E> initialElements) {
72 dl 1.5 q = new PriorityQueue<E>(initialElements);
73     }
74    
75     public boolean offer(E x) {
76 dl 1.6 if (x == null) throw new NullPointerException();
77 dl 1.5 lock.lock();
78     try {
79     boolean ok = q.offer(x);
80     assert ok;
81     notEmpty.signal();
82     return true;
83     }
84     finally {
85     lock.unlock();
86     }
87     }
88    
89     public void put(E x) throws InterruptedException {
90     offer(x); // never need to block
91     }
92    
93     public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
94     return offer(x); // never need to block
95     }
96    
97     public E take() throws InterruptedException {
98     lock.lockInterruptibly();
99     try {
100     try {
101     while (q.size() == 0)
102     notEmpty.await();
103     }
104     catch (InterruptedException ie) {
105     notEmpty.signal(); // propagate to non-interrupted thread
106     throw ie;
107     }
108     E x = q.poll();
109     assert x != null;
110     return x;
111     }
112     finally {
113     lock.unlock();
114     }
115     }
116    
117    
118     public E poll() {
119     lock.lock();
120     try {
121     return q.poll();
122     }
123     finally {
124     lock.unlock();
125     }
126     }
127    
128     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
129     lock.lockInterruptibly();
130     long nanos = unit.toNanos(timeout);
131     try {
132     for (;;) {
133     E x = q.poll();
134     if (x != null)
135     return x;
136     if (nanos <= 0)
137     return null;
138     try {
139     nanos = notEmpty.awaitNanos(nanos);
140     }
141     catch (InterruptedException ie) {
142     notEmpty.signal(); // propagate to non-interrupted thread
143     throw ie;
144     }
145    
146     }
147     }
148     finally {
149     lock.unlock();
150     }
151     }
152    
153     public E peek() {
154     lock.lock();
155     try {
156     return q.peek();
157     }
158     finally {
159     lock.unlock();
160     }
161     }
162    
163     public int size() {
164     lock.lock();
165     try {
166     return q.size();
167     }
168     finally {
169     lock.unlock();
170     }
171     }
172    
173     /**
174     * Always returns <tt>Integer.MAX_VALUE</tt> because
175     * PriorityBlockingQueues are not capacity constrained.
176     * @return <tt>Integer.MAX_VALUE</tt>
177     */
178     public int remainingCapacity() {
179     return Integer.MAX_VALUE;
180     }
181    
182     public boolean remove(Object x) {
183     lock.lock();
184     try {
185     return q.remove(x);
186     }
187     finally {
188     lock.unlock();
189     }
190     }
191    
192     public boolean contains(Object x) {
193     lock.lock();
194     try {
195     return q.contains(x);
196     }
197     finally {
198     lock.unlock();
199     }
200     }
201    
202     public Object[] toArray() {
203     lock.lock();
204     try {
205     return q.toArray();
206     }
207     finally {
208     lock.unlock();
209     }
210     }
211    
212    
213     public String toString() {
214     lock.lock();
215     try {
216     return q.toString();
217     }
218     finally {
219     lock.unlock();
220     }
221     }
222    
223     public <T> T[] toArray(T[] a) {
224     lock.lock();
225     try {
226     return q.toArray(a);
227     }
228     finally {
229     lock.unlock();
230     }
231     }
232    
233     public Iterator<E> iterator() {
234     lock.lock();
235     try {
236     return new Itr(q.iterator());
237     }
238     finally {
239     lock.unlock();
240     }
241     }
242    
243     private class Itr<E> implements Iterator<E> {
244     private final Iterator<E> iter;
245     Itr(Iterator<E> i) {
246     iter = i;
247     }
248    
249     public boolean hasNext() {
250     /*
251     * No sync -- we rely on underlying hasNext to be
252     * stateless, in which case we can return true by mistake
253     * only when next() willl subsequently throw
254     * ConcurrentModificationException.
255     */
256     return iter.hasNext();
257     }
258    
259     public E next() {
260     lock.lock();
261     try {
262     return iter.next();
263     }
264     finally {
265     lock.unlock();
266     }
267     }
268    
269     public void remove() {
270     lock.lock();
271     try {
272     iter.remove();
273     }
274     finally {
275     lock.unlock();
276     }
277     }
278     }
279    
280     /**
281     * Save the state to a stream (that is, serialize it). This
282     * merely wraps default serialization within lock. The
283     * serialization strategy for items is left to underlying
284     * Queue. Note that locking is not needed on deserialization, so
285     * readObject is not defined, just relying on default.
286     */
287     private void writeObject(java.io.ObjectOutputStream s)
288     throws java.io.IOException {
289     lock.lock();
290     try {
291     s.defaultWriteObject();
292     }
293     finally {
294     lock.unlock();
295     }
296 tim 1.1 }
297    
298     }