ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/AbstractBlockingQueueFromQueue.java
Revision: 1.2
Committed: Sun Jun 22 21:47:17 2003 UTC (20 years, 11 months ago) by dl
Branch: MAIN
CVS Tags: HEAD
Changes since 1.1: +0 -0 lines
State: FILE REMOVED
Log Message:
Split ArrayBlockingQueue and PriorityBlockingQueue to no longer subclass AbstractBlockingQueueFromQueue

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.*;
9
10 /**
11 * An AbstractBlockingQueueFromQueue places blocking concurrency control
12 * around a non-synchronized, non-thread-safe Queue.
13 **/
14 abstract class AbstractBlockingQueueFromQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
15
16 /*
17 * Concurrency control via the classic two-condition algorithm
18 * found in any textbook.
19 */
20
21 private transient final FairReentrantLock lock = new FairReentrantLock();
22 private transient final Condition notEmpty = lock.newCondition();
23 private transient final Condition notFull = lock.newCondition();
24 private final Queue<E> q;
25 private final int capacity;
26
27 protected AbstractBlockingQueueFromQueue(Queue<E> queue, int capacity) {
28 if (capacity <= 0) throw new IllegalArgumentException();
29 this.capacity = capacity;
30 q = queue;
31 }
32
33
34 public void put(E x) throws InterruptedException {
35 if (x == null) throw new IllegalArgumentException();
36 lock.lockInterruptibly();
37 try {
38 try {
39 while (q.size() == capacity)
40 notFull.await();
41 }
42 catch (InterruptedException ie) {
43 notFull.signal(); // propagate to non-interrupted thread
44 throw ie;
45 }
46 boolean ok = q.offer(x);
47 assert ok;
48 notEmpty.signal();
49 }
50 finally {
51 lock.unlock();
52 }
53 }
54
55 public E take() throws InterruptedException {
56 lock.lockInterruptibly();
57 try {
58 try {
59 while (q.size() == 0)
60 notEmpty.await();
61 }
62 catch (InterruptedException ie) {
63 notEmpty.signal(); // propagate to non-interrupted thread
64 throw ie;
65 }
66 E x = q.poll();
67 assert x != null;
68 notFull.signal();
69 return x;
70 }
71 finally {
72 lock.unlock();
73 }
74 }
75
76 public boolean offer(E x) {
77 if (x == null) throw new IllegalArgumentException();
78 lock.lock();
79 try {
80 if (q.size() == capacity)
81 return false;
82 else {
83 boolean ok = q.offer(x);
84 assert ok;
85 notEmpty.signal();
86 return true;
87 }
88 }
89 finally {
90 lock.unlock();
91 }
92 }
93
94 public E poll() {
95 lock.lock();
96 try {
97 E x = q.poll();
98 if (x != null)
99 notFull.signal();
100 return x;
101 }
102 finally {
103 lock.unlock();
104 }
105 }
106
107 public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
108 if (x == null) throw new IllegalArgumentException();
109 lock.lockInterruptibly();
110 long nanos = unit.toNanos(timeout);
111 try {
112 for (;;) {
113 if (q.size() != capacity) {
114 boolean ok = q.offer(x);
115 assert ok;
116 notEmpty.signal();
117 return true;
118 }
119 if (nanos <= 0)
120 return false;
121 try {
122 nanos = notFull.awaitNanos(nanos);
123 }
124 catch (InterruptedException ie) {
125 notFull.signal(); // propagate to non-interrupted thread
126 throw ie;
127 }
128 }
129 }
130 finally {
131 lock.unlock();
132 }
133 }
134
135 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
136 lock.lockInterruptibly();
137 long nanos = unit.toNanos(timeout);
138 try {
139 for (;;) {
140 E x = q.poll();
141 if (x != null) {
142 notFull.signal();
143 return x;
144 }
145 if (nanos <= 0)
146 return null;
147 try {
148 nanos = notEmpty.awaitNanos(nanos);
149 }
150 catch (InterruptedException ie) {
151 notEmpty.signal(); // propagate to non-interrupted thread
152 throw ie;
153 }
154
155 }
156 }
157 finally {
158 lock.unlock();
159 }
160 }
161
162 public E peek() {
163 lock.lock();
164 try {
165 return q.peek();
166 }
167 finally {
168 lock.unlock();
169 }
170 }
171
172 public int size() {
173 lock.lock();
174 try {
175 return q.size();
176 }
177 finally {
178 lock.unlock();
179 }
180 }
181
182 public int remainingCapacity() {
183 lock.lock();
184 try {
185 return capacity - q.size();
186 }
187 finally {
188 lock.unlock();
189 }
190 }
191
192 public boolean remove(Object x) {
193 lock.lock();
194 try {
195 boolean removed = q.remove(x);
196 if (removed)
197 notFull.signal();
198 return removed;
199 }
200 finally {
201 lock.unlock();
202 }
203 }
204
205 public boolean contains(Object x) {
206 lock.lock();
207 try {
208 return q.contains(x);
209 }
210 finally {
211 lock.unlock();
212 }
213 }
214
215 public Object[] toArray() {
216 lock.lock();
217 try {
218 return q.toArray();
219 }
220 finally {
221 lock.unlock();
222 }
223 }
224
225
226 public String toString() {
227 lock.lock();
228 try {
229 return q.toString();
230 }
231 finally {
232 lock.unlock();
233 }
234 }
235
236 public <T> T[] toArray(T[] a) {
237 lock.lock();
238 try {
239 return q.toArray(a);
240 }
241 finally {
242 lock.unlock();
243 }
244 }
245
246 public Iterator<E> iterator() {
247 lock.lock();
248 try {
249 return new Itr(q.iterator());
250 }
251 finally {
252 lock.unlock();
253 }
254 }
255
256 private class Itr<E> implements Iterator<E> {
257 private final Iterator<E> iter;
258 Itr(Iterator<E> i) {
259 iter = i;
260 }
261
262 public boolean hasNext() {
263 /*
264 * No sync -- we rely on underlying hasNext to be
265 * stateless, in which case we can return true by mistake
266 * only when next() willl subsequently throw
267 * ConcurrentModificationException.
268 */
269 return iter.hasNext();
270 }
271
272 public E next() {
273 lock.lock();
274 try {
275 return iter.next();
276 }
277 finally {
278 lock.unlock();
279 }
280 }
281
282 public void remove() {
283 lock.lock();
284 try {
285 iter.remove();
286 notFull.signal();
287 }
288 finally {
289 lock.unlock();
290 }
291 }
292 }
293
294 /**
295 * Save the state to a stream (that is, serialize it). This
296 * merely wraps default serialization within lock. The
297 * serialization strategy for items is left to underlying
298 * Queue. Note that locking is not needed on deserialization, so
299 * readObject is not defined, just relying on default.
300 */
301 private void writeObject(java.io.ObjectOutputStream s)
302 throws java.io.IOException {
303 lock.lock();
304 try {
305 s.defaultWriteObject();
306 }
307 finally {
308 lock.unlock();
309 }
310 }
311 }
312
313
314