ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/AbstractBlockingQueueFromQueue.java
Revision: 1.2
Committed: Sun Jun 22 21:47:17 2003 UTC (20 years, 11 months ago) by dl
Branch: MAIN
CVS Tags: HEAD
Changes since 1.1: +0 -0 lines
State: FILE REMOVED
Log Message:
Split ArrayBlockingQueue and PriorityBlockingQueue to no longer subclass AbstractBlockingQueueFromQueue

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7     package java.util.concurrent;
8     import java.util.*;
9    
10     /**
11     * An AbstractBlockingQueueFromQueue places blocking concurrency control
12     * around a non-synchronized, non-thread-safe Queue.
13     **/
14     abstract class AbstractBlockingQueueFromQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
15    
16     /*
17     * Concurrency control via the classic two-condition algorithm
18     * found in any textbook.
19     */
20    
21     private transient final FairReentrantLock lock = new FairReentrantLock();
22     private transient final Condition notEmpty = lock.newCondition();
23     private transient final Condition notFull = lock.newCondition();
24     private final Queue<E> q;
25     private final int capacity;
26    
27     protected AbstractBlockingQueueFromQueue(Queue<E> queue, int capacity) {
28     if (capacity <= 0) throw new IllegalArgumentException();
29     this.capacity = capacity;
30     q = queue;
31     }
32    
33    
34     public void put(E x) throws InterruptedException {
35     if (x == null) throw new IllegalArgumentException();
36     lock.lockInterruptibly();
37     try {
38     try {
39     while (q.size() == capacity)
40     notFull.await();
41     }
42     catch (InterruptedException ie) {
43     notFull.signal(); // propagate to non-interrupted thread
44     throw ie;
45     }
46     boolean ok = q.offer(x);
47     assert ok;
48     notEmpty.signal();
49     }
50     finally {
51     lock.unlock();
52     }
53     }
54    
55     public E take() throws InterruptedException {
56     lock.lockInterruptibly();
57     try {
58     try {
59     while (q.size() == 0)
60     notEmpty.await();
61     }
62     catch (InterruptedException ie) {
63     notEmpty.signal(); // propagate to non-interrupted thread
64     throw ie;
65     }
66     E x = q.poll();
67     assert x != null;
68     notFull.signal();
69     return x;
70     }
71     finally {
72     lock.unlock();
73     }
74     }
75    
76     public boolean offer(E x) {
77     if (x == null) throw new IllegalArgumentException();
78     lock.lock();
79     try {
80     if (q.size() == capacity)
81     return false;
82     else {
83     boolean ok = q.offer(x);
84     assert ok;
85     notEmpty.signal();
86     return true;
87     }
88     }
89     finally {
90     lock.unlock();
91     }
92     }
93    
94     public E poll() {
95     lock.lock();
96     try {
97     E x = q.poll();
98     if (x != null)
99     notFull.signal();
100     return x;
101     }
102     finally {
103     lock.unlock();
104     }
105     }
106    
107     public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
108     if (x == null) throw new IllegalArgumentException();
109     lock.lockInterruptibly();
110     long nanos = unit.toNanos(timeout);
111     try {
112     for (;;) {
113     if (q.size() != capacity) {
114     boolean ok = q.offer(x);
115     assert ok;
116     notEmpty.signal();
117     return true;
118     }
119     if (nanos <= 0)
120     return false;
121     try {
122     nanos = notFull.awaitNanos(nanos);
123     }
124     catch (InterruptedException ie) {
125     notFull.signal(); // propagate to non-interrupted thread
126     throw ie;
127     }
128     }
129     }
130     finally {
131     lock.unlock();
132     }
133     }
134    
135     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
136     lock.lockInterruptibly();
137     long nanos = unit.toNanos(timeout);
138     try {
139     for (;;) {
140     E x = q.poll();
141     if (x != null) {
142     notFull.signal();
143     return x;
144     }
145     if (nanos <= 0)
146     return null;
147     try {
148     nanos = notEmpty.awaitNanos(nanos);
149     }
150     catch (InterruptedException ie) {
151     notEmpty.signal(); // propagate to non-interrupted thread
152     throw ie;
153     }
154    
155     }
156     }
157     finally {
158     lock.unlock();
159     }
160     }
161    
162     public E peek() {
163     lock.lock();
164     try {
165     return q.peek();
166     }
167     finally {
168     lock.unlock();
169     }
170     }
171    
172     public int size() {
173     lock.lock();
174     try {
175     return q.size();
176     }
177     finally {
178     lock.unlock();
179     }
180     }
181    
182     public int remainingCapacity() {
183     lock.lock();
184     try {
185     return capacity - q.size();
186     }
187     finally {
188     lock.unlock();
189     }
190     }
191    
192     public boolean remove(Object x) {
193     lock.lock();
194     try {
195     boolean removed = q.remove(x);
196     if (removed)
197     notFull.signal();
198     return removed;
199     }
200     finally {
201     lock.unlock();
202     }
203     }
204    
205     public boolean contains(Object x) {
206     lock.lock();
207     try {
208     return q.contains(x);
209     }
210     finally {
211     lock.unlock();
212     }
213     }
214    
215     public Object[] toArray() {
216     lock.lock();
217     try {
218     return q.toArray();
219     }
220     finally {
221     lock.unlock();
222     }
223     }
224    
225    
226     public String toString() {
227     lock.lock();
228     try {
229     return q.toString();
230     }
231     finally {
232     lock.unlock();
233     }
234     }
235    
236     public <T> T[] toArray(T[] a) {
237     lock.lock();
238     try {
239     return q.toArray(a);
240     }
241     finally {
242     lock.unlock();
243     }
244     }
245    
246     public Iterator<E> iterator() {
247     lock.lock();
248     try {
249     return new Itr(q.iterator());
250     }
251     finally {
252     lock.unlock();
253     }
254     }
255    
256     private class Itr<E> implements Iterator<E> {
257     private final Iterator<E> iter;
258     Itr(Iterator<E> i) {
259     iter = i;
260     }
261    
262     public boolean hasNext() {
263     /*
264     * No sync -- we rely on underlying hasNext to be
265     * stateless, in which case we can return true by mistake
266     * only when next() willl subsequently throw
267     * ConcurrentModificationException.
268     */
269     return iter.hasNext();
270     }
271    
272     public E next() {
273     lock.lock();
274     try {
275     return iter.next();
276     }
277     finally {
278     lock.unlock();
279     }
280     }
281    
282     public void remove() {
283     lock.lock();
284     try {
285     iter.remove();
286     notFull.signal();
287     }
288     finally {
289     lock.unlock();
290     }
291     }
292     }
293    
294     /**
295     * Save the state to a stream (that is, serialize it). This
296     * merely wraps default serialization within lock. The
297     * serialization strategy for items is left to underlying
298     * Queue. Note that locking is not needed on deserialization, so
299     * readObject is not defined, just relying on default.
300     */
301     private void writeObject(java.io.ObjectOutputStream s)
302     throws java.io.IOException {
303     lock.lock();
304     try {
305     s.defaultWriteObject();
306     }
307     finally {
308     lock.unlock();
309     }
310     }
311     }
312    
313    
314