ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.10 by jsr166, Mon Jan 5 03:43:07 2009 UTC vs.
Revision 1.24 by jsr166, Thu Jul 23 23:23:41 2009 UTC

# Line 21 | Line 21 | import java.lang.reflect.*;
21   * producer.  The <em>tail</em> of the queue is that element that has
22   * been on the queue the shortest time for some producer.
23   *
24 < * <p>Beware that, unlike in most collections, the <tt>size</tt>
24 > * <p>Beware that, unlike in most collections, the {@code size}
25   * method is <em>NOT</em> a constant-time operation. Because of the
26   * asynchronous nature of these queues, determining the current number
27   * of elements requires a traversal of the elements.
# Line 44 | Line 44 | import java.lang.reflect.*;
44   * @since 1.7
45   * @author Doug Lea
46   * @param <E> the type of elements held in this collection
47 *
47   */
48   public class LinkedTransferQueue<E> extends AbstractQueue<E>
49      implements TransferQueue<E>, java.io.Serializable {
# Line 81 | Line 80 | public class LinkedTransferQueue<E> exte
80       * seems not to vary with number of CPUs (beyond 2) so is just
81       * a constant.
82       */
83 <    static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
83 >    static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
84  
85      /**
86       * The number of times to spin before blocking in untimed waits.
# Line 116 | Line 115 | public class LinkedTransferQueue<E> exte
115              nextUpdater = AtomicReferenceFieldUpdater.newUpdater
116              (QNode.class, QNode.class, "next");
117  
118 <        boolean casNext(QNode cmp, QNode val) {
118 >        final boolean casNext(QNode cmp, QNode val) {
119              return nextUpdater.compareAndSet(this, cmp, val);
120          }
121 +
122 +        final void clearNext() {
123 +            nextUpdater.lazySet(this, this);
124 +        }
125 +
126 +        private static final long serialVersionUID = -3375979862319811754L;
127      }
128  
129      /**
# Line 130 | Line 135 | public class LinkedTransferQueue<E> exte
135          // enough padding for 64bytes with 4byte refs
136          Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe;
137          PaddedAtomicReference(T r) { super(r); }
138 +        private static final long serialVersionUID = 8170090609809740854L;
139      }
140  
141  
142      /** head of the queue */
143      private transient final PaddedAtomicReference<QNode> head;
144 +
145      /** tail of the queue */
146      private transient final PaddedAtomicReference<QNode> tail;
147  
# Line 151 | Line 158 | public class LinkedTransferQueue<E> exte
158       */
159      private boolean advanceHead(QNode h, QNode nh) {
160          if (h == head.get() && head.compareAndSet(h, nh)) {
161 <            h.next = h; // forget old next
161 >            h.clearNext(); // forget old next
162              return true;
163          }
164          return false;
# Line 161 | Line 168 | public class LinkedTransferQueue<E> exte
168       * Puts or takes an item. Used for most queue operations (except
169       * poll() and tryTransfer()). See the similar code in
170       * SynchronousQueue for detailed explanation.
171 +     *
172       * @param e the item or if null, signifies that this is a take
173       * @param mode the wait mode: NOWAIT, TIMEOUT, WAIT
174       * @param nanos timeout in nanosecs, used only if mode is TIMEOUT
# Line 197 | Line 205 | public class LinkedTransferQueue<E> exte
205                      Object x = first.get();
206                      if (x != first && first.compareAndSet(x, e)) {
207                          LockSupport.unpark(first.waiter);
208 <                        return isData? e : x;
208 >                        return isData ? e : x;
209                      }
210                  }
211              }
# Line 207 | Line 215 | public class LinkedTransferQueue<E> exte
215  
216      /**
217       * Version of xfer for poll() and tryTransfer, which
218 <     * simplifies control paths both here and in xfer
218 >     * simplifies control paths both here and in xfer.
219       */
220      private Object fulfill(Object e) {
221          boolean isData = (e != null);
# Line 235 | Line 243 | public class LinkedTransferQueue<E> exte
243                      Object x = first.get();
244                      if (x != first && first.compareAndSet(x, e)) {
245                          LockSupport.unpark(first.waiter);
246 <                        return isData? e : x;
246 >                        return isData ? e : x;
247                      }
248                  }
249              }
# Line 258 | Line 266 | public class LinkedTransferQueue<E> exte
266          if (mode == NOWAIT)
267              return null;
268  
269 <        long lastTime = (mode == TIMEOUT)? System.nanoTime() : 0;
269 >        long lastTime = (mode == TIMEOUT) ? System.nanoTime() : 0;
270          Thread w = Thread.currentThread();
271          int spins = -1; // set to desired spin count below
272          for (;;) {
# Line 267 | Line 275 | public class LinkedTransferQueue<E> exte
275              Object x = s.get();
276              if (x != e) {                 // Node was matched or cancelled
277                  advanceHead(pred, s);     // unlink if head
278 <                if (x == s) {              // was cancelled
278 >                if (x == s) {             // was cancelled
279                      clean(pred, s);
280                      return null;
281                  }
# Line 290 | Line 298 | public class LinkedTransferQueue<E> exte
298              if (spins < 0) {
299                  QNode h = head.get(); // only spin if at head
300                  spins = ((h != null && h.next == s) ?
301 <                         (mode == TIMEOUT?
301 >                         ((mode == TIMEOUT) ?
302                            maxTimedSpins : maxUntimedSpins) : 0);
303              }
304              if (spins > 0)
# Line 298 | Line 306 | public class LinkedTransferQueue<E> exte
306              else if (s.waiter == null)
307                  s.waiter = w;
308              else if (mode != TIMEOUT) {
309 <                //                LockSupport.park(this);
302 <                LockSupport.park(); // allows run on java5
309 >                LockSupport.park(this);
310                  s.waiter = null;
311                  spins = -1;
312              }
313              else if (nanos > spinForTimeoutThreshold) {
314 <                //                LockSupport.parkNanos(this, nanos);
308 <                LockSupport.parkNanos(nanos);
314 >                LockSupport.parkNanos(this, nanos);
315                  s.waiter = null;
316                  spins = -1;
317              }
# Line 313 | Line 319 | public class LinkedTransferQueue<E> exte
319      }
320  
321      /**
322 <     * Returns validated tail for use in cleaning methods
322 >     * Returns validated tail for use in cleaning methods.
323       */
324      private QNode getValidatedTail() {
325          for (;;) {
# Line 336 | Line 342 | public class LinkedTransferQueue<E> exte
342  
343      /**
344       * Gets rid of cancelled node s with original predecessor pred.
345 +     *
346       * @param pred predecessor of cancelled node
347       * @param s the cancelled node
348       */
# Line 346 | Line 353 | public class LinkedTransferQueue<E> exte
353              if (w != Thread.currentThread())
354                  LockSupport.unpark(w);
355          }
356 +
357 +        if (pred == null)
358 +            return;
359 +
360          /*
361           * At any given time, exactly one node on list cannot be
362           * deleted -- the last inserted node. To accommodate this, if
# Line 371 | Line 382 | public class LinkedTransferQueue<E> exte
382      /**
383       * Tries to unsplice the cancelled node held in cleanMe that was
384       * previously uncleanable because it was at tail.
385 +     *
386       * @return current cleanMe node (or null)
387       */
388      private QNode reclean() {
# Line 402 | Line 414 | public class LinkedTransferQueue<E> exte
414      }
415  
416      /**
417 <     * Creates an initially empty <tt>LinkedTransferQueue</tt>.
417 >     * Creates an initially empty {@code LinkedTransferQueue}.
418       */
419      public LinkedTransferQueue() {
420          QNode dummy = new QNode(null, false);
# Line 412 | Line 424 | public class LinkedTransferQueue<E> exte
424      }
425  
426      /**
427 <     * Creates a <tt>LinkedTransferQueue</tt>
427 >     * Creates a {@code LinkedTransferQueue}
428       * initially containing the elements of the given collection,
429       * added in traversal order of the collection's iterator.
430 +     *
431       * @param c the collection of elements to initially contain
432       * @throws NullPointerException if the specified collection or any
433       *         of its elements are null
# Line 444 | Line 457 | public class LinkedTransferQueue<E> exte
457          return true;
458      }
459  
460 +    public boolean add(E e) {
461 +        if (e == null) throw new NullPointerException();
462 +        xfer(e, NOWAIT, 0);
463 +        return true;
464 +    }
465 +
466      public void transfer(E e) throws InterruptedException {
467          if (e == null) throw new NullPointerException();
468          if (xfer(e, WAIT, 0) == null) {
# Line 470 | Line 489 | public class LinkedTransferQueue<E> exte
489      public E take() throws InterruptedException {
490          Object e = xfer(null, WAIT, 0);
491          if (e != null)
492 <            return (E)e;
492 >            return (E) e;
493          Thread.interrupted();
494          throw new InterruptedException();
495      }
# Line 478 | Line 497 | public class LinkedTransferQueue<E> exte
497      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
498          Object e = xfer(null, TIMEOUT, unit.toNanos(timeout));
499          if (e != null || !Thread.interrupted())
500 <            return (E)e;
500 >            return (E) e;
501          throw new InterruptedException();
502      }
503  
504      public E poll() {
505 <        return (E)fulfill(null);
505 >        return (E) fulfill(null);
506      }
507  
508      public int drainTo(Collection<? super E> c) {
# Line 517 | Line 536 | public class LinkedTransferQueue<E> exte
536      // Traversal-based methods
537  
538      /**
539 <     * Return head after performing any outstanding helping steps
539 >     * Returns head after performing any outstanding helping steps.
540       */
541      private QNode traversalHead() {
542          for (;;) {
# Line 540 | Line 559 | public class LinkedTransferQueue<E> exte
559                          return h;
560                  }
561              }
562 +            reclean();
563          }
564      }
565  
# Line 556 | Line 576 | public class LinkedTransferQueue<E> exte
576       * if subsequently removed.
577       */
578      class Itr implements Iterator<E> {
579 <        QNode nextNode;    // Next node to return next
580 <        QNode currentNode; // last returned node, for remove()
581 <        QNode prevNode;    // predecessor of last returned node
582 <        E nextItem;        // Cache of next item, once commited to in next
579 >        QNode next;        // node to return next
580 >        QNode pnext;       // predecessor of next
581 >        QNode snext;       // successor of next
582 >        QNode curr;        // last returned node, for remove()
583 >        QNode pcurr;       // predecessor of curr, for remove()
584 >        E nextItem;        // Cache of next item, once committed to in next
585  
586          Itr() {
587 <            nextNode = traversalHead();
566 <            advance();
587 >            findNext();
588          }
589  
590 <        E advance() {
591 <            prevNode = currentNode;
592 <            currentNode = nextNode;
593 <            E x = nextItem;
573 <
574 <            QNode p = nextNode.next;
590 >        /**
591 >         * Ensures next points to next valid node, or null if none.
592 >         */
593 >        void findNext() {
594              for (;;) {
595 <                if (p == null || !p.isData) {
596 <                    nextNode = null;
597 <                    nextItem = null;
598 <                    return x;
595 >                QNode pred = pnext;
596 >                QNode q = next;
597 >                if (pred == null || pred == q) {
598 >                    pred = traversalHead();
599 >                    q = pred.next;
600 >                }
601 >                if (q == null || !q.isData) {
602 >                    next = null;
603 >                    return;
604 >                }
605 >                Object x = q.get();
606 >                QNode s = q.next;
607 >                if (x != null && q != x && q != s) {
608 >                    nextItem = (E) x;
609 >                    snext = s;
610 >                    pnext = pred;
611 >                    next = q;
612 >                    return;
613                  }
614 <                Object item = p.get();
615 <                if (item != p && item != null) {
583 <                    nextNode = p;
584 <                    nextItem = (E)item;
585 <                    return x;
586 <                }
587 <                prevNode = p;
588 <                p = p.next;
614 >                pnext = q;
615 >                next = s;
616              }
617          }
618  
619          public boolean hasNext() {
620 <            return nextNode != null;
620 >            return next != null;
621          }
622  
623          public E next() {
624 <            if (nextNode == null) throw new NoSuchElementException();
625 <            return advance();
624 >            if (next == null) throw new NoSuchElementException();
625 >            pcurr = pnext;
626 >            curr = next;
627 >            pnext = next;
628 >            next = snext;
629 >            E x = nextItem;
630 >            findNext();
631 >            return x;
632          }
633  
634          public void remove() {
635 <            QNode p = currentNode;
636 <            QNode prev = prevNode;
604 <            if (prev == null || p == null)
635 >            QNode p = curr;
636 >            if (p == null)
637                  throw new IllegalStateException();
638              Object x = p.get();
639              if (x != null && x != p && p.compareAndSet(x, p))
640 <                clean(prev, p);
640 >                clean(pcurr, p);
641          }
642      }
643  
# Line 620 | Line 652 | public class LinkedTransferQueue<E> exte
652                  if (!p.isData)
653                      return null;
654                  if (x != null)
655 <                    return (E)x;
655 >                    return (E) x;
656              }
657          }
658      }
# Line 655 | Line 687 | public class LinkedTransferQueue<E> exte
687  
688      /**
689       * Returns the number of elements in this queue.  If this queue
690 <     * contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
691 <     * <tt>Integer.MAX_VALUE</tt>.
690 >     * contains more than {@code Integer.MAX_VALUE} elements, returns
691 >     * {@code Integer.MAX_VALUE}.
692       *
693       * <p>Beware that, unlike in most collections, this method is
694       * <em>NOT</em> a constant-time operation. Because of the
# Line 694 | Line 726 | public class LinkedTransferQueue<E> exte
726          return Integer.MAX_VALUE;
727      }
728  
729 +    public boolean remove(Object o) {
730 +        if (o == null)
731 +            return false;
732 +        for (;;) {
733 +            QNode pred = traversalHead();
734 +            for (;;) {
735 +                QNode q = pred.next;
736 +                if (q == null || !q.isData)
737 +                    return false;
738 +                if (q == pred) // restart
739 +                    break;
740 +                Object x = q.get();
741 +                if (x != null && x != q && o.equals(x) &&
742 +                    q.compareAndSet(x, q)) {
743 +                    clean(pred, q);
744 +                    return true;
745 +                }
746 +                pred = q;
747 +            }
748 +        }
749 +    }
750 +
751      /**
752       * Save the state to a stream (that is, serialize it).
753       *
754 <     * @serialData All of the elements (each an <tt>E</tt>) in
754 >     * @serialData All of the elements (each an {@code E}) in
755       * the proper order, followed by a null
756       * @param s the stream
757       */
758      private void writeObject(java.io.ObjectOutputStream s)
759          throws java.io.IOException {
760          s.defaultWriteObject();
761 <        for (Iterator<E> it = iterator(); it.hasNext(); )
762 <            s.writeObject(it.next());
761 >        for (E e : this)
762 >            s.writeObject(e);
763          // Use trailing null as sentinel
764          s.writeObject(null);
765      }
# Line 713 | Line 767 | public class LinkedTransferQueue<E> exte
767      /**
768       * Reconstitute the Queue instance from a stream (that is,
769       * deserialize it).
770 +     *
771       * @param s the stream
772       */
773      private void readObject(java.io.ObjectInputStream s)
# Line 720 | Line 775 | public class LinkedTransferQueue<E> exte
775          s.defaultReadObject();
776          resetHeadAndTail();
777          for (;;) {
778 <            E item = (E)s.readObject();
778 >            E item = (E) s.readObject();
779              if (item == null)
780                  break;
781              else
# Line 730 | Line 785 | public class LinkedTransferQueue<E> exte
785  
786  
787      // Support for resetting head/tail while deserializing
788 +    private void resetHeadAndTail() {
789 +        QNode dummy = new QNode(null, false);
790 +        UNSAFE.putObjectVolatile(this, headOffset,
791 +                                  new PaddedAtomicReference<QNode>(dummy));
792 +        UNSAFE.putObjectVolatile(this, tailOffset,
793 +                                  new PaddedAtomicReference<QNode>(dummy));
794 +        UNSAFE.putObjectVolatile(this, cleanMeOffset,
795 +                                  new PaddedAtomicReference<QNode>(null));
796 +    }
797  
798      // Temporary Unsafe mechanics for preliminary release
799 <    private static final Unsafe _unsafe;
799 >    private static Unsafe getUnsafe() throws Throwable {
800 >        try {
801 >            return Unsafe.getUnsafe();
802 >        } catch (SecurityException se) {
803 >            try {
804 >                return java.security.AccessController.doPrivileged
805 >                    (new java.security.PrivilegedExceptionAction<Unsafe>() {
806 >                        public Unsafe run() throws Exception {
807 >                            return getUnsafePrivileged();
808 >                        }});
809 >            } catch (java.security.PrivilegedActionException e) {
810 >                throw e.getCause();
811 >            }
812 >        }
813 >    }
814 >
815 >    private static Unsafe getUnsafePrivileged()
816 >            throws NoSuchFieldException, IllegalAccessException {
817 >        Field f = Unsafe.class.getDeclaredField("theUnsafe");
818 >        f.setAccessible(true);
819 >        return (Unsafe) f.get(null);
820 >    }
821 >
822 >    private static long fieldOffset(String fieldName)
823 >            throws NoSuchFieldException {
824 >        return UNSAFE.objectFieldOffset
825 >            (LinkedTransferQueue.class.getDeclaredField(fieldName));
826 >    }
827 >
828 >    private static final Unsafe UNSAFE;
829      private static final long headOffset;
830      private static final long tailOffset;
831      private static final long cleanMeOffset;
832      static {
833          try {
834 <            if (LinkedTransferQueue.class.getClassLoader() != null) {
835 <                Field f = Unsafe.class.getDeclaredField("theUnsafe");
836 <                f.setAccessible(true);
837 <                _unsafe = (Unsafe)f.get(null);
838 <            }
746 <            else
747 <                _unsafe = Unsafe.getUnsafe();
748 <            headOffset = _unsafe.objectFieldOffset
749 <                (LinkedTransferQueue.class.getDeclaredField("head"));
750 <            tailOffset = _unsafe.objectFieldOffset
751 <                (LinkedTransferQueue.class.getDeclaredField("tail"));
752 <            cleanMeOffset = _unsafe.objectFieldOffset
753 <                (LinkedTransferQueue.class.getDeclaredField("cleanMe"));
754 <        } catch (Exception e) {
834 >            UNSAFE = getUnsafe();
835 >            headOffset = fieldOffset("head");
836 >            tailOffset = fieldOffset("tail");
837 >            cleanMeOffset = fieldOffset("cleanMe");
838 >        } catch (Throwable e) {
839              throw new RuntimeException("Could not initialize intrinsics", e);
840          }
841      }
842  
759    private void resetHeadAndTail() {
760        QNode dummy = new QNode(null, false);
761        _unsafe.putObjectVolatile(this, headOffset,
762                                  new PaddedAtomicReference<QNode>(dummy));
763        _unsafe.putObjectVolatile(this, tailOffset,
764                                  new PaddedAtomicReference<QNode>(dummy));
765        _unsafe.putObjectVolatile(this, cleanMeOffset,
766                                  new PaddedAtomicReference<QNode>(null));
767
768    }
769
843   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines