17 |
|
* way to prevent unlmited queue expansion. Linked nodes are |
18 |
|
* dynamically created upon each insertion unless this would bring the |
19 |
|
* queue above capacity. |
20 |
+ |
* @since 1.5 |
21 |
+ |
* @author Doug Lea |
22 |
|
* |
23 |
|
**/ |
24 |
|
public class LinkedBlockingQueue<E> extends AbstractQueue<E> |
38 |
|
* iterators acquire both locks. |
39 |
|
*/ |
40 |
|
|
41 |
+ |
/** |
42 |
+ |
* Linked list node class |
43 |
+ |
*/ |
44 |
|
static class Node<E> { |
45 |
+ |
/** The item, volatile to ensure barrier separating write and read */ |
46 |
|
volatile E item; |
47 |
|
Node<E> next; |
48 |
|
Node(E x) { item = x; } |
49 |
|
} |
50 |
|
|
51 |
+ |
/** The capacity bound, or Integer.MAX_VALUE if none */ |
52 |
|
private final int capacity; |
53 |
+ |
|
54 |
+ |
/** Current number of elements */ |
55 |
|
private transient final AtomicInteger count = new AtomicInteger(0); |
56 |
|
|
57 |
< |
private transient Node<E> head = new Node<E>(null); |
58 |
< |
private transient Node<E> last = head; |
57 |
> |
/** Head of linked list */ |
58 |
> |
private transient Node<E> head; |
59 |
> |
|
60 |
> |
/** Tail of lined list */ |
61 |
> |
private transient Node<E> last; |
62 |
|
|
63 |
+ |
/** Lock held by take, poll, etc */ |
64 |
|
private final ReentrantLock takeLock = new ReentrantLock(); |
65 |
+ |
|
66 |
+ |
/** Wait queue for waiting takes */ |
67 |
|
private final Condition notEmpty = takeLock.newCondition(); |
68 |
|
|
69 |
+ |
/** Lock held by put, offer, etc */ |
70 |
|
private final ReentrantLock putLock = new ReentrantLock(); |
55 |
– |
private final Condition notFull = putLock.newCondition(); |
71 |
|
|
72 |
+ |
/** Wait queue for waiting puts */ |
73 |
+ |
private final Condition notFull = putLock.newCondition(); |
74 |
|
|
75 |
|
/** |
76 |
|
* Signal a waiting take. Called only from put/offer (which do not |
101 |
|
|
102 |
|
/** |
103 |
|
* Create a node and link it and end of queue |
104 |
+ |
* @param x the item |
105 |
|
*/ |
106 |
|
private void insert(E x) { |
107 |
|
last = last.next = new Node<E>(x); |
109 |
|
|
110 |
|
/** |
111 |
|
* Remove a node from head of queue, |
112 |
+ |
* @return the node |
113 |
|
*/ |
114 |
|
private E extract() { |
115 |
|
Node<E> first = head.next; |
145 |
|
|
146 |
|
/** |
147 |
|
* Creates a LinkedBlockingQueue with the given capacity constraint. |
148 |
+ |
* @param capacity the maminum number of elements to hold without blocking. |
149 |
|
*/ |
150 |
|
public LinkedBlockingQueue(int capacity) { |
151 |
< |
if (capacity <= 0) throw new IllegalArgumentException(); |
151 |
> |
if (capacity <= 0) throw new NullPointerException(); |
152 |
|
this.capacity = capacity; |
153 |
+ |
last = head = new Node<E>(null); |
154 |
|
} |
155 |
|
|
156 |
|
/** |
157 |
|
* Creates a LinkedBlockingQueue without an intrinsic capacity |
158 |
< |
* constraint, initially holding the given elements. |
158 |
> |
* constraint, initially holding the given elements, added in |
159 |
> |
* traveral order of the collection's iterator. |
160 |
> |
* @param initialElements the elements to initially contain |
161 |
|
*/ |
162 |
|
public LinkedBlockingQueue(Collection<E> initialElements) { |
163 |
|
this(Integer.MAX_VALUE); |
174 |
|
} |
175 |
|
|
176 |
|
public void put(E x) throws InterruptedException { |
177 |
< |
if (x == null) throw new IllegalArgumentException(); |
177 |
> |
if (x == null) throw new NullPointerException(); |
178 |
|
// Note: convention in all put/take/etc is to preset |
179 |
|
// local var holding count negative to indicate failure unless set. |
180 |
|
int c = -1; |
199 |
|
} |
200 |
|
insert(x); |
201 |
|
c = count.getAndIncrement(); |
202 |
< |
if (c+1 < capacity) |
202 |
> |
if (c + 1 < capacity) |
203 |
|
notFull.signal(); |
204 |
|
} |
205 |
|
finally { |
210 |
|
} |
211 |
|
|
212 |
|
public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException { |
213 |
< |
if (x == null) throw new IllegalArgumentException(); |
213 |
> |
if (x == null) throw new NullPointerException(); |
214 |
|
putLock.lockInterruptibly(); |
215 |
|
long nanos = unit.toNanos(timeout); |
216 |
|
int c = -1; |
219 |
|
if (count.get() < capacity) { |
220 |
|
insert(x); |
221 |
|
c = count.getAndIncrement(); |
222 |
< |
if (c+1 < capacity) |
222 |
> |
if (c + 1 < capacity) |
223 |
|
notFull.signal(); |
224 |
|
break; |
225 |
|
} |
243 |
|
} |
244 |
|
|
245 |
|
public boolean offer(E x) { |
246 |
< |
if (x == null) throw new IllegalArgumentException(); |
246 |
> |
if (x == null) throw new NullPointerException(); |
247 |
|
if (count.get() == capacity) |
248 |
|
return false; |
249 |
|
putLock.tryLock(); |
252 |
|
if (count.get() < capacity) { |
253 |
|
insert(x); |
254 |
|
c = count.getAndIncrement(); |
255 |
< |
if (c+1 < capacity) |
255 |
> |
if (c + 1 < capacity) |
256 |
|
notFull.signal(); |
257 |
|
} |
258 |
|
} |
293 |
|
} |
294 |
|
|
295 |
|
public E poll(long timeout, TimeUnit unit) throws InterruptedException { |
273 |
– |
|
296 |
|
E x = null; |
297 |
|
int c = -1; |
298 |
|
takeLock.lockInterruptibly(); |
516 |
|
* @serialData The capacity is emitted (int), followed by all of |
517 |
|
* its elements (each an <tt>Object</tt>) in the proper order, |
518 |
|
* followed by a null |
519 |
+ |
* @param s the stream |
520 |
|
*/ |
521 |
|
private void writeObject(java.io.ObjectOutputStream s) |
522 |
|
throws java.io.IOException { |
541 |
|
/** |
542 |
|
* Reconstitute the Queue instance from a stream (that is, |
543 |
|
* deserialize it). |
544 |
+ |
* @param s the stream |
545 |
|
*/ |
546 |
|
private void readObject(java.io.ObjectInputStream s) |
547 |
|
throws java.io.IOException, ClassNotFoundException { |
548 |
|
// Read in capacity, and any hidden stuff |
549 |
|
s.defaultReadObject(); |
550 |
|
|
551 |
< |
// Read in all elements and place in queue |
551 |
> |
// Read in all elements and place in queue |
552 |
|
for (;;) { |
553 |
|
E item = (E)s.readObject(); |
554 |
|
if (item == null) |