ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java (file contents):
Revision 1.5 by dl, Sun Jun 22 23:51:37 2003 UTC vs.
Revision 1.6 by dl, Tue Jun 24 14:34:48 2003 UTC

# Line 17 | Line 17 | import java.util.*;
17   * way to prevent unlmited queue expansion.  Linked nodes are
18   * dynamically created upon each insertion unless this would bring the
19   * queue above capacity.
20 + * @since 1.5
21 + * @author Doug Lea
22   *
23   **/
24   public class LinkedBlockingQueue<E> extends AbstractQueue<E>
# Line 36 | Line 38 | public class LinkedBlockingQueue<E> exte
38       * iterators acquire both locks.
39      */
40  
41 +    /**
42 +     * Linked list node class
43 +     */
44      static class Node<E> {
45 +        /** The item, volatile to ensure barrier separating write and read */
46          volatile E item;
47          Node<E> next;
48          Node(E x) { item = x; }
49      }
50  
51 +    /** The capacity bound, or Integer.MAX_VALUE if none */
52      private final int capacity;
53 +
54 +    /** Current number of elements */
55      private transient final AtomicInteger count = new AtomicInteger(0);
56  
57 <    private transient Node<E> head = new Node<E>(null);
58 <    private transient Node<E> last = head;
57 >    /** Head of linked list */
58 >    private transient Node<E> head;
59 >
60 >    /** Tail of lined list */
61 >    private transient Node<E> last;
62  
63 +    /** Lock held by take, poll, etc */
64      private final ReentrantLock takeLock = new ReentrantLock();
65 +
66 +    /** Wait queue for waiting takes */
67      private final Condition notEmpty = takeLock.newCondition();
68  
69 +    /** Lock held by put, offer, etc */
70      private final ReentrantLock putLock = new ReentrantLock();
55    private final Condition notFull = putLock.newCondition();
71  
72 +    /** Wait queue for waiting puts */
73 +    private final Condition notFull = putLock.newCondition();
74  
75      /**
76       * Signal a waiting take. Called only from put/offer (which do not
# Line 84 | Line 101 | public class LinkedBlockingQueue<E> exte
101  
102      /**
103       * Create a node and link it and end of queue
104 +     * @param x the item
105       */
106      private void insert(E x) {
107          last = last.next = new Node<E>(x);
# Line 91 | Line 109 | public class LinkedBlockingQueue<E> exte
109  
110      /**
111       * Remove a node from head of queue,
112 +     * @return the node
113       */
114      private E extract() {
115          Node<E> first = head.next;
# Line 126 | Line 145 | public class LinkedBlockingQueue<E> exte
145  
146      /**
147       * Creates a LinkedBlockingQueue with the given capacity constraint.
148 +     * @param capacity the maminum number of elements to hold without blocking.
149       */
150      public LinkedBlockingQueue(int capacity) {
151 <        if (capacity <= 0) throw new IllegalArgumentException();
151 >        if (capacity <= 0) throw new NullPointerException();
152          this.capacity = capacity;
153 +        last = head = new Node<E>(null);
154      }
155  
156      /**
157       * Creates a LinkedBlockingQueue without an intrinsic capacity
158 <     * constraint, initially holding the given elements.
158 >     * constraint, initially holding the given elements, added in
159 >     * traveral order of the collection's iterator.
160 >     * @param initialElements the elements to initially contain
161       */
162      public LinkedBlockingQueue(Collection<E> initialElements) {
163          this(Integer.MAX_VALUE);
# Line 151 | Line 174 | public class LinkedBlockingQueue<E> exte
174      }
175  
176      public void put(E x) throws InterruptedException {
177 <        if (x == null) throw new IllegalArgumentException();
177 >        if (x == null) throw new NullPointerException();
178          // Note: convention in all put/take/etc is to preset
179          // local var holding count  negative to indicate failure unless set.
180          int c = -1;
# Line 176 | Line 199 | public class LinkedBlockingQueue<E> exte
199              }
200              insert(x);
201              c = count.getAndIncrement();
202 <            if (c+1 < capacity)
202 >            if (c + 1 < capacity)
203                  notFull.signal();
204          }
205          finally {
# Line 187 | Line 210 | public class LinkedBlockingQueue<E> exte
210      }
211  
212      public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
213 <        if (x == null) throw new IllegalArgumentException();
213 >        if (x == null) throw new NullPointerException();
214          putLock.lockInterruptibly();
215          long nanos = unit.toNanos(timeout);
216          int c = -1;
# Line 196 | Line 219 | public class LinkedBlockingQueue<E> exte
219                  if (count.get() < capacity) {
220                      insert(x);
221                      c = count.getAndIncrement();
222 <                    if (c+1 < capacity)
222 >                    if (c + 1 < capacity)
223                          notFull.signal();
224                      break;
225                  }
# Line 220 | Line 243 | public class LinkedBlockingQueue<E> exte
243      }
244  
245      public boolean offer(E x) {
246 <        if (x == null) throw new IllegalArgumentException();
246 >        if (x == null) throw new NullPointerException();
247          if (count.get() == capacity)
248              return false;
249          putLock.tryLock();
# Line 229 | Line 252 | public class LinkedBlockingQueue<E> exte
252              if (count.get() < capacity) {
253                  insert(x);
254                  c = count.getAndIncrement();
255 <                if (c+1 < capacity)
255 >                if (c + 1 < capacity)
256                      notFull.signal();
257              }
258          }
# Line 270 | Line 293 | public class LinkedBlockingQueue<E> exte
293      }
294  
295      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
273
296          E x = null;
297          int c = -1;
298          takeLock.lockInterruptibly();
# Line 494 | Line 516 | public class LinkedBlockingQueue<E> exte
516       * @serialData The capacity is emitted (int), followed by all of
517       * its elements (each an <tt>Object</tt>) in the proper order,
518       * followed by a null
519 +     * @param s the stream
520       */
521      private void writeObject(java.io.ObjectOutputStream s)
522          throws java.io.IOException {
# Line 518 | Line 541 | public class LinkedBlockingQueue<E> exte
541      /**
542       * Reconstitute the Queue instance from a stream (that is,
543       * deserialize it).
544 +     * @param s the stream
545       */
546      private void readObject(java.io.ObjectInputStream s)
547          throws java.io.IOException, ClassNotFoundException {
548          // Read in capacity, and any hidden stuff
549          s.defaultReadObject();
550  
551 <        // Read in all elements and place in queue
551 >        // Read in all elements and place in queue
552          for (;;) {
553              E item = (E)s.readObject();
554              if (item == null)

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines