ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.3 by dl, Mon Aug 27 19:48:36 2007 UTC vs.
Revision 1.25 by jsr166, Fri Jul 24 23:48:26 2009 UTC

# Line 19 | Line 19 | import java.io.*;
19   * producer.  The <em>tail</em> of the queue is that element that has
20   * been on the queue the shortest time for some producer.
21   *
22 < * <p>Beware that, unlike in most collections, the <tt>size</tt>
22 > * <p>Beware that, unlike in most collections, the {@code size}
23   * method is <em>NOT</em> a constant-time operation. Because of the
24   * asynchronous nature of these queues, determining the current number
25   * of elements requires a traversal of the elements.
# Line 42 | Line 42 | import java.io.*;
42   * @since 1.7
43   * @author Doug Lea
44   * @param <E> the type of elements held in this collection
45 *
45   */
46   public class LinkedTransferQueue<E> extends AbstractQueue<E>
47      implements TransferQueue<E>, java.io.Serializable {
48      private static final long serialVersionUID = -3223113410248163686L;
49  
50      /*
52     * This is still a work in prgress...
53     *
51       * This class extends the approach used in FIFO-mode
52       * SynchronousQueues. See the internal documentation, as well as
53       * the PPoPP 2006 paper "Scalable Synchronous Queues" by Scherer,
54       * Lea & Scott
55       * (http://www.cs.rice.edu/~wns1/papers/2006-PPoPP-SQ.pdf)
56       *
57 <     * The main extension is to provide different Wait modes
58 <     * for the main "xfer" method that puts or takes items.
59 <     * These don't impact the basic dual-queue logic, but instead
60 <     * control whether or how threads block upon insertion
61 <     * of request or data nodes into the dual queue.
57 >     * The main extension is to provide different Wait modes for the
58 >     * main "xfer" method that puts or takes items.  These don't
59 >     * impact the basic dual-queue logic, but instead control whether
60 >     * or how threads block upon insertion of request or data nodes
61 >     * into the dual queue. It also uses slightly different
62 >     * conventions for tracking whether nodes are off-list or
63 >     * cancelled.
64       */
65  
66      // Wait modes for xfer method
# Line 79 | Line 78 | public class LinkedTransferQueue<E> exte
78       * seems not to vary with number of CPUs (beyond 2) so is just
79       * a constant.
80       */
81 <    static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
81 >    static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
82  
83      /**
84       * The number of times to spin before blocking in untimed waits.
# Line 94 | Line 93 | public class LinkedTransferQueue<E> exte
93       */
94      static final long spinForTimeoutThreshold = 1000L;
95  
96 <    /**
97 <     * Node class for LinkedTransferQueue. Opportunistically subclasses from
98 <     * AtomicReference to represent item. Uses Object, not E, to allow
99 <     * setting item to "this" after use, to avoid garbage
100 <     * retention. Similarly, setting the next field to this is used as
101 <     * sentinel that node is off list.
96 >    /**
97 >     * Node class for LinkedTransferQueue. Opportunistically
98 >     * subclasses from AtomicReference to represent item. Uses Object,
99 >     * not E, to allow setting item to "this" after use, to avoid
100 >     * garbage retention. Similarly, setting the next field to this is
101 >     * used as sentinel that node is off list.
102       */
103 <    static final class QNode extends AtomicReference<Object> {
104 <        volatile QNode next;
103 >    static final class Node<E> extends AtomicReference<Object> {
104 >        volatile Node<E> next;
105          volatile Thread waiter;       // to control park/unpark
106          final boolean isData;
107 <        QNode(Object item, boolean isData) {
107 >
108 >        Node(E item, boolean isData) {
109              super(item);
110              this.isData = isData;
111          }
112  
113 <        static final AtomicReferenceFieldUpdater<QNode, QNode>
113 >        @SuppressWarnings("rawtypes")
114 >        static final AtomicReferenceFieldUpdater<Node, Node>
115              nextUpdater = AtomicReferenceFieldUpdater.newUpdater
116 <            (QNode.class, QNode.class, "next");
116 >            (Node.class, Node.class, "next");
117  
118 <        boolean casNext(QNode cmp, QNode val) {
118 >        final boolean casNext(Node<E> cmp, Node<E> val) {
119              return nextUpdater.compareAndSet(this, cmp, val);
120          }
121 +
122 +        final void clearNext() {
123 +            nextUpdater.lazySet(this, this);
124 +        }
125 +
126 +        private static final long serialVersionUID = -3375979862319811754L;
127      }
128  
129      /**
# Line 128 | Line 135 | public class LinkedTransferQueue<E> exte
135          // enough padding for 64bytes with 4byte refs
136          Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe;
137          PaddedAtomicReference(T r) { super(r); }
138 +        private static final long serialVersionUID = 8170090609809740854L;
139      }
140  
141  
142 <    private final QNode dummy = new QNode(null, false);
143 <    private final PaddedAtomicReference<QNode> head =
144 <        new PaddedAtomicReference<QNode>(dummy);
145 <    private final PaddedAtomicReference<QNode> tail =
146 <        new PaddedAtomicReference<QNode>(dummy);
142 >    /** head of the queue */
143 >    private transient final PaddedAtomicReference<Node<E>> head;
144 >
145 >    /** tail of the queue */
146 >    private transient final PaddedAtomicReference<Node<E>> tail;
147  
148      /**
149       * Reference to a cancelled node that might not yet have been
150       * unlinked from queue because it was the last inserted node
151       * when it cancelled.
152       */
153 <    private final PaddedAtomicReference<QNode> cleanMe =
146 <        new PaddedAtomicReference<QNode>(null);
153 >    private transient final PaddedAtomicReference<Node<E>> cleanMe;
154  
155      /**
156       * Tries to cas nh as new head; if successful, unlink
157       * old head's next node to avoid garbage retention.
158       */
159 <    private boolean advanceHead(QNode h, QNode nh) {
159 >    private boolean advanceHead(Node<E> h, Node<E> nh) {
160          if (h == head.get() && head.compareAndSet(h, nh)) {
161 <            h.next = h; // forget old next
161 >            h.clearNext(); // forget old next
162              return true;
163          }
164          return false;
165      }
166 <    
166 >
167      /**
168       * Puts or takes an item. Used for most queue operations (except
169 <     * poll() and tryTransfer())
170 <     * @param e the item or if null, signfies that this is a take
169 >     * poll() and tryTransfer()). See the similar code in
170 >     * SynchronousQueue for detailed explanation.
171 >     *
172 >     * @param e the item or if null, signifies that this is a take
173       * @param mode the wait mode: NOWAIT, TIMEOUT, WAIT
174       * @param nanos timeout in nanosecs, used only if mode is TIMEOUT
175       * @return an item, or null on failure
176       */
177 <    private Object xfer(Object e, int mode, long nanos) {
177 >    private E xfer(E e, int mode, long nanos) {
178          boolean isData = (e != null);
179 <        QNode s = null;
180 <        final PaddedAtomicReference<QNode> head = this.head;
181 <        final PaddedAtomicReference<QNode> tail = this.tail;
179 >        Node<E> s = null;
180 >        final PaddedAtomicReference<Node<E>> head = this.head;
181 >        final PaddedAtomicReference<Node<E>> tail = this.tail;
182  
183          for (;;) {
184 <            QNode t = tail.get();
185 <            QNode h = head.get();
184 >            Node<E> t = tail.get();
185 >            Node<E> h = head.get();
186  
187              if (t != null && (t == h || t.isData == isData)) {
188                  if (s == null)
189 <                    s = new QNode(e, isData);
190 <                QNode last = t.next;
189 >                    s = new Node<E>(e, isData);
190 >                Node<E> last = t.next;
191                  if (last != null) {
192                      if (t == tail.get())
193                          tail.compareAndSet(t, last);
# Line 188 | Line 197 | public class LinkedTransferQueue<E> exte
197                      return awaitFulfill(t, s, e, mode, nanos);
198                  }
199              }
200 <            
200 >
201              else if (h != null) {
202 <                QNode first = h.next;
203 <                if (t == tail.get() && first != null &&
202 >                Node<E> first = h.next;
203 >                if (t == tail.get() && first != null &&
204                      advanceHead(h, first)) {
205                      Object x = first.get();
206                      if (x != first && first.compareAndSet(x, e)) {
207                          LockSupport.unpark(first.waiter);
208 <                        return isData? e : x;
208 >                        return isData ? e : (E) x;
209                      }
210                  }
211              }
# Line 206 | Line 215 | public class LinkedTransferQueue<E> exte
215  
216      /**
217       * Version of xfer for poll() and tryTransfer, which
218 <     * simpifies control paths both here and in xfer
218 >     * simplifies control paths both here and in xfer.
219       */
220 <    private Object fulfill(Object e) {
220 >    private E fulfill(E e) {
221          boolean isData = (e != null);
222 <        final PaddedAtomicReference<QNode> head = this.head;
223 <        final PaddedAtomicReference<QNode> tail = this.tail;
222 >        final PaddedAtomicReference<Node<E>> head = this.head;
223 >        final PaddedAtomicReference<Node<E>> tail = this.tail;
224  
225          for (;;) {
226 <            QNode t = tail.get();
227 <            QNode h = head.get();
226 >            Node<E> t = tail.get();
227 >            Node<E> h = head.get();
228  
229              if (t != null && (t == h || t.isData == isData)) {
230 <                QNode last = t.next;
230 >                Node<E> last = t.next;
231                  if (t == tail.get()) {
232                      if (last != null)
233                          tail.compareAndSet(t, last);
# Line 227 | Line 236 | public class LinkedTransferQueue<E> exte
236                  }
237              }
238              else if (h != null) {
239 <                QNode first = h.next;
240 <                if (t == tail.get() &&
239 >                Node<E> first = h.next;
240 >                if (t == tail.get() &&
241                      first != null &&
242                      advanceHead(h, first)) {
243                      Object x = first.get();
244                      if (x != first && first.compareAndSet(x, e)) {
245                          LockSupport.unpark(first.waiter);
246 <                        return isData? e : x;
246 >                        return isData ? e : (E) x;
247                      }
248                  }
249              }
# Line 252 | Line 261 | public class LinkedTransferQueue<E> exte
261       * @param nanos timeout value
262       * @return matched item, or s if cancelled
263       */
264 <    private Object awaitFulfill(QNode pred, QNode s, Object e,
265 <                                int mode, long nanos) {
264 >    private E awaitFulfill(Node<E> pred, Node<E> s, E e,
265 >                           int mode, long nanos) {
266          if (mode == NOWAIT)
267              return null;
268  
269 <        long lastTime = (mode == TIMEOUT)? System.nanoTime() : 0;
269 >        long lastTime = (mode == TIMEOUT) ? System.nanoTime() : 0;
270          Thread w = Thread.currentThread();
271          int spins = -1; // set to desired spin count below
272          for (;;) {
# Line 266 | Line 275 | public class LinkedTransferQueue<E> exte
275              Object x = s.get();
276              if (x != e) {                 // Node was matched or cancelled
277                  advanceHead(pred, s);     // unlink if head
278 <                if (x == s)               // was cancelled
279 <                    return clean(pred, s);
280 <                else if (x != null) {    
278 >                if (x == s) {             // was cancelled
279 >                    clean(pred, s);
280 >                    return null;
281 >                }
282 >                else if (x != null) {
283                      s.set(s);             // avoid garbage retention
284 <                    return x;
284 >                    return (E) x;
285                  }
286                  else
287                      return e;
288              }
278
289              if (mode == TIMEOUT) {
290                  long now = System.nanoTime();
291                  nanos -= now - lastTime;
# Line 286 | Line 296 | public class LinkedTransferQueue<E> exte
296                  }
297              }
298              if (spins < 0) {
299 <                QNode h = head.get(); // only spin if at head
299 >                Node<E> h = head.get(); // only spin if at head
300                  spins = ((h != null && h.next == s) ?
301 <                         (mode == TIMEOUT?
301 >                         ((mode == TIMEOUT) ?
302                            maxTimedSpins : maxUntimedSpins) : 0);
303              }
304              if (spins > 0)
# Line 296 | Line 306 | public class LinkedTransferQueue<E> exte
306              else if (s.waiter == null)
307                  s.waiter = w;
308              else if (mode != TIMEOUT) {
309 <                //                LockSupport.park(this);
300 <                LockSupport.park(); // allows run on java5
309 >                LockSupport.park(this);
310                  s.waiter = null;
311                  spins = -1;
312              }
313              else if (nanos > spinForTimeoutThreshold) {
314 <                //                LockSupport.parkNanos(this, nanos);
306 <                LockSupport.parkNanos(nanos);
314 >                LockSupport.parkNanos(this, nanos);
315                  s.waiter = null;
316                  spins = -1;
317              }
# Line 311 | Line 319 | public class LinkedTransferQueue<E> exte
319      }
320  
321      /**
322 +     * Returns validated tail for use in cleaning methods.
323 +     */
324 +    private Node<E> getValidatedTail() {
325 +        for (;;) {
326 +            Node<E> h = head.get();
327 +            Node<E> first = h.next;
328 +            if (first != null && first.next == first) { // help advance
329 +                advanceHead(h, first);
330 +                continue;
331 +            }
332 +            Node<E> t = tail.get();
333 +            Node<E> last = t.next;
334 +            if (t == tail.get()) {
335 +                if (last != null)
336 +                    tail.compareAndSet(t, last); // help advance
337 +                else
338 +                    return t;
339 +            }
340 +        }
341 +    }
342 +
343 +    /**
344       * Gets rid of cancelled node s with original predecessor pred.
345 <     * @return null (to simplify use by callers)
345 >     *
346 >     * @param pred predecessor of cancelled node
347 >     * @param s the cancelled node
348       */
349 <    private Object clean(QNode pred, QNode s) {
349 >    private void clean(Node<E> pred, Node<E> s) {
350          Thread w = s.waiter;
351          if (w != null) {             // Wake up thread
352              s.waiter = null;
353              if (w != Thread.currentThread())
354                  LockSupport.unpark(w);
355          }
356 <        
357 <        for (;;) {
358 <            if (pred.next != s) // already cleaned
359 <                return null;
360 <            QNode h = head.get();
361 <            QNode hn = h.next;   // Absorb cancelled first node as head
362 <            if (hn != null && hn.next == hn) {
363 <                advanceHead(h, hn);
364 <                continue;
365 <            }
366 <            QNode t = tail.get();      // Ensure consistent read for tail
367 <            if (t == h)
368 <                return null;
369 <            QNode tn = t.next;
370 <            if (t != tail.get())
371 <                continue;
372 <            if (tn != null) {          // Help advance tail
341 <                tail.compareAndSet(t, tn);
342 <                continue;
343 <            }
344 <            if (s != t) {             // If not tail, try to unsplice
345 <                QNode sn = s.next;
356 >
357 >        if (pred == null)
358 >            return;
359 >
360 >        /*
361 >         * At any given time, exactly one node on list cannot be
362 >         * deleted -- the last inserted node. To accommodate this, if
363 >         * we cannot delete s, we save its predecessor as "cleanMe",
364 >         * processing the previously saved version first. At least one
365 >         * of node s or the node previously saved can always be
366 >         * processed, so this always terminates.
367 >         */
368 >        while (pred.next == s) {
369 >            Node<E> oldpred = reclean();  // First, help get rid of cleanMe
370 >            Node<E> t = getValidatedTail();
371 >            if (s != t) {               // If not tail, try to unsplice
372 >                Node<E> sn = s.next;      // s.next == s means s already off list
373                  if (sn == s || pred.casNext(s, sn))
374 <                    return null;
374 >                    break;
375 >            }
376 >            else if (oldpred == pred || // Already saved
377 >                     (oldpred == null && cleanMe.compareAndSet(null, pred)))
378 >                break;                  // Postpone cleaning
379 >        }
380 >    }
381 >
382 >    /**
383 >     * Tries to unsplice the cancelled node held in cleanMe that was
384 >     * previously uncleanable because it was at tail.
385 >     *
386 >     * @return current cleanMe node (or null)
387 >     */
388 >    private Node<E> reclean() {
389 >        /*
390 >         * cleanMe is, or at one time was, predecessor of cancelled
391 >         * node s that was the tail so could not be unspliced.  If s
392 >         * is no longer the tail, try to unsplice if necessary and
393 >         * make cleanMe slot available.  This differs from similar
394 >         * code in clean() because we must check that pred still
395 >         * points to a cancelled node that must be unspliced -- if
396 >         * not, we can (must) clear cleanMe without unsplicing.
397 >         * This can loop only due to contention on casNext or
398 >         * clearing cleanMe.
399 >         */
400 >        Node<E> pred;
401 >        while ((pred = cleanMe.get()) != null) {
402 >            Node<E> t = getValidatedTail();
403 >            Node<E> s = pred.next;
404 >            if (s != t) {
405 >                Node<E> sn;
406 >                if (s == null || s == pred || s.get() != s ||
407 >                    (sn = s.next) == s || pred.casNext(s, sn))
408 >                    cleanMe.compareAndSet(pred, null);
409              }
410 <            QNode dp = cleanMe.get();
411 <            if (dp != null) {    // Try unlinking previous cancelled node
351 <                QNode d = dp.next;
352 <                QNode dn;
353 <                if (d == null ||               // d is gone or
354 <                    d == dp ||                 // d is off list or
355 <                    d.get() != d ||            // d not cancelled or
356 <                    (d != t &&                 // d not tail and
357 <                     (dn = d.next) != null &&  //   has successor
358 <                     dn != d &&                //   that is on list
359 <                     dp.casNext(d, dn)))       // d unspliced
360 <                    cleanMe.compareAndSet(dp, null);
361 <                if (dp == pred)
362 <                    return null;      // s is already saved node
363 <            }
364 <            else if (cleanMe.compareAndSet(null, pred))
365 <                return null;          // Postpone cleaning s
410 >            else // s is still tail; cannot clean
411 >                break;
412          }
413 +        return pred;
414      }
415 <    
415 >
416      /**
417 <     * Creates an initially empty <tt>LinkedTransferQueue</tt>.
417 >     * Creates an initially empty {@code LinkedTransferQueue}.
418       */
419      public LinkedTransferQueue() {
420 +        Node<E> dummy = new Node<E>(null, false);
421 +        head = new PaddedAtomicReference<Node<E>>(dummy);
422 +        tail = new PaddedAtomicReference<Node<E>>(dummy);
423 +        cleanMe = new PaddedAtomicReference<Node<E>>(null);
424      }
425  
426      /**
427 <     * Creates a <tt>LinkedTransferQueue</tt>
427 >     * Creates a {@code LinkedTransferQueue}
428       * initially containing the elements of the given collection,
429       * added in traversal order of the collection's iterator.
430 +     *
431       * @param c the collection of elements to initially contain
432       * @throws NullPointerException if the specified collection or any
433       *         of its elements are null
434       */
435      public LinkedTransferQueue(Collection<? extends E> c) {
436 +        this();
437          addAll(c);
438      }
439  
# Line 390 | Line 443 | public class LinkedTransferQueue<E> exte
443          xfer(e, NOWAIT, 0);
444      }
445  
446 <    public boolean offer(E e, long timeout, TimeUnit unit)  
446 >    public boolean offer(E e, long timeout, TimeUnit unit)
447          throws InterruptedException {
448          if (e == null) throw new NullPointerException();
449          if (Thread.interrupted()) throw new InterruptedException();
# Line 404 | Line 457 | public class LinkedTransferQueue<E> exte
457          return true;
458      }
459  
460 +    public boolean add(E e) {
461 +        if (e == null) throw new NullPointerException();
462 +        xfer(e, NOWAIT, 0);
463 +        return true;
464 +    }
465 +
466      public void transfer(E e) throws InterruptedException {
467          if (e == null) throw new NullPointerException();
468          if (xfer(e, WAIT, 0) == null) {
469 <            Thread.interrupted();
469 >            Thread.interrupted();
470              throw new InterruptedException();
471 <        }
471 >        }
472      }
473  
474      public boolean tryTransfer(E e, long timeout, TimeUnit unit)
# Line 430 | Line 489 | public class LinkedTransferQueue<E> exte
489      public E take() throws InterruptedException {
490          Object e = xfer(null, WAIT, 0);
491          if (e != null)
492 <            return (E)e;
493 <        Thread.interrupted();
492 >            return (E) e;
493 >        Thread.interrupted();
494          throw new InterruptedException();
495      }
496  
497      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
498          Object e = xfer(null, TIMEOUT, unit.toNanos(timeout));
499          if (e != null || !Thread.interrupted())
500 <            return (E)e;
500 >            return (E) e;
501          throw new InterruptedException();
502      }
503  
504      public E poll() {
505 <        return (E)fulfill(null);
505 >        return fulfill(null);
506      }
507  
508      public int drainTo(Collection<? super E> c) {
# Line 477 | Line 536 | public class LinkedTransferQueue<E> exte
536      // Traversal-based methods
537  
538      /**
539 <     * Return head after performing any outstanding helping steps
539 >     * Returns head after performing any outstanding helping steps.
540       */
541 <    private QNode traversalHead() {
541 >    private Node<E> traversalHead() {
542          for (;;) {
543 <            QNode t = tail.get();
544 <            QNode h = head.get();
543 >            Node<E> t = tail.get();
544 >            Node<E> h = head.get();
545              if (h != null && t != null) {
546 <                QNode last = t.next;
547 <                QNode first = h.next;
546 >                Node<E> last = t.next;
547 >                Node<E> first = h.next;
548                  if (t == tail.get()) {
549 <                    if (last != null)
549 >                    if (last != null)
550                          tail.compareAndSet(t, last);
551                      else if (first != null) {
552                          Object x = first.get();
553 <                        if (x == first)
554 <                            advanceHead(h, first);    
553 >                        if (x == first)
554 >                            advanceHead(h, first);
555                          else
556                              return h;
557                      }
# Line 500 | Line 559 | public class LinkedTransferQueue<E> exte
559                          return h;
560                  }
561              }
562 +            reclean();
563          }
564      }
565  
# Line 509 | Line 569 | public class LinkedTransferQueue<E> exte
569      }
570  
571      /**
572 <     * Iterators. Basic strategy os to travers list, treating
572 >     * Iterators. Basic strategy is to traverse list, treating
573       * non-data (i.e., request) nodes as terminating list.
574       * Once a valid data node is found, the item is cached
575       * so that the next call to next() will return it even
576       * if subsequently removed.
577       */
578      class Itr implements Iterator<E> {
579 <        QNode nextNode;    // Next node to return next
580 <        QNode currentNode; // last returned node, for remove()
581 <        QNode prevNode;    // predecessor of last returned node
582 <        E nextItem;        // Cache of next item, once commited to in next
583 <        
579 >        Node<E> next;        // node to return next
580 >        Node<E> pnext;       // predecessor of next
581 >        Node<E> snext;       // successor of next
582 >        Node<E> curr;        // last returned node, for remove()
583 >        Node<E> pcurr;       // predecessor of curr, for remove()
584 >        E nextItem;        // Cache of next item, once committed to in next
585 >
586          Itr() {
587 <            nextNode = traversalHead();
526 <            advance();
587 >            findNext();
588          }
589 <        
590 <        E advance() {
591 <            prevNode = currentNode;
592 <            currentNode = nextNode;
593 <            E x = nextItem;
533 <            
534 <            QNode p = nextNode.next;
589 >
590 >        /**
591 >         * Ensures next points to next valid node, or null if none.
592 >         */
593 >        void findNext() {
594              for (;;) {
595 <                if (p == null || !p.isData) {
596 <                    nextNode = null;
597 <                    nextItem = null;
598 <                    return x;
599 <                }
600 <                Object item = p.get();
601 <                if (item != p && item != null) {
602 <                    nextNode = p;
603 <                    nextItem = (E)item;
604 <                    return x;
605 <                }
606 <                prevNode = p;
607 <                p = p.next;
595 >                Node<E> pred = pnext;
596 >                Node<E> q = next;
597 >                if (pred == null || pred == q) {
598 >                    pred = traversalHead();
599 >                    q = pred.next;
600 >                }
601 >                if (q == null || !q.isData) {
602 >                    next = null;
603 >                    return;
604 >                }
605 >                Object x = q.get();
606 >                Node<E> s = q.next;
607 >                if (x != null && q != x && q != s) {
608 >                    nextItem = (E) x;
609 >                    snext = s;
610 >                    pnext = pred;
611 >                    next = q;
612 >                    return;
613 >                }
614 >                pnext = q;
615 >                next = s;
616              }
617          }
618 <        
618 >
619          public boolean hasNext() {
620 <            return nextNode != null;
620 >            return next != null;
621          }
622 <        
622 >
623          public E next() {
624 <            if (nextNode == null) throw new NoSuchElementException();
625 <            return advance();
624 >            if (next == null) throw new NoSuchElementException();
625 >            pcurr = pnext;
626 >            curr = next;
627 >            pnext = next;
628 >            next = snext;
629 >            E x = nextItem;
630 >            findNext();
631 >            return x;
632          }
633 <        
633 >
634          public void remove() {
635 <            QNode p = currentNode;
636 <            QNode prev = prevNode;
564 <            if (prev == null || p == null)
635 >            Node<E> p = curr;
636 >            if (p == null)
637                  throw new IllegalStateException();
638              Object x = p.get();
639              if (x != null && x != p && p.compareAndSet(x, p))
640 <                clean(prev, p);
640 >                clean(pcurr, p);
641          }
642      }
643  
644      public E peek() {
645          for (;;) {
646 <            QNode h = traversalHead();
647 <            QNode p = h.next;
646 >            Node<E> h = traversalHead();
647 >            Node<E> p = h.next;
648              if (p == null)
649                  return null;
650              Object x = p.get();
# Line 580 | Line 652 | public class LinkedTransferQueue<E> exte
652                  if (!p.isData)
653                      return null;
654                  if (x != null)
655 <                    return (E)x;
655 >                    return (E) x;
656              }
657          }
658      }
659  
660      public boolean isEmpty() {
661          for (;;) {
662 <            QNode h = traversalHead();
663 <            QNode p = h.next;
662 >            Node<E> h = traversalHead();
663 >            Node<E> p = h.next;
664              if (p == null)
665                  return true;
666              Object x = p.get();
# Line 603 | Line 675 | public class LinkedTransferQueue<E> exte
675  
676      public boolean hasWaitingConsumer() {
677          for (;;) {
678 <            QNode h = traversalHead();
679 <            QNode p = h.next;
678 >            Node<E> h = traversalHead();
679 >            Node<E> p = h.next;
680              if (p == null)
681                  return false;
682              Object x = p.get();
683 <            if (p != x)
683 >            if (p != x)
684                  return !p.isData;
685          }
686      }
687 <    
687 >
688      /**
689       * Returns the number of elements in this queue.  If this queue
690 <     * contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
691 <     * <tt>Integer.MAX_VALUE</tt>.
690 >     * contains more than {@code Integer.MAX_VALUE} elements, returns
691 >     * {@code Integer.MAX_VALUE}.
692       *
693       * <p>Beware that, unlike in most collections, this method is
694       * <em>NOT</em> a constant-time operation. Because of the
# Line 627 | Line 699 | public class LinkedTransferQueue<E> exte
699       */
700      public int size() {
701          int count = 0;
702 <        QNode h = traversalHead();
703 <        for (QNode p = h.next; p != null && p.isData; p = p.next) {
702 >        Node<E> h = traversalHead();
703 >        for (Node<E> p = h.next; p != null && p.isData; p = p.next) {
704              Object x = p.get();
705 <            if (x != null && x != p) {
705 >            if (x != null && x != p) {
706                  if (++count == Integer.MAX_VALUE) // saturated
707                      break;
708              }
# Line 640 | Line 712 | public class LinkedTransferQueue<E> exte
712  
713      public int getWaitingConsumerCount() {
714          int count = 0;
715 <        QNode h = traversalHead();
716 <        for (QNode p = h.next; p != null && !p.isData; p = p.next) {
715 >        Node<E> h = traversalHead();
716 >        for (Node<E> p = h.next; p != null && !p.isData; p = p.next) {
717              if (p.get() == null) {
718                  if (++count == Integer.MAX_VALUE)
719                      break;
# Line 654 | Line 726 | public class LinkedTransferQueue<E> exte
726          return Integer.MAX_VALUE;
727      }
728  
729 +    public boolean remove(Object o) {
730 +        if (o == null)
731 +            return false;
732 +        for (;;) {
733 +            Node<E> pred = traversalHead();
734 +            for (;;) {
735 +                Node<E> q = pred.next;
736 +                if (q == null || !q.isData)
737 +                    return false;
738 +                if (q == pred) // restart
739 +                    break;
740 +                Object x = q.get();
741 +                if (x != null && x != q && o.equals(x) &&
742 +                    q.compareAndSet(x, q)) {
743 +                    clean(pred, q);
744 +                    return true;
745 +                }
746 +                pred = q;
747 +            }
748 +        }
749 +    }
750 +
751      /**
752       * Save the state to a stream (that is, serialize it).
753       *
754 <     * @serialData All of the elements (each an <tt>E</tt>) in
754 >     * @serialData All of the elements (each an {@code E}) in
755       * the proper order, followed by a null
756       * @param s the stream
757       */
758      private void writeObject(java.io.ObjectOutputStream s)
759          throws java.io.IOException {
760          s.defaultWriteObject();
761 <        for (Iterator<E> it = iterator(); it.hasNext(); )
762 <            s.writeObject(it.next());
761 >        for (E e : this)
762 >            s.writeObject(e);
763          // Use trailing null as sentinel
764          s.writeObject(null);
765      }
# Line 673 | Line 767 | public class LinkedTransferQueue<E> exte
767      /**
768       * Reconstitute the Queue instance from a stream (that is,
769       * deserialize it).
770 +     *
771       * @param s the stream
772       */
773      private void readObject(java.io.ObjectInputStream s)
774          throws java.io.IOException, ClassNotFoundException {
775          s.defaultReadObject();
776 +        resetHeadAndTail();
777          for (;;) {
778 <            E item = (E)s.readObject();
778 >            @SuppressWarnings("unchecked") E item = (E) s.readObject();
779              if (item == null)
780                  break;
781              else
782                  offer(item);
783          }
784      }
785 +
786 +    // Support for resetting head/tail while deserializing
787 +    private void resetHeadAndTail() {
788 +        Node<E> dummy = new Node<E>(null, false);
789 +        UNSAFE.putObjectVolatile(this, headOffset,
790 +                                 new PaddedAtomicReference<Node<E>>(dummy));
791 +        UNSAFE.putObjectVolatile(this, tailOffset,
792 +                                 new PaddedAtomicReference<Node<E>>(dummy));
793 +        UNSAFE.putObjectVolatile(this, cleanMeOffset,
794 +                                 new PaddedAtomicReference<Node<E>>(null));
795 +    }
796 +
797 +    // Unsafe mechanics for jsr166y 3rd party package.
798 +    private static sun.misc.Unsafe getUnsafe() {
799 +        try {
800 +            return sun.misc.Unsafe.getUnsafe();
801 +        } catch (SecurityException se) {
802 +            try {
803 +                return java.security.AccessController.doPrivileged
804 +                    (new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() {
805 +                        public sun.misc.Unsafe run() throws Exception {
806 +                            return getUnsafeByReflection();
807 +                        }});
808 +            } catch (java.security.PrivilegedActionException e) {
809 +                throw new RuntimeException("Could not initialize intrinsics",
810 +                                           e.getCause());
811 +            }
812 +        }
813 +    }
814 +
815 +    private static sun.misc.Unsafe getUnsafeByReflection()
816 +            throws NoSuchFieldException, IllegalAccessException {
817 +        java.lang.reflect.Field f =
818 +            sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
819 +        f.setAccessible(true);
820 +        return (sun.misc.Unsafe) f.get(null);
821 +    }
822 +
823 +    private static long fieldOffset(String fieldName, Class<?> klazz) {
824 +        try {
825 +            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(fieldName));
826 +        } catch (NoSuchFieldException e) {
827 +            // Convert Exception to Error
828 +            NoSuchFieldError error = new NoSuchFieldError(fieldName);
829 +            error.initCause(e);
830 +            throw error;
831 +        }
832 +    }
833 +
834 +    private static final sun.misc.Unsafe UNSAFE = getUnsafe();
835 +    static final long headOffset =
836 +        fieldOffset("head", LinkedTransferQueue.class);
837 +    static final long tailOffset =
838 +        fieldOffset("tail", LinkedTransferQueue.class);
839 +    static final long cleanMeOffset =
840 +        fieldOffset("cleanMe", LinkedTransferQueue.class);
841 +
842   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines