ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/PriorityBlockingQueue.java
Revision: 1.8
Committed: Tue Jul 8 00:46:34 2003 UTC (20 years, 11 months ago) by dl
Branch: MAIN
Changes since 1.7: +1 -1 lines
Log Message:
Locks in subpackage; fairness params added

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.8 import java.util.concurrent.locks.*;
9 tim 1.1 import java.util.*;
10    
/**
 * An unbounded blocking queue based on a {@link PriorityQueue},
 * obeying its ordering rules and implementation characteristics.
 *
 * <p>Every access to the backing queue is performed while holding a
 * single {@link ReentrantLock}. Because the queue is unbounded,
 * insertion never blocks; only the retrieval methods {@link #take} and
 * {@link #poll(long, TimeUnit)} can wait, and they wait for the queue to
 * become non-empty. <tt>null</tt> elements are rejected.
 *
 * @since 1.5
 * @author Doug Lea
 */
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** Backing priority queue; only touched while holding {@link #lock}. */
    private final PriorityQueue<E> q;

    /** Lock guarding all access to {@link #q}. */
    private final ReentrantLock lock = new ReentrantLock();

    /** Condition on which retrieving threads wait for an element. */
    private final Condition notEmpty = lock.newCondition();

    /**
     * Create a new priority queue with the default initial capacity
     * of the underlying {@link PriorityQueue} that orders its elements
     * according to their natural ordering.
     */
    public PriorityBlockingQueue() {
        q = new PriorityQueue<E>();
    }

    /**
     * Create a new priority queue with the specified initial capacity
     * that orders its elements according to their natural ordering.
     *
     * @param initialCapacity the initial capacity for this priority queue.
     */
    public PriorityBlockingQueue(int initialCapacity) {
        // A null comparator selects natural ordering.
        q = new PriorityQueue<E>(initialCapacity, null);
    }

    /**
     * Create a new priority queue with the specified initial capacity
     * that orders its elements according to the specified comparator.
     *
     * @param initialCapacity the initial capacity for this priority queue.
     * @param comparator the comparator used to order this priority queue.
     */
    public PriorityBlockingQueue(int initialCapacity, Comparator<E> comparator) {
        q = new PriorityQueue<E>(initialCapacity, comparator);
    }

    /**
     * Create a new priority queue containing the elements in the specified
     * collection. The elements are handed to the collection constructor of
     * the underlying {@link PriorityQueue}; the initial capacity and the
     * ordering (an inherited comparator or natural ordering) are whatever
     * that constructor chooses.
     *
     * @param initialElements the collection whose elements are to be placed
     *        into this priority queue.
     * @throws ClassCastException if elements of the specified collection
     *         cannot be compared to one another according to the priority
     *         queue's ordering.
     * @throws NullPointerException if the specified collection or an
     *         element of the specified collection is <tt>null</tt>.
     */
    public PriorityBlockingQueue(Collection<E> initialElements) {
        q = new PriorityQueue<E>(initialElements);
    }

    /**
     * Returns the comparator associated with this priority queue, or
     * <tt>null</tt> if it uses its elements' natural ordering.
     *
     * @return the comparator associated with this priority queue, or
     *         <tt>null</tt> if it uses its elements' natural ordering.
     */
    public Comparator comparator() {
        // The raw return type is kept so existing callers are unaffected.
        return q.comparator();
    }

    /**
     * Inserts the specified element and wakes one waiting consumer.
     *
     * @param x the element to add
     * @return <tt>true</tt> (always)
     * @throws NullPointerException if the element is <tt>null</tt>
     */
    public boolean offer(E x) {
        if (x == null) throw new NullPointerException();
        lock.lock();
        try {
            boolean ok = q.offer(x);
            assert ok;
            notEmpty.signal();
            return true;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element. Equivalent to {@link #offer(Object)}.
     *
     * @param x the element to add
     * @throws NullPointerException if the element is <tt>null</tt>
     */
    public void put(E x) throws InterruptedException {
        offer(x); // never need to block
    }

    /**
     * Inserts the specified element. The timeout arguments are ignored;
     * equivalent to {@link #offer(Object)}.
     *
     * @param x the element to add
     * @param timeout ignored
     * @param unit ignored
     * @return <tt>true</tt> (always)
     * @throws NullPointerException if the element is <tt>null</tt>
     */
    public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
        return offer(x); // never need to block
    }

    /**
     * Retrieves and removes the head of this queue, waiting while the
     * queue is empty.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while acquiring the
     *         lock or while waiting
     */
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            try {
                while (q.size() == 0)
                    notEmpty.await();
            }
            catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = q.poll();
            assert x != null;
            return x;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue without waiting.
     *
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    public E poll() {
        lock.lock();
        try {
            return q.poll();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * given time for an element to become available.
     *
     * @param timeout how long to wait, in units of <tt>unit</tt>
     * @param unit the time unit of <tt>timeout</tt>
     * @return the head of this queue, or <tt>null</tt> if the wait time
     *         elapsed with the queue still empty
     * @throws InterruptedException if interrupted while acquiring the
     *         lock or while waiting
     * @throws NullPointerException if <tt>unit</tt> is <tt>null</tt>
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        // Convert before locking: a null unit must fail here, while the
        // lock is not held, so the failure cannot leave the lock acquired.
        long nanos = unit.toNanos(timeout);
        lock.lockInterruptibly();
        try {
            for (;;) {
                E x = q.poll();
                if (x != null)
                    return x;
                if (nanos <= 0)
                    return null;
                try {
                    // awaitNanos reports the time still left to wait.
                    nanos = notEmpty.awaitNanos(nanos);
                }
                catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue.
     *
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    public E peek() {
        lock.lock();
        try {
            return q.peek();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of elements currently in this queue.
     *
     * @return the number of elements
     */
    public int size() {
        lock.lock();
        try {
            return q.size();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Always returns <tt>Integer.MAX_VALUE</tt> because
     * PriorityBlockingQueues are not capacity constrained.
     * @return <tt>Integer.MAX_VALUE</tt>
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Removes the given element from this queue, delegating to the
     * backing queue under the lock.
     *
     * @param x the element to remove
     * @return the result of the backing queue's <tt>remove</tt>
     */
    public boolean remove(Object x) {
        lock.lock();
        try {
            return q.remove(x);
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Tests whether this queue contains the given element, delegating to
     * the backing queue under the lock.
     *
     * @param x the element to look for
     * @return the result of the backing queue's <tt>contains</tt>
     */
    public boolean contains(Object x) {
        lock.lock();
        try {
            return q.contains(x);
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Removes available elements from this queue and adds them to the
     * given collection.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most the given number of available elements from this
     * queue and adds them to the given collection.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) throw new NullPointerException();
        if (c == this) throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        lock.lock();
        try {
            int n = 0;
            while (n < maxElements) {
                E e = q.peek();
                if (e == null)
                    break;
                c.add(e); // add before removing so a failed add loses nothing
                q.poll();
                ++n;
            }
            return n;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array holding the elements of this queue, as produced by
     * the backing queue's <tt>toArray</tt> under the lock.
     *
     * @return an array of the queue's elements
     */
    public Object[] toArray() {
        lock.lock();
        try {
            return q.toArray();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Returns the backing queue's string representation, computed under
     * the lock.
     *
     * @return a string representation of this queue
     */
    public String toString() {
        lock.lock();
        try {
            return q.toString();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array holding the elements of this queue, as produced by
     * the backing queue's <tt>toArray(T[])</tt> under the lock.
     *
     * @param a the array offered to the backing queue for the result
     * @return an array of the queue's elements
     */
    public <T> T[] toArray(T[] a) {
        lock.lock();
        try {
            return q.toArray(a);
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue. The iterator
     * wraps the backing queue's iterator and takes the lock around
     * <tt>next</tt> and <tt>remove</tt>.
     *
     * @return an iterator over the elements in this queue
     */
    public Iterator<E> iterator() {
        lock.lock();
        try {
            return new Itr(q.iterator());
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Iterator wrapper that locks around the state-changing operations of
     * the backing queue's iterator. It deliberately declares no type
     * parameter of its own, so <tt>E</tt> is the enclosing queue's
     * element type.
     */
    private class Itr implements Iterator<E> {
        private final Iterator<E> iter;

        Itr(Iterator<E> i) {
            iter = i;
        }

        public boolean hasNext() {
            /*
             * No sync -- we rely on underlying hasNext to be
             * stateless, in which case we can return true by mistake
             * only when next() will subsequently throw
             * ConcurrentModificationException.
             */
            return iter.hasNext();
        }

        public E next() {
            lock.lock();
            try {
                return iter.next();
            }
            finally {
                lock.unlock();
            }
        }

        public void remove() {
            lock.lock();
            try {
                iter.remove();
            }
            finally {
                lock.unlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it). This
     * merely wraps default serialization within lock. The
     * serialization strategy for items is left to underlying
     * Queue. Note that locking is not needed on deserialization, so
     * readObject is not defined, just relying on default.
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {
        lock.lock();
        try {
            s.defaultWriteObject();
        }
        finally {
            lock.unlock();
        }
    }

}