ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.43
Committed: Tue May 17 06:50:55 2005 UTC (19 years ago) by jsr166
Branch: MAIN
Changes since 1.42: +77 -20 lines
Log Message:
doc fixes

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.33 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.2 import java.util.concurrent.atomic.*;
9 dl 1.7 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
/**
 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
 * linked nodes.
 * This queue orders elements FIFO (first-in-first-out).
 * The <em>head</em> of the queue is that element that has been on the
 * queue the longest time.
 * The <em>tail</em> of the queue is that element that has been on the
 * queue the shortest time. New elements
 * are inserted at the tail of the queue, and the queue retrieval
 * operations obtain elements at the head of the queue.
 * Linked queues typically have higher throughput than array-based queues but
 * less predictable performance in most concurrent applications.
 *
 * <p>The optional capacity bound constructor argument serves as a
 * way to prevent excessive queue expansion. The capacity, if unspecified,
 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
 * dynamically created upon each insertion unless this would bring the
 * queue above capacity.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../guide/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -6903933977591709194L;

    /*
     * A variant of the "two lock queue" algorithm.  The putLock gates
     * entry to put (and offer), and has an associated condition for
     * waiting puts.  Similarly for the takeLock.  The "count" field
     * that they both rely on is maintained as an atomic to avoid
     * needing to get both locks in most cases. Also, to minimize need
     * for puts to get takeLock and vice-versa, cascading notifies are
     * used. When a put notices that it has enabled at least one take,
     * it signals taker. That taker in turn signals others if more
     * items have been entered since the signal. And symmetrically for
     * takes signalling puts. Operations such as remove(Object),
     * drainTo and iterators acquire both locks.
     *
     * Invariants relied on below (all established by the code in this
     * class): "head" is always a dummy node whose item is null; every
     * node reachable from head.next holds a non-null item; a node that
     * leaves the queue has its item set to null but keeps its "next"
     * link, so an iterator parked on it can still walk forward.
     */

    /**
     * Linked list node class.
     */
    static class Node<E> {
        /** The item, volatile to ensure barrier separating write and read */
        volatile E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger(0);

    /** Head of linked list (a dummy node; head.item is always null) */
    private transient Node<E> head;

    /** Tail of linked list */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Signals a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signals a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Creates a node and links it at end of queue.
     * Callers hold putLock.
     *
     * @param x the item
     */
    private void insert(E x) {
        last = last.next = new Node<E>(x);
    }

    /**
     * Removes a node from head of queue. The removed node becomes the
     * new dummy head, so its item is cleared. Callers hold takeLock and
     * have checked that the queue is non-empty.
     *
     * @return the item that was held by the removed node
     */
    private E extract() {
        Node<E> first = head.next;
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /**
     * Locks to prevent both puts and takes.
     * putLock is always acquired before takeLock.
     */
    private void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlocks to allow both puts and takes (reverse order of fullyLock).
     */
    private void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity} is not greater
     *         than zero
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}, initially containing the elements of the
     * given collection,
     * added in traversal order of the collection's iterator.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        for (E e : c)
            add(e);
    }

    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        return count.get();
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Returns the number of additional elements that this queue can ideally
     * (in the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current {@code size} of this queue.
     *
     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
     * an element will succeed by inspecting {@code remainingCapacity}
     * because it may be the case that another thread is about to
     * insert or remove an element.
     *
     * @return the capacity minus the current element count
     */
    public int remainingCapacity() {
        return capacity - count.get();
    }

    /**
     * Adds the specified element to the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset
        // local var holding count  negative to indicate failure unless set.
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from
             * capacity. Similarly for all other uses of count in
             * other wait guards.
             */
            try {
                while (count.get() == capacity)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            insert(e);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();   // still room: cascade to the next waiting put
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();       // queue went from empty to non-empty
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     *
     * @return {@code true} if successful, or {@code false} if
     *         the specified waiting time elapses before space is available.
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {

        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() < capacity) {
                    insert(e);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity)
                        notFull.signal();
                    break;
                }
                if (nanos <= 0)
                    return false;
                try {
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    /**
     * Inserts the specified element at the tail of this queue if possible,
     * returning immediately if this queue is full.
     *
     * @return {@code true} if it was possible to add the element to
     *         this queue, else {@code false}
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;           // fast path: full, no need to lock
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                insert(e);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;              // c stays -1 when nothing was inserted
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element becomes available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            try {
                while (count.get() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }

            x = extract();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();  // more items remain: cascade to next taker
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();        // queue went from full to not-full
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified wait time if necessary for an element to become available.
     *
     * @return the head of this queue, or {@code null} if the
     *         specified waiting time elapses before an element is available
     * @throws InterruptedException if interrupted while waiting
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() > 0) {
                    x = extract();
                    c = count.getAndDecrement();
                    if (c > 1)
                        notEmpty.signal();
                    break;
                }
                if (nanos <= 0)
                    return null;
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, or returns
     * {@code null} if this queue is empty.
     *
     * @return the head of this queue, or {@code null} if it is empty
     */
    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;            // fast path: empty, no need to lock
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = extract();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves, but does not remove, the head of this queue, or returns
     * {@code null} if this queue is empty.
     *
     * @return the head of this queue, or {@code null} if it is empty
     */
    public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present.
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if an element was removed; {@code false} if
     *         {@code o} is null or no equal element was found
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        boolean removed = false;
        fullyLock();
        try {
            Node<E> trail = head;
            Node<E> p = head.next;
            while (p != null) {
                if (o.equals(p.item)) {
                    removed = true;
                    break;
                }
                trail = p;
                p = p.next;
            }
            if (removed) {
                // p.next is deliberately left intact so that an iterator
                // positioned on p can continue past it.
                p.item = null;
                trail.next = p.next;
                if (last == p)
                    last = trail;
                if (count.getAndDecrement() == capacity)
                    notFull.signalAll();
            }
        } finally {
            fullyUnlock();
        }
        return removed;
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        fullyLock();
        try {
            int size = count.get();
            Object[] a = new Object[size];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = p.item;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence; the runtime type of the returned array is that of
     * the specified array.  If the queue fits in the specified array, it
     * is returned therein.  Otherwise, a new array is allocated with the
     * runtime type of the specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * {@code null}.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>Suppose {@code x} is a queue known to contain only strings.
     * The following code can be used to dump the queue into a newly
     * allocated array of {@code String}:
     *
     * <pre>
     *     String[] y = x.toArray(new String[0]);</pre>
     *
     * Note that {@code toArray(new Object[0])} is identical in function to
     * {@code toArray()}.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    @SuppressWarnings("unchecked") // array creation and element casts are
                                   // checked at run time (ArrayStoreException)
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            int size = count.get();
            if (a.length < size)
                a = (T[]) java.lang.reflect.Array.newInstance
                    (a.getClass().getComponentType(), size);

            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = (T) p.item;
            // Mark the end of the queue's elements when the caller's array
            // has room to spare, as the contract above promises.
            if (a.length > k)
                a[k] = null;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns a string representation of this queue, built by the
     * inherited implementation while both locks are held so that the
     * traversal sees a stable snapshot.
     *
     * @return a string representation of this queue
     */
    public String toString() {
        fullyLock();
        try {
            return super.toString();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        fullyLock();
        try {
            head.next = null;
            assert head.item == null;
            last = head;
            if (count.getAndSet(0) == capacity)
                notFull.signalAll();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Removes all available elements from this queue and adds them to
     * the given collection. Equivalent to
     * {@code drainTo(c, Integer.MAX_VALUE)}: if {@code c.add} throws,
     * the elements not yet transferred remain in this queue.
     *
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most the given number of available elements from this
     * queue and adds them to the given collection. The transfer runs
     * with both locks held. If {@code c.add} throws, the elements
     * successfully added so far are removed from this queue, the
     * remaining ones stay queued in order, and the exception propagates.
     *
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        fullyLock();
        try {
            int n = 0;                  // number of elements handed to c
            Node<E> p = head.next;      // first node not yet transferred
            try {
                while (p != null && n < maxElements) {
                    c.add(p.item);      // may throw; p then stays queued
                    p.item = null;
                    p = p.next;
                    ++n;
                }
                return n;
            } finally {
                // Runs on normal exit and when c.add throws, so the list
                // and the count always reflect exactly what was moved.
                if (n != 0) {
                    head.next = p;
                    assert head.item == null;
                    if (p == null)
                        last = head;
                    if (count.getAndAdd(-n) == capacity)
                        notFull.signalAll();
                }
            }
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The returned {@code Iterator} is a "weakly consistent" iterator that
     * will never throw {@link ConcurrentModificationException},
     * and guarantees to traverse elements as they existed upon
     * construction of the iterator, and may (but is not guaranteed to)
     * reflect any modifications subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence
     */
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /*
         * Basic weak-consistent iterator.  At all times hold the next
         * item to hand out so that if hasNext() reports true, we will
         * still have it to return even if lost race with a take etc.
         */
        private Node<E> current;
        private Node<E> lastRet;
        private E currentElement;

        Itr() {
            fullyLock();
            try {
                current = head.next;
                if (current != null)
                    currentElement = current.item;
            } finally {
                fullyUnlock();
            }
        }

        public boolean hasNext() {
            return current != null;
        }

        public E next() {
            fullyLock();
            try {
                if (current == null)
                    throw new NoSuchElementException();
                E x = currentElement;
                lastRet = current;
                // Advance to the next node that still holds an element.
                // Nodes whose item was cleared (taken, removed, drained,
                // or currently acting as the dummy head) are skipped so
                // that a null is never handed out as an element.
                Node<E> p = current.next;
                E item = null;
                while (p != null && (item = p.item) == null)
                    p = p.next;
                current = p;
                currentElement = item;
                return x;
            } finally {
                fullyUnlock();
            }
        }

        public void remove() {
            if (lastRet == null)
                throw new IllegalStateException();
            fullyLock();
            try {
                Node<E> node = lastRet;
                lastRet = null;
                // Locate the node from the live head; it may already
                // have been removed by another thread, in which case
                // this call does nothing.
                Node<E> trail = head;
                Node<E> p = head.next;
                while (p != null && p != node) {
                    trail = p;
                    p = p.next;
                }
                if (p == node) {
                    p.item = null;
                    trail.next = p.next;
                    if (last == p)
                        last = trail;
                    int c = count.getAndDecrement();
                    if (c == capacity)
                        notFull.signalAll();
                }
            } finally {
                fullyUnlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @serialData The capacity is emitted (int), followed by all of
     * its elements (each an {@code Object}) in the proper order,
     * followed by a null
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
            throws java.io.IOException {

        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();

            // Write out all elements in the proper order.
            for (Node<E> p = head.next; p != null; p = p.next)
                s.writeObject(p.item);

            // Use trailing null as sentinel
            s.writeObject(null);
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Reconstitute this queue instance from a stream (that is,
     * deserialize it).
     *
     * @param s the stream
     */
    @SuppressWarnings("unchecked") // elements were written from a queue of E
    private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
        // Read in capacity, and any hidden stuff
        s.defaultReadObject();

        // The serialized count is discarded; add() below rebuilds it.
        count.set(0);
        last = head = new Node<E>(null);

        // Read in all elements and place in queue
        for (;;) {
            E item = (E) s.readObject();
            if (item == null)
                break;
            add(item);
        }
    }
}