ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.5 by jsr166, Fri Jul 25 18:11:53 2008 UTC vs.
Revision 1.29 by jsr166, Mon Jul 27 03:21:19 2009 UTC

# Line 5 | Line 5
5   */
6  
7   package jsr166y;
8 +
9   import java.util.concurrent.*;
10 < import java.util.concurrent.locks.*;
11 < import java.util.concurrent.atomic.*;
12 < import java.util.*;
13 < import java.io.*;
10 >
11 > import java.util.AbstractQueue;
12 > import java.util.Collection;
13 > import java.util.Iterator;
14 > import java.util.NoSuchElementException;
15 > import java.util.concurrent.locks.LockSupport;
16 > import java.util.concurrent.atomic.AtomicReference;
17 > import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
18  
19   /**
20   * An unbounded {@linkplain TransferQueue} based on linked nodes.
# Line 19 | Line 24 | import java.io.*;
24   * producer.  The <em>tail</em> of the queue is that element that has
25   * been on the queue the shortest time for some producer.
26   *
27 < * <p>Beware that, unlike in most collections, the <tt>size</tt>
27 > * <p>Beware that, unlike in most collections, the {@code size}
28   * method is <em>NOT</em> a constant-time operation. Because of the
29   * asynchronous nature of these queues, determining the current number
30   * of elements requires a traversal of the elements.
# Line 42 | Line 47 | import java.io.*;
47   * @since 1.7
48   * @author Doug Lea
49   * @param <E> the type of elements held in this collection
45 *
50   */
51   public class LinkedTransferQueue<E> extends AbstractQueue<E>
52      implements TransferQueue<E>, java.io.Serializable {
53      private static final long serialVersionUID = -3223113410248163686L;
54  
55      /*
52     * This is still a work in progress...
53     *
56       * This class extends the approach used in FIFO-mode
57       * SynchronousQueues. See the internal documentation, as well as
58       * the PPoPP 2006 paper "Scalable Synchronous Queues" by Scherer,
59       * Lea & Scott
60       * (http://www.cs.rice.edu/~wns1/papers/2006-PPoPP-SQ.pdf)
61       *
62 <     * The main extension is to provide different Wait modes
63 <     * for the main "xfer" method that puts or takes items.
64 <     * These don't impact the basic dual-queue logic, but instead
65 <     * control whether or how threads block upon insertion
66 <     * of request or data nodes into the dual queue.
62 >     * The main extension is to provide different Wait modes for the
63 >     * main "xfer" method that puts or takes items.  These don't
64 >     * impact the basic dual-queue logic, but instead control whether
65 >     * or how threads block upon insertion of request or data nodes
66 >     * into the dual queue. It also uses slightly different
67 >     * conventions for tracking whether nodes are off-list or
68 >     * cancelled.
69       */
70  
71      // Wait modes for xfer method
# Line 79 | Line 83 | public class LinkedTransferQueue<E> exte
83       * seems not to vary with number of CPUs (beyond 2) so is just
84       * a constant.
85       */
86 <    static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
86 >    static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
87  
88      /**
89       * The number of times to spin before blocking in untimed waits.
# Line 95 | Line 99 | public class LinkedTransferQueue<E> exte
99      static final long spinForTimeoutThreshold = 1000L;
100  
101      /**
102 <     * Node class for LinkedTransferQueue. Opportunistically subclasses from
103 <     * AtomicReference to represent item. Uses Object, not E, to allow
104 <     * setting item to "this" after use, to avoid garbage
105 <     * retention. Similarly, setting the next field to this is used as
106 <     * sentinel that node is off list.
102 >     * Node class for LinkedTransferQueue. Opportunistically
103 >     * subclasses from AtomicReference to represent item. Uses Object,
104 >     * not E, to allow setting item to "this" after use, to avoid
105 >     * garbage retention. Similarly, setting the next field to this is
106 >     * used as sentinel that node is off list.
107       */
108 <    static final class QNode extends AtomicReference<Object> {
109 <        volatile QNode next;
108 >    static final class Node<E> extends AtomicReference<Object> {
109 >        volatile Node<E> next;
110          volatile Thread waiter;       // to control park/unpark
111          final boolean isData;
112 <        QNode(Object item, boolean isData) {
112 >
113 >        Node(E item, boolean isData) {
114              super(item);
115              this.isData = isData;
116          }
117  
118 <        static final AtomicReferenceFieldUpdater<QNode, QNode>
118 >        @SuppressWarnings("rawtypes")
119 >        static final AtomicReferenceFieldUpdater<Node, Node>
120              nextUpdater = AtomicReferenceFieldUpdater.newUpdater
121 <            (QNode.class, QNode.class, "next");
121 >            (Node.class, Node.class, "next");
122  
123 <        boolean casNext(QNode cmp, QNode val) {
123 >        final boolean casNext(Node<E> cmp, Node<E> val) {
124              return nextUpdater.compareAndSet(this, cmp, val);
125          }
126 +
127 +        final void clearNext() {
128 +            nextUpdater.lazySet(this, this);
129 +        }
130 +
131 +        private static final long serialVersionUID = -3375979862319811754L;
132      }
133  
134      /**
# Line 128 | Line 140 | public class LinkedTransferQueue<E> exte
140          // enough padding for 64bytes with 4byte refs
141          Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe;
142          PaddedAtomicReference(T r) { super(r); }
143 +        private static final long serialVersionUID = 8170090609809740854L;
144      }
145  
146  
147 <    private final QNode dummy = new QNode(null, false);
148 <    private final PaddedAtomicReference<QNode> head =
149 <        new PaddedAtomicReference<QNode>(dummy);
150 <    private final PaddedAtomicReference<QNode> tail =
151 <        new PaddedAtomicReference<QNode>(dummy);
147 >    /** head of the queue */
148 >    private transient final PaddedAtomicReference<Node<E>> head;
149 >
150 >    /** tail of the queue */
151 >    private transient final PaddedAtomicReference<Node<E>> tail;
152  
153      /**
154       * Reference to a cancelled node that might not yet have been
155       * unlinked from queue because it was the last inserted node
156       * when it cancelled.
157       */
158 <    private final PaddedAtomicReference<QNode> cleanMe =
146 <        new PaddedAtomicReference<QNode>(null);
158 >    private transient final PaddedAtomicReference<Node<E>> cleanMe;
159  
160      /**
161       * Tries to cas nh as new head; if successful, unlink
162       * old head's next node to avoid garbage retention.
163       */
164 <    private boolean advanceHead(QNode h, QNode nh) {
164 >    private boolean advanceHead(Node<E> h, Node<E> nh) {
165          if (h == head.get() && head.compareAndSet(h, nh)) {
166 <            h.next = h; // forget old next
166 >            h.clearNext(); // forget old next
167              return true;
168          }
169          return false;
# Line 159 | Line 171 | public class LinkedTransferQueue<E> exte
171  
172      /**
173       * Puts or takes an item. Used for most queue operations (except
174 <     * poll() and tryTransfer())
174 >     * poll() and tryTransfer()). See the similar code in
175 >     * SynchronousQueue for detailed explanation.
176 >     *
177       * @param e the item or if null, signifies that this is a take
178       * @param mode the wait mode: NOWAIT, TIMEOUT, WAIT
179       * @param nanos timeout in nanosecs, used only if mode is TIMEOUT
180       * @return an item, or null on failure
181       */
182 <    private Object xfer(Object e, int mode, long nanos) {
182 >    private E xfer(E e, int mode, long nanos) {
183          boolean isData = (e != null);
184 <        QNode s = null;
185 <        final PaddedAtomicReference<QNode> head = this.head;
186 <        final PaddedAtomicReference<QNode> tail = this.tail;
184 >        Node<E> s = null;
185 >        final PaddedAtomicReference<Node<E>> head = this.head;
186 >        final PaddedAtomicReference<Node<E>> tail = this.tail;
187  
188          for (;;) {
189 <            QNode t = tail.get();
190 <            QNode h = head.get();
189 >            Node<E> t = tail.get();
190 >            Node<E> h = head.get();
191  
192              if (t != null && (t == h || t.isData == isData)) {
193                  if (s == null)
194 <                    s = new QNode(e, isData);
195 <                QNode last = t.next;
194 >                    s = new Node<E>(e, isData);
195 >                Node<E> last = t.next;
196                  if (last != null) {
197                      if (t == tail.get())
198                          tail.compareAndSet(t, last);
# Line 190 | Line 204 | public class LinkedTransferQueue<E> exte
204              }
205  
206              else if (h != null) {
207 <                QNode first = h.next;
207 >                Node<E> first = h.next;
208                  if (t == tail.get() && first != null &&
209                      advanceHead(h, first)) {
210                      Object x = first.get();
211                      if (x != first && first.compareAndSet(x, e)) {
212                          LockSupport.unpark(first.waiter);
213 <                        return isData? e : x;
213 >                        return isData ? e : (E) x;
214                      }
215                  }
216              }
# Line 206 | Line 220 | public class LinkedTransferQueue<E> exte
220  
221      /**
222       * Version of xfer for poll() and tryTransfer, which
223 <     * simplifies control paths both here and in xfer
223 >     * simplifies control paths both here and in xfer.
224       */
225 <    private Object fulfill(Object e) {
225 >    private E fulfill(E e) {
226          boolean isData = (e != null);
227 <        final PaddedAtomicReference<QNode> head = this.head;
228 <        final PaddedAtomicReference<QNode> tail = this.tail;
227 >        final PaddedAtomicReference<Node<E>> head = this.head;
228 >        final PaddedAtomicReference<Node<E>> tail = this.tail;
229  
230          for (;;) {
231 <            QNode t = tail.get();
232 <            QNode h = head.get();
231 >            Node<E> t = tail.get();
232 >            Node<E> h = head.get();
233  
234              if (t != null && (t == h || t.isData == isData)) {
235 <                QNode last = t.next;
235 >                Node<E> last = t.next;
236                  if (t == tail.get()) {
237                      if (last != null)
238                          tail.compareAndSet(t, last);
# Line 227 | Line 241 | public class LinkedTransferQueue<E> exte
241                  }
242              }
243              else if (h != null) {
244 <                QNode first = h.next;
244 >                Node<E> first = h.next;
245                  if (t == tail.get() &&
246                      first != null &&
247                      advanceHead(h, first)) {
248                      Object x = first.get();
249                      if (x != first && first.compareAndSet(x, e)) {
250                          LockSupport.unpark(first.waiter);
251 <                        return isData? e : x;
251 >                        return isData ? e : (E) x;
252                      }
253                  }
254              }
# Line 252 | Line 266 | public class LinkedTransferQueue<E> exte
266       * @param nanos timeout value
267       * @return matched item, or s if cancelled
268       */
269 <    private Object awaitFulfill(QNode pred, QNode s, Object e,
270 <                                int mode, long nanos) {
269 >    private E awaitFulfill(Node<E> pred, Node<E> s, E e,
270 >                           int mode, long nanos) {
271          if (mode == NOWAIT)
272              return null;
273  
274 <        long lastTime = (mode == TIMEOUT)? System.nanoTime() : 0;
274 >        long lastTime = (mode == TIMEOUT) ? System.nanoTime() : 0;
275          Thread w = Thread.currentThread();
276          int spins = -1; // set to desired spin count below
277          for (;;) {
# Line 266 | Line 280 | public class LinkedTransferQueue<E> exte
280              Object x = s.get();
281              if (x != e) {                 // Node was matched or cancelled
282                  advanceHead(pred, s);     // unlink if head
283 <                if (x == s)               // was cancelled
284 <                    return clean(pred, s);
283 >                if (x == s) {             // was cancelled
284 >                    clean(pred, s);
285 >                    return null;
286 >                }
287                  else if (x != null) {
288                      s.set(s);             // avoid garbage retention
289 <                    return x;
289 >                    return (E) x;
290                  }
291                  else
292                      return e;
293              }
278
294              if (mode == TIMEOUT) {
295                  long now = System.nanoTime();
296                  nanos -= now - lastTime;
# Line 286 | Line 301 | public class LinkedTransferQueue<E> exte
301                  }
302              }
303              if (spins < 0) {
304 <                QNode h = head.get(); // only spin if at head
304 >                Node<E> h = head.get(); // only spin if at head
305                  spins = ((h != null && h.next == s) ?
306 <                         (mode == TIMEOUT?
306 >                         ((mode == TIMEOUT) ?
307                            maxTimedSpins : maxUntimedSpins) : 0);
308              }
309              if (spins > 0)
# Line 296 | Line 311 | public class LinkedTransferQueue<E> exte
311              else if (s.waiter == null)
312                  s.waiter = w;
313              else if (mode != TIMEOUT) {
314 <                //                LockSupport.park(this);
300 <                LockSupport.park(); // allows run on java5
314 >                LockSupport.park(this);
315                  s.waiter = null;
316                  spins = -1;
317              }
318              else if (nanos > spinForTimeoutThreshold) {
319 <                //                LockSupport.parkNanos(this, nanos);
306 <                LockSupport.parkNanos(nanos);
319 >                LockSupport.parkNanos(this, nanos);
320                  s.waiter = null;
321                  spins = -1;
322              }
# Line 311 | Line 324 | public class LinkedTransferQueue<E> exte
324      }
325  
326      /**
327 +     * Returns validated tail for use in cleaning methods.
328 +     */
329 +    private Node<E> getValidatedTail() {
330 +        for (;;) {
331 +            Node<E> h = head.get();
332 +            Node<E> first = h.next;
333 +            if (first != null && first.next == first) { // help advance
334 +                advanceHead(h, first);
335 +                continue;
336 +            }
337 +            Node<E> t = tail.get();
338 +            Node<E> last = t.next;
339 +            if (t == tail.get()) {
340 +                if (last != null)
341 +                    tail.compareAndSet(t, last); // help advance
342 +                else
343 +                    return t;
344 +            }
345 +        }
346 +    }
347 +
348 +    /**
349       * Gets rid of cancelled node s with original predecessor pred.
350 <     * @return null (to simplify use by callers)
350 >     *
351 >     * @param pred predecessor of cancelled node
352 >     * @param s the cancelled node
353       */
354 <    private Object clean(QNode pred, QNode s) {
354 >    private void clean(Node<E> pred, Node<E> s) {
355          Thread w = s.waiter;
356          if (w != null) {             // Wake up thread
357              s.waiter = null;
# Line 322 | Line 359 | public class LinkedTransferQueue<E> exte
359                  LockSupport.unpark(w);
360          }
361  
362 <        for (;;) {
363 <            if (pred.next != s) // already cleaned
364 <                return null;
365 <            QNode h = head.get();
366 <            QNode hn = h.next;   // Absorb cancelled first node as head
367 <            if (hn != null && hn.next == hn) {
368 <                advanceHead(h, hn);
369 <                continue;
370 <            }
371 <            QNode t = tail.get();      // Ensure consistent read for tail
372 <            if (t == h)
373 <                return null;
374 <            QNode tn = t.next;
375 <            if (t != tail.get())
376 <                continue;
377 <            if (tn != null) {          // Help advance tail
341 <                tail.compareAndSet(t, tn);
342 <                continue;
343 <            }
344 <            if (s != t) {             // If not tail, try to unsplice
345 <                QNode sn = s.next;
362 >        if (pred == null)
363 >            return;
364 >
365 >        /*
366 >         * At any given time, exactly one node on list cannot be
367 >         * deleted -- the last inserted node. To accommodate this, if
368 >         * we cannot delete s, we save its predecessor as "cleanMe",
369 >         * processing the previously saved version first. At least one
370 >         * of node s or the node previously saved can always be
371 >         * processed, so this always terminates.
372 >         */
373 >        while (pred.next == s) {
374 >            Node<E> oldpred = reclean();  // First, help get rid of cleanMe
375 >            Node<E> t = getValidatedTail();
376 >            if (s != t) {               // If not tail, try to unsplice
377 >                Node<E> sn = s.next;      // s.next == s means s already off list
378                  if (sn == s || pred.casNext(s, sn))
379 <                    return null;
379 >                    break;
380              }
381 <            QNode dp = cleanMe.get();
382 <            if (dp != null) {    // Try unlinking previous cancelled node
383 <                QNode d = dp.next;
384 <                QNode dn;
385 <                if (d == null ||               // d is gone or
386 <                    d == dp ||                 // d is off list or
387 <                    d.get() != d ||            // d not cancelled or
388 <                    (d != t &&                 // d not tail and
389 <                     (dn = d.next) != null &&  //   has successor
390 <                     dn != d &&                //   that is on list
391 <                     dp.casNext(d, dn)))       // d unspliced
392 <                    cleanMe.compareAndSet(dp, null);
393 <                if (dp == pred)
394 <                    return null;      // s is already saved node
381 >            else if (oldpred == pred || // Already saved
382 >                     (oldpred == null && cleanMe.compareAndSet(null, pred)))
383 >                break;                  // Postpone cleaning
384 >        }
385 >    }
386 >
387 >    /**
388 >     * Tries to unsplice the cancelled node held in cleanMe that was
389 >     * previously uncleanable because it was at tail.
390 >     *
391 >     * @return current cleanMe node (or null)
392 >     */
393 >    private Node<E> reclean() {
394 >        /*
395 >         * cleanMe is, or at one time was, predecessor of cancelled
396 >         * node s that was the tail so could not be unspliced.  If s
397 >         * is no longer the tail, try to unsplice if necessary and
398 >         * make cleanMe slot available.  This differs from similar
399 >         * code in clean() because we must check that pred still
400 >         * points to a cancelled node that must be unspliced -- if
401 >         * not, we can (must) clear cleanMe without unsplicing.
402 >         * This can loop only due to contention on casNext or
403 >         * clearing cleanMe.
404 >         */
405 >        Node<E> pred;
406 >        while ((pred = cleanMe.get()) != null) {
407 >            Node<E> t = getValidatedTail();
408 >            Node<E> s = pred.next;
409 >            if (s != t) {
410 >                Node<E> sn;
411 >                if (s == null || s == pred || s.get() != s ||
412 >                    (sn = s.next) == s || pred.casNext(s, sn))
413 >                    cleanMe.compareAndSet(pred, null);
414              }
415 <            else if (cleanMe.compareAndSet(null, pred))
416 <                return null;          // Postpone cleaning s
415 >            else // s is still tail; cannot clean
416 >                break;
417          }
418 +        return pred;
419      }
420  
421      /**
422 <     * Creates an initially empty <tt>LinkedTransferQueue</tt>.
422 >     * Creates an initially empty {@code LinkedTransferQueue}.
423       */
424      public LinkedTransferQueue() {
425 +        Node<E> dummy = new Node<E>(null, false);
426 +        head = new PaddedAtomicReference<Node<E>>(dummy);
427 +        tail = new PaddedAtomicReference<Node<E>>(dummy);
428 +        cleanMe = new PaddedAtomicReference<Node<E>>(null);
429      }
430  
431      /**
432 <     * Creates a <tt>LinkedTransferQueue</tt>
432 >     * Creates a {@code LinkedTransferQueue}
433       * initially containing the elements of the given collection,
434       * added in traversal order of the collection's iterator.
435 +     *
436       * @param c the collection of elements to initially contain
437       * @throws NullPointerException if the specified collection or any
438       *         of its elements are null
439       */
440      public LinkedTransferQueue(Collection<? extends E> c) {
441 +        this();
442          addAll(c);
443      }
444  
445 +    /**
446 +     * @throws InterruptedException {@inheritDoc}
447 +     * @throws NullPointerException {@inheritDoc}
448 +     */
449      public void put(E e) throws InterruptedException {
450          if (e == null) throw new NullPointerException();
451          if (Thread.interrupted()) throw new InterruptedException();
452          xfer(e, NOWAIT, 0);
453      }
454  
455 +    /**
456 +     * @throws InterruptedException {@inheritDoc}
457 +     * @throws NullPointerException {@inheritDoc}
458 +     */
459      public boolean offer(E e, long timeout, TimeUnit unit)
460          throws InterruptedException {
461          if (e == null) throw new NullPointerException();
# Line 398 | Line 464 | public class LinkedTransferQueue<E> exte
464          return true;
465      }
466  
467 +    /**
468 +     * @throws NullPointerException {@inheritDoc}
469 +     */
470      public boolean offer(E e) {
471          if (e == null) throw new NullPointerException();
472          xfer(e, NOWAIT, 0);
473          return true;
474      }
475  
476 +    /**
477 +     * @throws NullPointerException {@inheritDoc}
478 +     */
479 +    public boolean add(E e) {
480 +        if (e == null) throw new NullPointerException();
481 +        xfer(e, NOWAIT, 0);
482 +        return true;
483 +    }
484 +
485 +    /**
486 +     * @throws InterruptedException {@inheritDoc}
487 +     * @throws NullPointerException {@inheritDoc}
488 +     */
489      public void transfer(E e) throws InterruptedException {
490          if (e == null) throw new NullPointerException();
491          if (xfer(e, WAIT, 0) == null) {
492 <            Thread.interrupted();
492 >            Thread.interrupted();
493              throw new InterruptedException();
494 <        }
494 >        }
495      }
496  
497 +    /**
498 +     * @throws InterruptedException {@inheritDoc}
499 +     * @throws NullPointerException {@inheritDoc}
500 +     */
501      public boolean tryTransfer(E e, long timeout, TimeUnit unit)
502          throws InterruptedException {
503          if (e == null) throw new NullPointerException();
# Line 422 | Line 508 | public class LinkedTransferQueue<E> exte
508          throw new InterruptedException();
509      }
510  
511 +    /**
512 +     * @throws NullPointerException {@inheritDoc}
513 +     */
514      public boolean tryTransfer(E e) {
515          if (e == null) throw new NullPointerException();
516          return fulfill(e) != null;
517      }
518  
519 +    /**
520 +     * @throws InterruptedException {@inheritDoc}
521 +     */
522      public E take() throws InterruptedException {
523          Object e = xfer(null, WAIT, 0);
524          if (e != null)
525 <            return (E)e;
526 <        Thread.interrupted();
525 >            return (E) e;
526 >        Thread.interrupted();
527          throw new InterruptedException();
528      }
529  
530 +    /**
531 +     * @throws InterruptedException {@inheritDoc}
532 +     */
533      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
534          Object e = xfer(null, TIMEOUT, unit.toNanos(timeout));
535          if (e != null || !Thread.interrupted())
536 <            return (E)e;
536 >            return (E) e;
537          throw new InterruptedException();
538      }
539  
540      public E poll() {
541 <        return (E)fulfill(null);
541 >        return fulfill(null);
542      }
543  
544 +    /**
545 +     * @throws NullPointerException    {@inheritDoc}
546 +     * @throwsIllegalArgumentException {@inheritDoc}
547 +     */
548      public int drainTo(Collection<? super E> c) {
549          if (c == null)
550              throw new NullPointerException();
# Line 460 | Line 559 | public class LinkedTransferQueue<E> exte
559          return n;
560      }
561  
562 +    /**
563 +     * @throws NullPointerException    {@inheritDoc}
564 +     * @throwsIllegalArgumentException {@inheritDoc}
565 +     */
566      public int drainTo(Collection<? super E> c, int maxElements) {
567          if (c == null)
568              throw new NullPointerException();
# Line 477 | Line 580 | public class LinkedTransferQueue<E> exte
580      // Traversal-based methods
581  
582      /**
583 <     * Return head after performing any outstanding helping steps
583 >     * Returns head after performing any outstanding helping steps.
584       */
585 <    private QNode traversalHead() {
585 >    private Node<E> traversalHead() {
586          for (;;) {
587 <            QNode t = tail.get();
588 <            QNode h = head.get();
587 >            Node<E> t = tail.get();
588 >            Node<E> h = head.get();
589              if (h != null && t != null) {
590 <                QNode last = t.next;
591 <                QNode first = h.next;
590 >                Node<E> last = t.next;
591 >                Node<E> first = h.next;
592                  if (t == tail.get()) {
593                      if (last != null)
594                          tail.compareAndSet(t, last);
# Line 500 | Line 603 | public class LinkedTransferQueue<E> exte
603                          return h;
604                  }
605              }
606 +            reclean();
607          }
608      }
609  
# Line 516 | Line 620 | public class LinkedTransferQueue<E> exte
620       * if subsequently removed.
621       */
622      class Itr implements Iterator<E> {
623 <        QNode nextNode;    // Next node to return next
624 <        QNode currentNode; // last returned node, for remove()
625 <        QNode prevNode;    // predecessor of last returned node
626 <        E nextItem;        // Cache of next item, once commited to in next
623 >        Node<E> next;        // node to return next
624 >        Node<E> pnext;       // predecessor of next
625 >        Node<E> snext;       // successor of next
626 >        Node<E> curr;        // last returned node, for remove()
627 >        Node<E> pcurr;       // predecessor of curr, for remove()
628 >        E nextItem;        // Cache of next item, once committed to in next
629  
630          Itr() {
631 <            nextNode = traversalHead();
526 <            advance();
631 >            findNext();
632          }
633  
634 <        E advance() {
635 <            prevNode = currentNode;
636 <            currentNode = nextNode;
637 <            E x = nextItem;
533 <
534 <            QNode p = nextNode.next;
634 >        /**
635 >         * Ensures next points to next valid node, or null if none.
636 >         */
637 >        void findNext() {
638              for (;;) {
639 <                if (p == null || !p.isData) {
640 <                    nextNode = null;
641 <                    nextItem = null;
642 <                    return x;
643 <                }
644 <                Object item = p.get();
645 <                if (item != p && item != null) {
646 <                    nextNode = p;
647 <                    nextItem = (E)item;
648 <                    return x;
639 >                Node<E> pred = pnext;
640 >                Node<E> q = next;
641 >                if (pred == null || pred == q) {
642 >                    pred = traversalHead();
643 >                    q = pred.next;
644 >                }
645 >                if (q == null || !q.isData) {
646 >                    next = null;
647 >                    return;
648 >                }
649 >                Object x = q.get();
650 >                Node<E> s = q.next;
651 >                if (x != null && q != x && q != s) {
652 >                    nextItem = (E) x;
653 >                    snext = s;
654 >                    pnext = pred;
655 >                    next = q;
656 >                    return;
657                  }
658 <                prevNode = p;
659 <                p = p.next;
658 >                pnext = q;
659 >                next = s;
660              }
661          }
662  
663          public boolean hasNext() {
664 <            return nextNode != null;
664 >            return next != null;
665          }
666  
667          public E next() {
668 <            if (nextNode == null) throw new NoSuchElementException();
669 <            return advance();
668 >            if (next == null) throw new NoSuchElementException();
669 >            pcurr = pnext;
670 >            curr = next;
671 >            pnext = next;
672 >            next = snext;
673 >            E x = nextItem;
674 >            findNext();
675 >            return x;
676          }
677  
678          public void remove() {
679 <            QNode p = currentNode;
680 <            QNode prev = prevNode;
564 <            if (prev == null || p == null)
679 >            Node<E> p = curr;
680 >            if (p == null)
681                  throw new IllegalStateException();
682              Object x = p.get();
683              if (x != null && x != p && p.compareAndSet(x, p))
684 <                clean(prev, p);
684 >                clean(pcurr, p);
685          }
686      }
687  
688      public E peek() {
689          for (;;) {
690 <            QNode h = traversalHead();
691 <            QNode p = h.next;
690 >            Node<E> h = traversalHead();
691 >            Node<E> p = h.next;
692              if (p == null)
693                  return null;
694              Object x = p.get();
# Line 580 | Line 696 | public class LinkedTransferQueue<E> exte
696                  if (!p.isData)
697                      return null;
698                  if (x != null)
699 <                    return (E)x;
699 >                    return (E) x;
700              }
701          }
702      }
703  
704      public boolean isEmpty() {
705          for (;;) {
706 <            QNode h = traversalHead();
707 <            QNode p = h.next;
706 >            Node<E> h = traversalHead();
707 >            Node<E> p = h.next;
708              if (p == null)
709                  return true;
710              Object x = p.get();
# Line 603 | Line 719 | public class LinkedTransferQueue<E> exte
719  
720      public boolean hasWaitingConsumer() {
721          for (;;) {
722 <            QNode h = traversalHead();
723 <            QNode p = h.next;
722 >            Node<E> h = traversalHead();
723 >            Node<E> p = h.next;
724              if (p == null)
725                  return false;
726              Object x = p.get();
# Line 615 | Line 731 | public class LinkedTransferQueue<E> exte
731  
732      /**
733       * Returns the number of elements in this queue.  If this queue
734 <     * contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
735 <     * <tt>Integer.MAX_VALUE</tt>.
734 >     * contains more than {@code Integer.MAX_VALUE} elements, returns
735 >     * {@code Integer.MAX_VALUE}.
736       *
737       * <p>Beware that, unlike in most collections, this method is
738       * <em>NOT</em> a constant-time operation. Because of the
# Line 627 | Line 743 | public class LinkedTransferQueue<E> exte
743       */
744      public int size() {
745          int count = 0;
746 <        QNode h = traversalHead();
747 <        for (QNode p = h.next; p != null && p.isData; p = p.next) {
746 >        Node<E> h = traversalHead();
747 >        for (Node<E> p = h.next; p != null && p.isData; p = p.next) {
748              Object x = p.get();
749              if (x != null && x != p) {
750                  if (++count == Integer.MAX_VALUE) // saturated
# Line 640 | Line 756 | public class LinkedTransferQueue<E> exte
756  
757      public int getWaitingConsumerCount() {
758          int count = 0;
759 <        QNode h = traversalHead();
760 <        for (QNode p = h.next; p != null && !p.isData; p = p.next) {
759 >        Node<E> h = traversalHead();
760 >        for (Node<E> p = h.next; p != null && !p.isData; p = p.next) {
761              if (p.get() == null) {
762                  if (++count == Integer.MAX_VALUE)
763                      break;
# Line 654 | Line 770 | public class LinkedTransferQueue<E> exte
770          return Integer.MAX_VALUE;
771      }
772  
773 +    public boolean remove(Object o) {
774 +        if (o == null)
775 +            return false;
776 +        for (;;) {
777 +            Node<E> pred = traversalHead();
778 +            for (;;) {
779 +                Node<E> q = pred.next;
780 +                if (q == null || !q.isData)
781 +                    return false;
782 +                if (q == pred) // restart
783 +                    break;
784 +                Object x = q.get();
785 +                if (x != null && x != q && o.equals(x) &&
786 +                    q.compareAndSet(x, q)) {
787 +                    clean(pred, q);
788 +                    return true;
789 +                }
790 +                pred = q;
791 +            }
792 +        }
793 +    }
794 +
795      /**
796       * Save the state to a stream (that is, serialize it).
797       *
798 <     * @serialData All of the elements (each an <tt>E</tt>) in
798 >     * @serialData All of the elements (each an {@code E}) in
799       * the proper order, followed by a null
800       * @param s the stream
801       */
802      private void writeObject(java.io.ObjectOutputStream s)
803          throws java.io.IOException {
804          s.defaultWriteObject();
805 <        for (Iterator<E> it = iterator(); it.hasNext(); )
806 <            s.writeObject(it.next());
805 >        for (E e : this)
806 >            s.writeObject(e);
807          // Use trailing null as sentinel
808          s.writeObject(null);
809      }
# Line 673 | Line 811 | public class LinkedTransferQueue<E> exte
811      /**
812       * Reconstitute the Queue instance from a stream (that is,
813       * deserialize it).
814 +     *
815       * @param s the stream
816       */
817      private void readObject(java.io.ObjectInputStream s)
818          throws java.io.IOException, ClassNotFoundException {
819          s.defaultReadObject();
820 +        resetHeadAndTail();
821          for (;;) {
822 <            E item = (E)s.readObject();
822 >            @SuppressWarnings("unchecked") E item = (E) s.readObject();
823              if (item == null)
824                  break;
825              else
826                  offer(item);
827          }
828      }
829 +
830 +    // Support for resetting head/tail while deserializing
831 +    private void resetHeadAndTail() {
832 +        Node<E> dummy = new Node<E>(null, false);
833 +        UNSAFE.putObjectVolatile(this, headOffset,
834 +                                 new PaddedAtomicReference<Node<E>>(dummy));
835 +        UNSAFE.putObjectVolatile(this, tailOffset,
836 +                                 new PaddedAtomicReference<Node<E>>(dummy));
837 +        UNSAFE.putObjectVolatile(this, cleanMeOffset,
838 +                                 new PaddedAtomicReference<Node<E>>(null));
839 +    }
840 +
841 +    // Unsafe mechanics
842 +
843 +    private static final sun.misc.Unsafe UNSAFE = getUnsafe();
844 +    private static final long headOffset =
845 +        objectFieldOffset("head", LinkedTransferQueue.class);
846 +    private static final long tailOffset =
847 +        objectFieldOffset("tail", LinkedTransferQueue.class);
848 +    private static final long cleanMeOffset =
849 +        objectFieldOffset("cleanMe", LinkedTransferQueue.class);
850 +
851 +    private static long objectFieldOffset(String field, Class<?> klazz) {
852 +        try {
853 +            return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
854 +        } catch (NoSuchFieldException e) {
855 +            // Convert Exception to corresponding Error
856 +            NoSuchFieldError error = new NoSuchFieldError(field);
857 +            error.initCause(e);
858 +            throw error;
859 +        }
860 +    }
861 +
862 +    /**
863 +     * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
864 +     * Replace with a simple call to Unsafe.getUnsafe when integrating
865 +     * into a jdk.
866 +     *
867 +     * @return a sun.misc.Unsafe
868 +     */
869 +    private static sun.misc.Unsafe getUnsafe() {
870 +        try {
871 +            return sun.misc.Unsafe.getUnsafe();
872 +        } catch (SecurityException se) {
873 +            try {
874 +                return java.security.AccessController.doPrivileged
875 +                    (new java.security
876 +                     .PrivilegedExceptionAction<sun.misc.Unsafe>() {
877 +                        public sun.misc.Unsafe run() throws Exception {
878 +                            java.lang.reflect.Field f = sun.misc
879 +                                .Unsafe.class.getDeclaredField("theUnsafe");
880 +                            f.setAccessible(true);
881 +                            return (sun.misc.Unsafe) f.get(null);
882 +                        }});
883 +            } catch (java.security.PrivilegedActionException e) {
884 +                throw new RuntimeException("Could not initialize intrinsics",
885 +                                           e.getCause());
886 +            }
887 +        }
888 +    }
889   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines