ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.14 by jsr166, Thu Mar 19 05:10:42 2009 UTC vs.
Revision 1.23 by jsr166, Thu Jul 23 23:07:57 2009 UTC

# Line 44 | Line 44 | import java.lang.reflect.*;
44   * @since 1.7
45   * @author Doug Lea
46   * @param <E> the type of elements held in this collection
47 *
47   */
48   public class LinkedTransferQueue<E> extends AbstractQueue<E>
49      implements TransferQueue<E>, java.io.Serializable {
# Line 81 | Line 80 | public class LinkedTransferQueue<E> exte
80       * seems not to vary with number of CPUs (beyond 2) so is just
81       * a constant.
82       */
83 <    static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
83 >    static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
84  
85      /**
86       * The number of times to spin before blocking in untimed waits.
# Line 116 | Line 115 | public class LinkedTransferQueue<E> exte
115              nextUpdater = AtomicReferenceFieldUpdater.newUpdater
116              (QNode.class, QNode.class, "next");
117  
118 <        boolean casNext(QNode cmp, QNode val) {
118 >        final boolean casNext(QNode cmp, QNode val) {
119              return nextUpdater.compareAndSet(this, cmp, val);
120          }
121 +
122 +        final void clearNext() {
123 +            nextUpdater.lazySet(this, this);
124 +        }
125 +
126      }
127  
128      /**
# Line 135 | Line 139 | public class LinkedTransferQueue<E> exte
139  
140      /** head of the queue */
141      private transient final PaddedAtomicReference<QNode> head;
142 +
143      /** tail of the queue */
144      private transient final PaddedAtomicReference<QNode> tail;
145  
# Line 151 | Line 156 | public class LinkedTransferQueue<E> exte
156       */
157      private boolean advanceHead(QNode h, QNode nh) {
158          if (h == head.get() && head.compareAndSet(h, nh)) {
159 <            h.next = h; // forget old next
159 >            h.clearNext(); // forget old next
160              return true;
161          }
162          return false;
# Line 161 | Line 166 | public class LinkedTransferQueue<E> exte
166       * Puts or takes an item. Used for most queue operations (except
167       * poll() and tryTransfer()). See the similar code in
168       * SynchronousQueue for detailed explanation.
169 +     *
170       * @param e the item or if null, signifies that this is a take
171       * @param mode the wait mode: NOWAIT, TIMEOUT, WAIT
172       * @param nanos timeout in nanosecs, used only if mode is TIMEOUT
# Line 197 | Line 203 | public class LinkedTransferQueue<E> exte
203                      Object x = first.get();
204                      if (x != first && first.compareAndSet(x, e)) {
205                          LockSupport.unpark(first.waiter);
206 <                        return isData? e : x;
206 >                        return isData ? e : x;
207                      }
208                  }
209              }
# Line 207 | Line 213 | public class LinkedTransferQueue<E> exte
213  
214      /**
215       * Version of xfer for poll() and tryTransfer, which
216 <     * simplifies control paths both here and in xfer
216 >     * simplifies control paths both here and in xfer.
217       */
218      private Object fulfill(Object e) {
219          boolean isData = (e != null);
# Line 235 | Line 241 | public class LinkedTransferQueue<E> exte
241                      Object x = first.get();
242                      if (x != first && first.compareAndSet(x, e)) {
243                          LockSupport.unpark(first.waiter);
244 <                        return isData? e : x;
244 >                        return isData ? e : x;
245                      }
246                  }
247              }
# Line 258 | Line 264 | public class LinkedTransferQueue<E> exte
264          if (mode == NOWAIT)
265              return null;
266  
267 <        long lastTime = (mode == TIMEOUT)? System.nanoTime() : 0;
267 >        long lastTime = (mode == TIMEOUT) ? System.nanoTime() : 0;
268          Thread w = Thread.currentThread();
269          int spins = -1; // set to desired spin count below
270          for (;;) {
# Line 267 | Line 273 | public class LinkedTransferQueue<E> exte
273              Object x = s.get();
274              if (x != e) {                 // Node was matched or cancelled
275                  advanceHead(pred, s);     // unlink if head
276 <                if (x == s) {              // was cancelled
276 >                if (x == s) {             // was cancelled
277                      clean(pred, s);
278                      return null;
279                  }
# Line 290 | Line 296 | public class LinkedTransferQueue<E> exte
296              if (spins < 0) {
297                  QNode h = head.get(); // only spin if at head
298                  spins = ((h != null && h.next == s) ?
299 <                         (mode == TIMEOUT?
299 >                         ((mode == TIMEOUT) ?
300                            maxTimedSpins : maxUntimedSpins) : 0);
301              }
302              if (spins > 0)
# Line 311 | Line 317 | public class LinkedTransferQueue<E> exte
317      }
318  
319      /**
320 <     * Returns validated tail for use in cleaning methods
320 >     * Returns validated tail for use in cleaning methods.
321       */
322      private QNode getValidatedTail() {
323          for (;;) {
# Line 334 | Line 340 | public class LinkedTransferQueue<E> exte
340  
341      /**
342       * Gets rid of cancelled node s with original predecessor pred.
343 +     *
344       * @param pred predecessor of cancelled node
345       * @param s the cancelled node
346       */
# Line 344 | Line 351 | public class LinkedTransferQueue<E> exte
351              if (w != Thread.currentThread())
352                  LockSupport.unpark(w);
353          }
354 +
355 +        if (pred == null)
356 +            return;
357 +
358          /*
359           * At any given time, exactly one node on list cannot be
360           * deleted -- the last inserted node. To accommodate this, if
# Line 369 | Line 380 | public class LinkedTransferQueue<E> exte
380      /**
381       * Tries to unsplice the cancelled node held in cleanMe that was
382       * previously uncleanable because it was at tail.
383 +     *
384       * @return current cleanMe node (or null)
385       */
386      private QNode reclean() {
# Line 413 | Line 425 | public class LinkedTransferQueue<E> exte
425       * Creates a {@code LinkedTransferQueue}
426       * initially containing the elements of the given collection,
427       * added in traversal order of the collection's iterator.
428 +     *
429       * @param c the collection of elements to initially contain
430       * @throws NullPointerException if the specified collection or any
431       *         of its elements are null
# Line 442 | Line 455 | public class LinkedTransferQueue<E> exte
455          return true;
456      }
457  
458 +    public boolean add(E e) {
459 +        if (e == null) throw new NullPointerException();
460 +        xfer(e, NOWAIT, 0);
461 +        return true;
462 +    }
463 +
464      public void transfer(E e) throws InterruptedException {
465          if (e == null) throw new NullPointerException();
466          if (xfer(e, WAIT, 0) == null) {
# Line 468 | Line 487 | public class LinkedTransferQueue<E> exte
487      public E take() throws InterruptedException {
488          Object e = xfer(null, WAIT, 0);
489          if (e != null)
490 <            return (E)e;
490 >            return (E) e;
491          Thread.interrupted();
492          throw new InterruptedException();
493      }
# Line 476 | Line 495 | public class LinkedTransferQueue<E> exte
495      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
496          Object e = xfer(null, TIMEOUT, unit.toNanos(timeout));
497          if (e != null || !Thread.interrupted())
498 <            return (E)e;
498 >            return (E) e;
499          throw new InterruptedException();
500      }
501  
502      public E poll() {
503 <        return (E)fulfill(null);
503 >        return (E) fulfill(null);
504      }
505  
506      public int drainTo(Collection<? super E> c) {
# Line 515 | Line 534 | public class LinkedTransferQueue<E> exte
534      // Traversal-based methods
535  
536      /**
537 <     * Return head after performing any outstanding helping steps
537 >     * Returns head after performing any outstanding helping steps.
538       */
539      private QNode traversalHead() {
540          for (;;) {
# Line 538 | Line 557 | public class LinkedTransferQueue<E> exte
557                          return h;
558                  }
559              }
560 +            reclean();
561          }
562      }
563  
# Line 554 | Line 574 | public class LinkedTransferQueue<E> exte
574       * if subsequently removed.
575       */
576      class Itr implements Iterator<E> {
577 <        QNode nextNode;    // Next node to return next
578 <        QNode currentNode; // last returned node, for remove()
579 <        QNode prevNode;    // predecessor of last returned node
580 <        E nextItem;        // Cache of next item, once commited to in next
577 >        QNode next;        // node to return next
578 >        QNode pnext;       // predecessor of next
579 >        QNode snext;       // successor of next
580 >        QNode curr;        // last returned node, for remove()
581 >        QNode pcurr;       // predecessor of curr, for remove()
582 >        E nextItem;        // Cache of next item, once committed to in next
583  
584          Itr() {
585 <            nextNode = traversalHead();
564 <            advance();
585 >            findNext();
586          }
587  
588 <        E advance() {
589 <            prevNode = currentNode;
590 <            currentNode = nextNode;
591 <            E x = nextItem;
571 <
572 <            QNode p = nextNode.next;
588 >        /**
589 >         * Ensures next points to next valid node, or null if none.
590 >         */
591 >        void findNext() {
592              for (;;) {
593 <                if (p == null || !p.isData) {
594 <                    nextNode = null;
595 <                    nextItem = null;
596 <                    return x;
597 <                }
598 <                Object item = p.get();
599 <                if (item != p && item != null) {
600 <                    nextNode = p;
601 <                    nextItem = (E)item;
602 <                    return x;
593 >                QNode pred = pnext;
594 >                QNode q = next;
595 >                if (pred == null || pred == q) {
596 >                    pred = traversalHead();
597 >                    q = pred.next;
598 >                }
599 >                if (q == null || !q.isData) {
600 >                    next = null;
601 >                    return;
602 >                }
603 >                Object x = q.get();
604 >                QNode s = q.next;
605 >                if (x != null && q != x && q != s) {
606 >                    nextItem = (E) x;
607 >                    snext = s;
608 >                    pnext = pred;
609 >                    next = q;
610 >                    return;
611                  }
612 <                prevNode = p;
613 <                p = p.next;
612 >                pnext = q;
613 >                next = s;
614              }
615          }
616  
617          public boolean hasNext() {
618 <            return nextNode != null;
618 >            return next != null;
619          }
620  
621          public E next() {
622 <            if (nextNode == null) throw new NoSuchElementException();
623 <            return advance();
622 >            if (next == null) throw new NoSuchElementException();
623 >            pcurr = pnext;
624 >            curr = next;
625 >            pnext = next;
626 >            next = snext;
627 >            E x = nextItem;
628 >            findNext();
629 >            return x;
630          }
631  
632          public void remove() {
633 <            QNode p = currentNode;
634 <            QNode prev = prevNode;
602 <            if (prev == null || p == null)
633 >            QNode p = curr;
634 >            if (p == null)
635                  throw new IllegalStateException();
636              Object x = p.get();
637              if (x != null && x != p && p.compareAndSet(x, p))
638 <                clean(prev, p);
638 >                clean(pcurr, p);
639          }
640      }
641  
# Line 618 | Line 650 | public class LinkedTransferQueue<E> exte
650                  if (!p.isData)
651                      return null;
652                  if (x != null)
653 <                    return (E)x;
653 >                    return (E) x;
654              }
655          }
656      }
# Line 692 | Line 724 | public class LinkedTransferQueue<E> exte
724          return Integer.MAX_VALUE;
725      }
726  
727 +    public boolean remove(Object o) {
728 +        if (o == null)
729 +            return false;
730 +        for (;;) {
731 +            QNode pred = traversalHead();
732 +            for (;;) {
733 +                QNode q = pred.next;
734 +                if (q == null || !q.isData)
735 +                    return false;
736 +                if (q == pred) // restart
737 +                    break;
738 +                Object x = q.get();
739 +                if (x != null && x != q && o.equals(x) &&
740 +                    q.compareAndSet(x, q)) {
741 +                    clean(pred, q);
742 +                    return true;
743 +                }
744 +                pred = q;
745 +            }
746 +        }
747 +    }
748 +
749      /**
750       * Save the state to a stream (that is, serialize it).
751       *
# Line 702 | Line 756 | public class LinkedTransferQueue<E> exte
756      private void writeObject(java.io.ObjectOutputStream s)
757          throws java.io.IOException {
758          s.defaultWriteObject();
759 <        for (Iterator<E> it = iterator(); it.hasNext(); )
760 <            s.writeObject(it.next());
759 >        for (E e : this)
760 >            s.writeObject(e);
761          // Use trailing null as sentinel
762          s.writeObject(null);
763      }
# Line 711 | Line 765 | public class LinkedTransferQueue<E> exte
765      /**
766       * Reconstitute the Queue instance from a stream (that is,
767       * deserialize it).
768 +     *
769       * @param s the stream
770       */
771      private void readObject(java.io.ObjectInputStream s)
# Line 718 | Line 773 | public class LinkedTransferQueue<E> exte
773          s.defaultReadObject();
774          resetHeadAndTail();
775          for (;;) {
776 <            E item = (E)s.readObject();
776 >            E item = (E) s.readObject();
777              if (item == null)
778                  break;
779              else
# Line 730 | Line 785 | public class LinkedTransferQueue<E> exte
785      // Support for resetting head/tail while deserializing
786      private void resetHeadAndTail() {
787          QNode dummy = new QNode(null, false);
788 <        _unsafe.putObjectVolatile(this, headOffset,
788 >        UNSAFE.putObjectVolatile(this, headOffset,
789                                    new PaddedAtomicReference<QNode>(dummy));
790 <        _unsafe.putObjectVolatile(this, tailOffset,
790 >        UNSAFE.putObjectVolatile(this, tailOffset,
791                                    new PaddedAtomicReference<QNode>(dummy));
792 <        _unsafe.putObjectVolatile(this, cleanMeOffset,
792 >        UNSAFE.putObjectVolatile(this, cleanMeOffset,
793                                    new PaddedAtomicReference<QNode>(null));
794      }
795  
# Line 764 | Line 819 | public class LinkedTransferQueue<E> exte
819  
820      private static long fieldOffset(String fieldName)
821              throws NoSuchFieldException {
822 <        return _unsafe.objectFieldOffset
822 >        return UNSAFE.objectFieldOffset
823              (LinkedTransferQueue.class.getDeclaredField(fieldName));
824      }
825  
826 <    private static final Unsafe _unsafe;
826 >    private static final Unsafe UNSAFE;
827      private static final long headOffset;
828      private static final long tailOffset;
829      private static final long cleanMeOffset;
830      static {
831          try {
832 <            _unsafe = getUnsafe();
832 >            UNSAFE = getUnsafe();
833              headOffset = fieldOffset("head");
834              tailOffset = fieldOffset("tail");
835              cleanMeOffset = fieldOffset("cleanMe");

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines