ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jdk7/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.4
Committed: Tue Feb 5 19:54:07 2013 UTC (11 years, 3 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.3: +1 -1 lines
Log Message:
javadoc style

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8    
9     import java.util.concurrent.atomic.AtomicInteger;
10     import java.util.concurrent.locks.Condition;
11     import java.util.concurrent.locks.ReentrantLock;
12     import java.util.AbstractQueue;
13     import java.util.Collection;
14     import java.util.Iterator;
15     import java.util.NoSuchElementException;
16    
17     /**
18     * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
19     * linked nodes.
20     * This queue orders elements FIFO (first-in-first-out).
21     * The <em>head</em> of the queue is that element that has been on the
22     * queue the longest time.
23     * The <em>tail</em> of the queue is that element that has been on the
24     * queue the shortest time. New elements
25     * are inserted at the tail of the queue, and the queue retrieval
26     * operations obtain elements at the head of the queue.
27     * Linked queues typically have higher throughput than array-based queues but
28     * less predictable performance in most concurrent applications.
29     *
30     * <p>The optional capacity bound constructor argument serves as a
31     * way to prevent excessive queue expansion. The capacity, if unspecified,
32     * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
33     * dynamically created upon each insertion unless this would bring the
34     * queue above capacity.
35     *
36     * <p>This class and its iterator implement all of the
37     * <em>optional</em> methods of the {@link Collection} and {@link
38     * Iterator} interfaces.
39     *
40     * <p>This class is a member of the
41     * <a href="{@docRoot}/../technotes/guides/collections/index.html">
42     * Java Collections Framework</a>.
43     *
44     * @since 1.5
45     * @author Doug Lea
46     * @param <E> the type of elements held in this collection
47     */
48     public class LinkedBlockingQueue<E> extends AbstractQueue<E>
49     implements BlockingQueue<E>, java.io.Serializable {
50     private static final long serialVersionUID = -6903933977591709194L;
51    
52     /*
53     * A variant of the "two lock queue" algorithm. The putLock gates
54     * entry to put (and offer), and has an associated condition for
55     * waiting puts. Similarly for the takeLock. The "count" field
56     * that they both rely on is maintained as an atomic to avoid
57     * needing to get both locks in most cases. Also, to minimize need
58     * for puts to get takeLock and vice-versa, cascading notifies are
59     * used. When a put notices that it has enabled at least one take,
60     * it signals taker. That taker in turn signals others if more
61     * items have been entered since the signal. And symmetrically for
62     * takes signalling puts. Operations such as remove(Object) and
63     * iterators acquire both locks.
64     *
65     * Visibility between writers and readers is provided as follows:
66     *
67     * Whenever an element is enqueued, the putLock is acquired and
68     * count updated. A subsequent reader guarantees visibility to the
69     * enqueued Node by either acquiring the putLock (via fullyLock)
70     * or by acquiring the takeLock, and then reading n = count.get();
71     * this gives visibility to the first n items.
72     *
73     * To implement weakly consistent iterators, it appears we need to
74     * keep all Nodes GC-reachable from a predecessor dequeued Node.
75     * That would cause two problems:
76     * - allow a rogue Iterator to cause unbounded memory retention
77     * - cause cross-generational linking of old Nodes to new Nodes if
78     * a Node was tenured while live, which generational GCs have a
79     * hard time dealing with, causing repeated major collections.
80     * However, only non-deleted Nodes need to be reachable from
81     * dequeued Nodes, and reachability does not necessarily have to
82     * be of the kind understood by the GC. We use the trick of
83     * linking a Node that has just been dequeued to itself. Such a
84     * self-link implicitly means to advance to head.next.
85     */
86    
87     /**
88     * Linked list node class
89     */
90     static class Node<E> {
91     E item;
92    
93     /**
94     * One of:
95     * - the real successor Node
96     * - this Node, meaning the successor is head.next
97     * - null, meaning there is no successor (this is the last node)
98     */
99     Node<E> next;
100    
101     Node(E x) { item = x; }
102     }
103    
104     /** The capacity bound, or Integer.MAX_VALUE if none */
105     private final int capacity;
106    
107     /** Current number of elements */
108     private final AtomicInteger count = new AtomicInteger();
109    
110     /**
111     * Head of linked list.
112     * Invariant: head.item == null
113     */
114     transient Node<E> head;
115    
116     /**
117     * Tail of linked list.
118     * Invariant: last.next == null
119     */
120     private transient Node<E> last;
121    
122     /** Lock held by take, poll, etc */
123     private final ReentrantLock takeLock = new ReentrantLock();
124    
125     /** Wait queue for waiting takes */
126     private final Condition notEmpty = takeLock.newCondition();
127    
128     /** Lock held by put, offer, etc */
129     private final ReentrantLock putLock = new ReentrantLock();
130    
131     /** Wait queue for waiting puts */
132     private final Condition notFull = putLock.newCondition();
133    
134     /**
135     * Signals a waiting take. Called only from put/offer (which do not
136     * otherwise ordinarily lock takeLock.)
137     */
138     private void signalNotEmpty() {
139     final ReentrantLock takeLock = this.takeLock;
140     takeLock.lock();
141     try {
142     notEmpty.signal();
143     } finally {
144     takeLock.unlock();
145     }
146     }
147    
148     /**
149     * Signals a waiting put. Called only from take/poll.
150     */
151     private void signalNotFull() {
152     final ReentrantLock putLock = this.putLock;
153     putLock.lock();
154     try {
155     notFull.signal();
156     } finally {
157     putLock.unlock();
158     }
159     }
160    
161     /**
162     * Links node at end of queue.
163     *
164     * @param node the node
165     */
166     private void enqueue(Node<E> node) {
167     // assert putLock.isHeldByCurrentThread();
168     // assert last.next == null;
169     last = last.next = node;
170     }
171    
172     /**
173     * Removes a node from head of queue.
174     *
175     * @return the node
176     */
177     private E dequeue() {
178     // assert takeLock.isHeldByCurrentThread();
179     // assert head.item == null;
180     Node<E> h = head;
181     Node<E> first = h.next;
182     h.next = h; // help GC
183     head = first;
184     E x = first.item;
185     first.item = null;
186     return x;
187     }
188    
189     /**
190 jsr166 1.2 * Locks to prevent both puts and takes.
191 dl 1.1 */
192     void fullyLock() {
193     putLock.lock();
194     takeLock.lock();
195     }
196    
197     /**
198 jsr166 1.2 * Unlocks to allow both puts and takes.
199 dl 1.1 */
200     void fullyUnlock() {
201     takeLock.unlock();
202     putLock.unlock();
203     }
204    
205     // /**
206     // * Tells whether both locks are held by current thread.
207     // */
208     // boolean isFullyLocked() {
209     // return (putLock.isHeldByCurrentThread() &&
210     // takeLock.isHeldByCurrentThread());
211     // }
212    
213     /**
214     * Creates a {@code LinkedBlockingQueue} with a capacity of
215     * {@link Integer#MAX_VALUE}.
216     */
217     public LinkedBlockingQueue() {
218     this(Integer.MAX_VALUE);
219     }
220    
221     /**
222     * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
223     *
224     * @param capacity the capacity of this queue
225     * @throws IllegalArgumentException if {@code capacity} is not greater
226     * than zero
227     */
228     public LinkedBlockingQueue(int capacity) {
229     if (capacity <= 0) throw new IllegalArgumentException();
230     this.capacity = capacity;
231     last = head = new Node<E>(null);
232     }
233    
234     /**
235     * Creates a {@code LinkedBlockingQueue} with a capacity of
236     * {@link Integer#MAX_VALUE}, initially containing the elements of the
237     * given collection,
238     * added in traversal order of the collection's iterator.
239     *
240     * @param c the collection of elements to initially contain
241     * @throws NullPointerException if the specified collection or any
242     * of its elements are null
243     */
244     public LinkedBlockingQueue(Collection<? extends E> c) {
245     this(Integer.MAX_VALUE);
246     final ReentrantLock putLock = this.putLock;
247     putLock.lock(); // Never contended, but necessary for visibility
248     try {
249     int n = 0;
250     for (E e : c) {
251     if (e == null)
252     throw new NullPointerException();
253     if (n == capacity)
254     throw new IllegalStateException("Queue full");
255     enqueue(new Node<E>(e));
256     ++n;
257     }
258     count.set(n);
259     } finally {
260     putLock.unlock();
261     }
262     }
263    
264     // this doc comment is overridden to remove the reference to collections
265     // greater in size than Integer.MAX_VALUE
266     /**
267     * Returns the number of elements in this queue.
268     *
269     * @return the number of elements in this queue
270     */
271     public int size() {
272     return count.get();
273     }
274    
275     // this doc comment is a modified copy of the inherited doc comment,
276     // without the reference to unlimited queues.
277     /**
278     * Returns the number of additional elements that this queue can ideally
279     * (in the absence of memory or resource constraints) accept without
280     * blocking. This is always equal to the initial capacity of this queue
281     * less the current {@code size} of this queue.
282     *
283     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
284     * an element will succeed by inspecting {@code remainingCapacity}
285     * because it may be the case that another thread is about to
286     * insert or remove an element.
287     */
288     public int remainingCapacity() {
289     return capacity - count.get();
290     }
291    
292     /**
293     * Inserts the specified element at the tail of this queue, waiting if
294     * necessary for space to become available.
295     *
296     * @throws InterruptedException {@inheritDoc}
297     * @throws NullPointerException {@inheritDoc}
298     */
299     public void put(E e) throws InterruptedException {
300     if (e == null) throw new NullPointerException();
301     // Note: convention in all put/take/etc is to preset local var
302     // holding count negative to indicate failure unless set.
303     int c = -1;
304     Node<E> node = new Node<E>(e);
305     final ReentrantLock putLock = this.putLock;
306     final AtomicInteger count = this.count;
307     putLock.lockInterruptibly();
308     try {
309     /*
310     * Note that count is used in wait guard even though it is
311     * not protected by lock. This works because count can
312     * only decrease at this point (all other puts are shut
313     * out by lock), and we (or some other waiting put) are
314     * signalled if it ever changes from capacity. Similarly
315     * for all other uses of count in other wait guards.
316     */
317     while (count.get() == capacity) {
318     notFull.await();
319     }
320     enqueue(node);
321     c = count.getAndIncrement();
322     if (c + 1 < capacity)
323     notFull.signal();
324     } finally {
325     putLock.unlock();
326     }
327     if (c == 0)
328     signalNotEmpty();
329     }
330    
331     /**
332     * Inserts the specified element at the tail of this queue, waiting if
333     * necessary up to the specified wait time for space to become available.
334     *
335     * @return {@code true} if successful, or {@code false} if
336 jsr166 1.4 * the specified waiting time elapses before space is available
337 dl 1.1 * @throws InterruptedException {@inheritDoc}
338     * @throws NullPointerException {@inheritDoc}
339     */
340     public boolean offer(E e, long timeout, TimeUnit unit)
341     throws InterruptedException {
342    
343     if (e == null) throw new NullPointerException();
344     long nanos = unit.toNanos(timeout);
345     int c = -1;
346     final ReentrantLock putLock = this.putLock;
347     final AtomicInteger count = this.count;
348     putLock.lockInterruptibly();
349     try {
350     while (count.get() == capacity) {
351     if (nanos <= 0)
352     return false;
353     nanos = notFull.awaitNanos(nanos);
354     }
355     enqueue(new Node<E>(e));
356     c = count.getAndIncrement();
357     if (c + 1 < capacity)
358     notFull.signal();
359     } finally {
360     putLock.unlock();
361     }
362     if (c == 0)
363     signalNotEmpty();
364     return true;
365     }
366    
367     /**
368     * Inserts the specified element at the tail of this queue if it is
369     * possible to do so immediately without exceeding the queue's capacity,
370     * returning {@code true} upon success and {@code false} if this queue
371     * is full.
372     * When using a capacity-restricted queue, this method is generally
373     * preferable to method {@link BlockingQueue#add add}, which can fail to
374     * insert an element only by throwing an exception.
375     *
376     * @throws NullPointerException if the specified element is null
377     */
378     public boolean offer(E e) {
379     if (e == null) throw new NullPointerException();
380     final AtomicInteger count = this.count;
381     if (count.get() == capacity)
382     return false;
383     int c = -1;
384     Node<E> node = new Node<E>(e);
385     final ReentrantLock putLock = this.putLock;
386     putLock.lock();
387     try {
388     if (count.get() < capacity) {
389     enqueue(node);
390     c = count.getAndIncrement();
391     if (c + 1 < capacity)
392     notFull.signal();
393     }
394     } finally {
395     putLock.unlock();
396     }
397     if (c == 0)
398     signalNotEmpty();
399     return c >= 0;
400     }
401    
402     public E take() throws InterruptedException {
403     E x;
404     int c = -1;
405     final AtomicInteger count = this.count;
406     final ReentrantLock takeLock = this.takeLock;
407     takeLock.lockInterruptibly();
408     try {
409     while (count.get() == 0) {
410     notEmpty.await();
411     }
412     x = dequeue();
413     c = count.getAndDecrement();
414     if (c > 1)
415     notEmpty.signal();
416     } finally {
417     takeLock.unlock();
418     }
419     if (c == capacity)
420     signalNotFull();
421     return x;
422     }
423    
424     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
425     E x = null;
426     int c = -1;
427     long nanos = unit.toNanos(timeout);
428     final AtomicInteger count = this.count;
429     final ReentrantLock takeLock = this.takeLock;
430     takeLock.lockInterruptibly();
431     try {
432     while (count.get() == 0) {
433     if (nanos <= 0)
434     return null;
435     nanos = notEmpty.awaitNanos(nanos);
436     }
437     x = dequeue();
438     c = count.getAndDecrement();
439     if (c > 1)
440     notEmpty.signal();
441     } finally {
442     takeLock.unlock();
443     }
444     if (c == capacity)
445     signalNotFull();
446     return x;
447     }
448    
449     public E poll() {
450     final AtomicInteger count = this.count;
451     if (count.get() == 0)
452     return null;
453     E x = null;
454     int c = -1;
455     final ReentrantLock takeLock = this.takeLock;
456     takeLock.lock();
457     try {
458     if (count.get() > 0) {
459     x = dequeue();
460     c = count.getAndDecrement();
461     if (c > 1)
462     notEmpty.signal();
463     }
464     } finally {
465     takeLock.unlock();
466     }
467     if (c == capacity)
468     signalNotFull();
469     return x;
470     }
471    
472     public E peek() {
473     if (count.get() == 0)
474     return null;
475     final ReentrantLock takeLock = this.takeLock;
476     takeLock.lock();
477     try {
478     Node<E> first = head.next;
479     if (first == null)
480     return null;
481     else
482     return first.item;
483     } finally {
484     takeLock.unlock();
485     }
486     }
487    
488     /**
489     * Unlinks interior Node p with predecessor trail.
490     */
491     void unlink(Node<E> p, Node<E> trail) {
492     // assert isFullyLocked();
493     // p.next is not changed, to allow iterators that are
494     // traversing p to maintain their weak-consistency guarantee.
495     p.item = null;
496     trail.next = p.next;
497     if (last == p)
498     last = trail;
499     if (count.getAndDecrement() == capacity)
500     notFull.signal();
501     }
502    
503     /**
504     * Removes a single instance of the specified element from this queue,
505     * if it is present. More formally, removes an element {@code e} such
506     * that {@code o.equals(e)}, if this queue contains one or more such
507     * elements.
508     * Returns {@code true} if this queue contained the specified element
509     * (or equivalently, if this queue changed as a result of the call).
510     *
511     * @param o element to be removed from this queue, if present
512     * @return {@code true} if this queue changed as a result of the call
513     */
514     public boolean remove(Object o) {
515     if (o == null) return false;
516     fullyLock();
517     try {
518     for (Node<E> trail = head, p = trail.next;
519     p != null;
520     trail = p, p = p.next) {
521     if (o.equals(p.item)) {
522     unlink(p, trail);
523     return true;
524     }
525     }
526     return false;
527     } finally {
528     fullyUnlock();
529     }
530     }
531    
532     /**
533     * Returns {@code true} if this queue contains the specified element.
534     * More formally, returns {@code true} if and only if this queue contains
535     * at least one element {@code e} such that {@code o.equals(e)}.
536     *
537     * @param o object to be checked for containment in this queue
538     * @return {@code true} if this queue contains the specified element
539     */
540     public boolean contains(Object o) {
541     if (o == null) return false;
542     fullyLock();
543     try {
544     for (Node<E> p = head.next; p != null; p = p.next)
545     if (o.equals(p.item))
546     return true;
547     return false;
548     } finally {
549     fullyUnlock();
550     }
551     }
552    
553     /**
554     * Returns an array containing all of the elements in this queue, in
555     * proper sequence.
556     *
557     * <p>The returned array will be "safe" in that no references to it are
558     * maintained by this queue. (In other words, this method must allocate
559     * a new array). The caller is thus free to modify the returned array.
560     *
561     * <p>This method acts as bridge between array-based and collection-based
562     * APIs.
563     *
564     * @return an array containing all of the elements in this queue
565     */
566     public Object[] toArray() {
567     fullyLock();
568     try {
569     int size = count.get();
570     Object[] a = new Object[size];
571     int k = 0;
572     for (Node<E> p = head.next; p != null; p = p.next)
573     a[k++] = p.item;
574     return a;
575     } finally {
576     fullyUnlock();
577     }
578     }
579    
580     /**
581     * Returns an array containing all of the elements in this queue, in
582     * proper sequence; the runtime type of the returned array is that of
583     * the specified array. If the queue fits in the specified array, it
584     * is returned therein. Otherwise, a new array is allocated with the
585     * runtime type of the specified array and the size of this queue.
586     *
587     * <p>If this queue fits in the specified array with room to spare
588     * (i.e., the array has more elements than this queue), the element in
589     * the array immediately following the end of the queue is set to
590     * {@code null}.
591     *
592     * <p>Like the {@link #toArray()} method, this method acts as bridge between
593     * array-based and collection-based APIs. Further, this method allows
594     * precise control over the runtime type of the output array, and may,
595     * under certain circumstances, be used to save allocation costs.
596     *
597     * <p>Suppose {@code x} is a queue known to contain only strings.
598     * The following code can be used to dump the queue into a newly
599     * allocated array of {@code String}:
600     *
601     * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
602     *
603     * Note that {@code toArray(new Object[0])} is identical in function to
604     * {@code toArray()}.
605     *
606     * @param a the array into which the elements of the queue are to
607     * be stored, if it is big enough; otherwise, a new array of the
608     * same runtime type is allocated for this purpose
609     * @return an array containing all of the elements in this queue
610     * @throws ArrayStoreException if the runtime type of the specified array
611     * is not a supertype of the runtime type of every element in
612     * this queue
613     * @throws NullPointerException if the specified array is null
614     */
615     @SuppressWarnings("unchecked")
616     public <T> T[] toArray(T[] a) {
617     fullyLock();
618     try {
619     int size = count.get();
620     if (a.length < size)
621     a = (T[])java.lang.reflect.Array.newInstance
622     (a.getClass().getComponentType(), size);
623    
624     int k = 0;
625     for (Node<E> p = head.next; p != null; p = p.next)
626     a[k++] = (T)p.item;
627     if (a.length > k)
628     a[k] = null;
629     return a;
630     } finally {
631     fullyUnlock();
632     }
633     }
634    
635     public String toString() {
636     fullyLock();
637     try {
638     Node<E> p = head.next;
639     if (p == null)
640     return "[]";
641    
642     StringBuilder sb = new StringBuilder();
643     sb.append('[');
644     for (;;) {
645     E e = p.item;
646     sb.append(e == this ? "(this Collection)" : e);
647     p = p.next;
648     if (p == null)
649     return sb.append(']').toString();
650     sb.append(',').append(' ');
651     }
652     } finally {
653     fullyUnlock();
654     }
655     }
656    
657     /**
658     * Atomically removes all of the elements from this queue.
659     * The queue will be empty after this call returns.
660     */
661     public void clear() {
662     fullyLock();
663     try {
664     for (Node<E> p, h = head; (p = h.next) != null; h = p) {
665     h.next = h;
666     p.item = null;
667     }
668     head = last;
669     // assert head.item == null && head.next == null;
670     if (count.getAndSet(0) == capacity)
671     notFull.signal();
672     } finally {
673     fullyUnlock();
674     }
675     }
676    
677     /**
678     * @throws UnsupportedOperationException {@inheritDoc}
679     * @throws ClassCastException {@inheritDoc}
680     * @throws NullPointerException {@inheritDoc}
681     * @throws IllegalArgumentException {@inheritDoc}
682     */
683     public int drainTo(Collection<? super E> c) {
684     return drainTo(c, Integer.MAX_VALUE);
685     }
686    
687     /**
688     * @throws UnsupportedOperationException {@inheritDoc}
689     * @throws ClassCastException {@inheritDoc}
690     * @throws NullPointerException {@inheritDoc}
691     * @throws IllegalArgumentException {@inheritDoc}
692     */
693     public int drainTo(Collection<? super E> c, int maxElements) {
694     if (c == null)
695     throw new NullPointerException();
696     if (c == this)
697     throw new IllegalArgumentException();
698     if (maxElements <= 0)
699     return 0;
700     boolean signalNotFull = false;
701     final ReentrantLock takeLock = this.takeLock;
702     takeLock.lock();
703     try {
704     int n = Math.min(maxElements, count.get());
705     // count.get provides visibility to first n Nodes
706     Node<E> h = head;
707     int i = 0;
708     try {
709     while (i < n) {
710     Node<E> p = h.next;
711     c.add(p.item);
712     p.item = null;
713     h.next = h;
714     h = p;
715     ++i;
716     }
717     return n;
718     } finally {
719     // Restore invariants even if c.add() threw
720     if (i > 0) {
721     // assert h.item == null;
722     head = h;
723     signalNotFull = (count.getAndAdd(-i) == capacity);
724     }
725     }
726     } finally {
727     takeLock.unlock();
728     if (signalNotFull)
729     signalNotFull();
730     }
731     }
732    
733     /**
734     * Returns an iterator over the elements in this queue in proper sequence.
735     * The elements will be returned in order from first (head) to last (tail).
736     *
737     * <p>The returned iterator is a "weakly consistent" iterator that
738     * will never throw {@link java.util.ConcurrentModificationException
739     * ConcurrentModificationException}, and guarantees to traverse
740     * elements as they existed upon construction of the iterator, and
741     * may (but is not guaranteed to) reflect any modifications
742     * subsequent to construction.
743     *
744     * @return an iterator over the elements in this queue in proper sequence
745     */
746     public Iterator<E> iterator() {
747     return new Itr();
748     }
749    
750     private class Itr implements Iterator<E> {
751     /*
752     * Basic weakly-consistent iterator. At all times hold the next
753     * item to hand out so that if hasNext() reports true, we will
754     * still have it to return even if lost race with a take etc.
755     */
756 jsr166 1.3
757 dl 1.1 private Node<E> current;
758     private Node<E> lastRet;
759     private E currentElement;
760    
761     Itr() {
762     fullyLock();
763     try {
764     current = head.next;
765     if (current != null)
766     currentElement = current.item;
767     } finally {
768     fullyUnlock();
769     }
770     }
771    
772     public boolean hasNext() {
773     return current != null;
774     }
775    
776     /**
777     * Returns the next live successor of p, or null if no such.
778     *
779     * Unlike other traversal methods, iterators need to handle both:
780     * - dequeued nodes (p.next == p)
781     * - (possibly multiple) interior removed nodes (p.item == null)
782     */
783     private Node<E> nextNode(Node<E> p) {
784     for (;;) {
785     Node<E> s = p.next;
786     if (s == p)
787     return head.next;
788     if (s == null || s.item != null)
789     return s;
790     p = s;
791     }
792     }
793    
794     public E next() {
795     fullyLock();
796     try {
797     if (current == null)
798     throw new NoSuchElementException();
799     E x = currentElement;
800     lastRet = current;
801     current = nextNode(current);
802     currentElement = (current == null) ? null : current.item;
803     return x;
804     } finally {
805     fullyUnlock();
806     }
807     }
808    
809     public void remove() {
810     if (lastRet == null)
811     throw new IllegalStateException();
812     fullyLock();
813     try {
814     Node<E> node = lastRet;
815     lastRet = null;
816     for (Node<E> trail = head, p = trail.next;
817     p != null;
818     trail = p, p = p.next) {
819     if (p == node) {
820     unlink(p, trail);
821     break;
822     }
823     }
824     } finally {
825     fullyUnlock();
826     }
827     }
828     }
829    
830     /**
831     * Saves this queue to a stream (that is, serializes it).
832     *
833     * @serialData The capacity is emitted (int), followed by all of
834     * its elements (each an {@code Object}) in the proper order,
835     * followed by a null
836     */
837     private void writeObject(java.io.ObjectOutputStream s)
838     throws java.io.IOException {
839    
840     fullyLock();
841     try {
842     // Write out any hidden stuff, plus capacity
843     s.defaultWriteObject();
844    
845     // Write out all elements in the proper order.
846     for (Node<E> p = head.next; p != null; p = p.next)
847     s.writeObject(p.item);
848    
849     // Use trailing null as sentinel
850     s.writeObject(null);
851     } finally {
852     fullyUnlock();
853     }
854     }
855    
856     /**
857     * Reconstitutes this queue from a stream (that is, deserializes it).
858     */
859     private void readObject(java.io.ObjectInputStream s)
860     throws java.io.IOException, ClassNotFoundException {
861     // Read in capacity, and any hidden stuff
862     s.defaultReadObject();
863    
864     count.set(0);
865     last = head = new Node<E>(null);
866    
867     // Read in all elements and place in queue
868     for (;;) {
869     @SuppressWarnings("unchecked")
870     E item = (E)s.readObject();
871     if (item == null)
872     break;
873     add(item);
874     }
875     }
876     }