ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/jsr166y/LinkedTransferQueue.java (file contents):
Revision 1.9 by dl, Sun Nov 16 20:24:54 2008 UTC vs.
Revision 1.23 by jsr166, Thu Jul 23 23:07:57 2009 UTC

# Line 21 | Line 21 | import java.lang.reflect.*;
21   * producer.  The <em>tail</em> of the queue is that element that has
22   * been on the queue the shortest time for some producer.
23   *
24 < * <p>Beware that, unlike in most collections, the <tt>size</tt>
24 > * <p>Beware that, unlike in most collections, the {@code size}
25   * method is <em>NOT</em> a constant-time operation. Because of the
26   * asynchronous nature of these queues, determining the current number
27   * of elements requires a traversal of the elements.
# Line 44 | Line 44 | import java.lang.reflect.*;
44   * @since 1.7
45   * @author Doug Lea
46   * @param <E> the type of elements held in this collection
47 *
47   */
48   public class LinkedTransferQueue<E> extends AbstractQueue<E>
49      implements TransferQueue<E>, java.io.Serializable {
# Line 81 | Line 80 | public class LinkedTransferQueue<E> exte
80       * seems not to vary with number of CPUs (beyond 2) so is just
81       * a constant.
82       */
83 <    static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
83 >    static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
84  
85      /**
86       * The number of times to spin before blocking in untimed waits.
# Line 116 | Line 115 | public class LinkedTransferQueue<E> exte
115              nextUpdater = AtomicReferenceFieldUpdater.newUpdater
116              (QNode.class, QNode.class, "next");
117  
118 <        boolean casNext(QNode cmp, QNode val) {
118 >        final boolean casNext(QNode cmp, QNode val) {
119              return nextUpdater.compareAndSet(this, cmp, val);
120          }
121 +
122 +        final void clearNext() {
123 +            nextUpdater.lazySet(this, this);
124 +        }
125 +
126      }
127  
128      /**
# Line 135 | Line 139 | public class LinkedTransferQueue<E> exte
139  
140      /** head of the queue */
141      private transient final PaddedAtomicReference<QNode> head;
142 +
143      /** tail of the queue */
144      private transient final PaddedAtomicReference<QNode> tail;
145  
# Line 151 | Line 156 | public class LinkedTransferQueue<E> exte
156       */
157      private boolean advanceHead(QNode h, QNode nh) {
158          if (h == head.get() && head.compareAndSet(h, nh)) {
159 <            h.next = h; // forget old next
159 >            h.clearNext(); // forget old next
160              return true;
161          }
162          return false;
# Line 161 | Line 166 | public class LinkedTransferQueue<E> exte
166       * Puts or takes an item. Used for most queue operations (except
167       * poll() and tryTransfer()). See the similar code in
168       * SynchronousQueue for detailed explanation.
169 +     *
170       * @param e the item or if null, signifies that this is a take
171       * @param mode the wait mode: NOWAIT, TIMEOUT, WAIT
172       * @param nanos timeout in nanosecs, used only if mode is TIMEOUT
# Line 197 | Line 203 | public class LinkedTransferQueue<E> exte
203                      Object x = first.get();
204                      if (x != first && first.compareAndSet(x, e)) {
205                          LockSupport.unpark(first.waiter);
206 <                        return isData? e : x;
206 >                        return isData ? e : x;
207                      }
208                  }
209              }
# Line 207 | Line 213 | public class LinkedTransferQueue<E> exte
213  
214      /**
215       * Version of xfer for poll() and tryTransfer, which
216 <     * simplifies control paths both here and in xfer
216 >     * simplifies control paths both here and in xfer.
217       */
218      private Object fulfill(Object e) {
219          boolean isData = (e != null);
# Line 235 | Line 241 | public class LinkedTransferQueue<E> exte
241                      Object x = first.get();
242                      if (x != first && first.compareAndSet(x, e)) {
243                          LockSupport.unpark(first.waiter);
244 <                        return isData? e : x;
244 >                        return isData ? e : x;
245                      }
246                  }
247              }
# Line 258 | Line 264 | public class LinkedTransferQueue<E> exte
264          if (mode == NOWAIT)
265              return null;
266  
267 <        long lastTime = (mode == TIMEOUT)? System.nanoTime() : 0;
267 >        long lastTime = (mode == TIMEOUT) ? System.nanoTime() : 0;
268          Thread w = Thread.currentThread();
269          int spins = -1; // set to desired spin count below
270          for (;;) {
# Line 267 | Line 273 | public class LinkedTransferQueue<E> exte
273              Object x = s.get();
274              if (x != e) {                 // Node was matched or cancelled
275                  advanceHead(pred, s);     // unlink if head
276 <                if (x == s) {              // was cancelled
276 >                if (x == s) {             // was cancelled
277                      clean(pred, s);
278                      return null;
279                  }
# Line 290 | Line 296 | public class LinkedTransferQueue<E> exte
296              if (spins < 0) {
297                  QNode h = head.get(); // only spin if at head
298                  spins = ((h != null && h.next == s) ?
299 <                         (mode == TIMEOUT?
299 >                         ((mode == TIMEOUT) ?
300                            maxTimedSpins : maxUntimedSpins) : 0);
301              }
302              if (spins > 0)
# Line 298 | Line 304 | public class LinkedTransferQueue<E> exte
304              else if (s.waiter == null)
305                  s.waiter = w;
306              else if (mode != TIMEOUT) {
307 <                //                LockSupport.park(this);
302 <                LockSupport.park(); // allows run on java5
307 >                LockSupport.park(this);
308                  s.waiter = null;
309                  spins = -1;
310              }
311              else if (nanos > spinForTimeoutThreshold) {
312 <                //                LockSupport.parkNanos(this, nanos);
308 <                LockSupport.parkNanos(nanos);
312 >                LockSupport.parkNanos(this, nanos);
313                  s.waiter = null;
314                  spins = -1;
315              }
# Line 313 | Line 317 | public class LinkedTransferQueue<E> exte
317      }
318  
319      /**
320 <     * Returns validated tail for use in cleaning methods
320 >     * Returns validated tail for use in cleaning methods.
321       */
322      private QNode getValidatedTail() {
323          for (;;) {
# Line 332 | Line 336 | public class LinkedTransferQueue<E> exte
336                      return t;
337              }
338          }
339 <    }    
339 >    }
340  
341      /**
342       * Gets rid of cancelled node s with original predecessor pred.
343 +     *
344       * @param pred predecessor of cancelled node
345       * @param s the cancelled node
346       */
# Line 346 | Line 351 | public class LinkedTransferQueue<E> exte
351              if (w != Thread.currentThread())
352                  LockSupport.unpark(w);
353          }
354 +
355 +        if (pred == null)
356 +            return;
357 +
358          /*
359           * At any given time, exactly one node on list cannot be
360           * deleted -- the last inserted node. To accommodate this, if
# Line 359 | Line 368 | public class LinkedTransferQueue<E> exte
368              QNode t = getValidatedTail();
369              if (s != t) {               // If not tail, try to unsplice
370                  QNode sn = s.next;      // s.next == s means s already off list
371 <                if (sn == s || pred.casNext(s, sn))
371 >                if (sn == s || pred.casNext(s, sn))
372                      break;
373              }
374              else if (oldpred == pred || // Already saved
# Line 371 | Line 380 | public class LinkedTransferQueue<E> exte
380      /**
381       * Tries to unsplice the cancelled node held in cleanMe that was
382       * previously uncleanable because it was at tail.
383 +     *
384       * @return current cleanMe node (or null)
385       */
386      private QNode reclean() {
387 <        /*
387 >        /*
388           * cleanMe is, or at one time was, predecessor of cancelled
389           * node s that was the tail so could not be unspliced.  If s
390           * is no longer the tail, try to unsplice if necessary and
# Line 383 | Line 393 | public class LinkedTransferQueue<E> exte
393           * points to a cancelled node that must be unspliced -- if
394           * not, we can (must) clear cleanMe without unsplicing.
395           * This can loop only due to contention on casNext or
396 <         * clearing cleanMe.
396 >         * clearing cleanMe.
397           */
398          QNode pred;
399          while ((pred = cleanMe.get()) != null) {
400              QNode t = getValidatedTail();
401              QNode s = pred.next;
402 <            if (s != t) {
402 >            if (s != t) {
403                  QNode sn;
404                  if (s == null || s == pred || s.get() != s ||
405                      (sn = s.next) == s || pred.casNext(s, sn))
# Line 402 | Line 412 | public class LinkedTransferQueue<E> exte
412      }
413  
414      /**
415 <     * Creates an initially empty <tt>LinkedTransferQueue</tt>.
415 >     * Creates an initially empty {@code LinkedTransferQueue}.
416       */
417      public LinkedTransferQueue() {
418          QNode dummy = new QNode(null, false);
# Line 412 | Line 422 | public class LinkedTransferQueue<E> exte
422      }
423  
424      /**
425 <     * Creates a <tt>LinkedTransferQueue</tt>
425 >     * Creates a {@code LinkedTransferQueue}
426       * initially containing the elements of the given collection,
427       * added in traversal order of the collection's iterator.
428 +     *
429       * @param c the collection of elements to initially contain
430       * @throws NullPointerException if the specified collection or any
431       *         of its elements are null
# Line 444 | Line 455 | public class LinkedTransferQueue<E> exte
455          return true;
456      }
457  
458 +    public boolean add(E e) {
459 +        if (e == null) throw new NullPointerException();
460 +        xfer(e, NOWAIT, 0);
461 +        return true;
462 +    }
463 +
464      public void transfer(E e) throws InterruptedException {
465          if (e == null) throw new NullPointerException();
466          if (xfer(e, WAIT, 0) == null) {
# Line 470 | Line 487 | public class LinkedTransferQueue<E> exte
487      public E take() throws InterruptedException {
488          Object e = xfer(null, WAIT, 0);
489          if (e != null)
490 <            return (E)e;
490 >            return (E) e;
491          Thread.interrupted();
492          throw new InterruptedException();
493      }
# Line 478 | Line 495 | public class LinkedTransferQueue<E> exte
495      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
496          Object e = xfer(null, TIMEOUT, unit.toNanos(timeout));
497          if (e != null || !Thread.interrupted())
498 <            return (E)e;
498 >            return (E) e;
499          throw new InterruptedException();
500      }
501  
502      public E poll() {
503 <        return (E)fulfill(null);
503 >        return (E) fulfill(null);
504      }
505  
506      public int drainTo(Collection<? super E> c) {
# Line 517 | Line 534 | public class LinkedTransferQueue<E> exte
534      // Traversal-based methods
535  
536      /**
537 <     * Return head after performing any outstanding helping steps
537 >     * Returns head after performing any outstanding helping steps.
538       */
539      private QNode traversalHead() {
540          for (;;) {
# Line 540 | Line 557 | public class LinkedTransferQueue<E> exte
557                          return h;
558                  }
559              }
560 +            reclean();
561          }
562      }
563  
# Line 556 | Line 574 | public class LinkedTransferQueue<E> exte
574       * if subsequently removed.
575       */
576      class Itr implements Iterator<E> {
577 <        QNode nextNode;    // Next node to return next
578 <        QNode currentNode; // last returned node, for remove()
579 <        QNode prevNode;    // predecessor of last returned node
580 <        E nextItem;        // Cache of next item, once commited to in next
577 >        QNode next;        // node to return next
578 >        QNode pnext;       // predecessor of next
579 >        QNode snext;       // successor of next
580 >        QNode curr;        // last returned node, for remove()
581 >        QNode pcurr;       // predecessor of curr, for remove()
582 >        E nextItem;        // Cache of next item, once committed to in next
583  
584          Itr() {
585 <            nextNode = traversalHead();
566 <            advance();
585 >            findNext();
586          }
587  
588 <        E advance() {
589 <            prevNode = currentNode;
590 <            currentNode = nextNode;
591 <            E x = nextItem;
573 <
574 <            QNode p = nextNode.next;
588 >        /**
589 >         * Ensures next points to next valid node, or null if none.
590 >         */
591 >        void findNext() {
592              for (;;) {
593 <                if (p == null || !p.isData) {
594 <                    nextNode = null;
595 <                    nextItem = null;
596 <                    return x;
597 <                }
598 <                Object item = p.get();
599 <                if (item != p && item != null) {
600 <                    nextNode = p;
601 <                    nextItem = (E)item;
602 <                    return x;
593 >                QNode pred = pnext;
594 >                QNode q = next;
595 >                if (pred == null || pred == q) {
596 >                    pred = traversalHead();
597 >                    q = pred.next;
598 >                }
599 >                if (q == null || !q.isData) {
600 >                    next = null;
601 >                    return;
602 >                }
603 >                Object x = q.get();
604 >                QNode s = q.next;
605 >                if (x != null && q != x && q != s) {
606 >                    nextItem = (E) x;
607 >                    snext = s;
608 >                    pnext = pred;
609 >                    next = q;
610 >                    return;
611                  }
612 <                prevNode = p;
613 <                p = p.next;
612 >                pnext = q;
613 >                next = s;
614              }
615          }
616  
617          public boolean hasNext() {
618 <            return nextNode != null;
618 >            return next != null;
619          }
620  
621          public E next() {
622 <            if (nextNode == null) throw new NoSuchElementException();
623 <            return advance();
622 >            if (next == null) throw new NoSuchElementException();
623 >            pcurr = pnext;
624 >            curr = next;
625 >            pnext = next;
626 >            next = snext;
627 >            E x = nextItem;
628 >            findNext();
629 >            return x;
630          }
631  
632          public void remove() {
633 <            QNode p = currentNode;
634 <            QNode prev = prevNode;
604 <            if (prev == null || p == null)
633 >            QNode p = curr;
634 >            if (p == null)
635                  throw new IllegalStateException();
636              Object x = p.get();
637              if (x != null && x != p && p.compareAndSet(x, p))
638 <                clean(prev, p);
638 >                clean(pcurr, p);
639          }
640      }
641  
# Line 620 | Line 650 | public class LinkedTransferQueue<E> exte
650                  if (!p.isData)
651                      return null;
652                  if (x != null)
653 <                    return (E)x;
653 >                    return (E) x;
654              }
655          }
656      }
# Line 655 | Line 685 | public class LinkedTransferQueue<E> exte
685  
686      /**
687       * Returns the number of elements in this queue.  If this queue
688 <     * contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
689 <     * <tt>Integer.MAX_VALUE</tt>.
688 >     * contains more than {@code Integer.MAX_VALUE} elements, returns
689 >     * {@code Integer.MAX_VALUE}.
690       *
691       * <p>Beware that, unlike in most collections, this method is
692       * <em>NOT</em> a constant-time operation. Because of the
# Line 694 | Line 724 | public class LinkedTransferQueue<E> exte
724          return Integer.MAX_VALUE;
725      }
726  
727 +    public boolean remove(Object o) {
728 +        if (o == null)
729 +            return false;
730 +        for (;;) {
731 +            QNode pred = traversalHead();
732 +            for (;;) {
733 +                QNode q = pred.next;
734 +                if (q == null || !q.isData)
735 +                    return false;
736 +                if (q == pred) // restart
737 +                    break;
738 +                Object x = q.get();
739 +                if (x != null && x != q && o.equals(x) &&
740 +                    q.compareAndSet(x, q)) {
741 +                    clean(pred, q);
742 +                    return true;
743 +                }
744 +                pred = q;
745 +            }
746 +        }
747 +    }
748 +
749      /**
750       * Save the state to a stream (that is, serialize it).
751       *
752 <     * @serialData All of the elements (each an <tt>E</tt>) in
752 >     * @serialData All of the elements (each an {@code E}) in
753       * the proper order, followed by a null
754       * @param s the stream
755       */
756      private void writeObject(java.io.ObjectOutputStream s)
757          throws java.io.IOException {
758          s.defaultWriteObject();
759 <        for (Iterator<E> it = iterator(); it.hasNext(); )
760 <            s.writeObject(it.next());
759 >        for (E e : this)
760 >            s.writeObject(e);
761          // Use trailing null as sentinel
762          s.writeObject(null);
763      }
# Line 713 | Line 765 | public class LinkedTransferQueue<E> exte
765      /**
766       * Reconstitute the Queue instance from a stream (that is,
767       * deserialize it).
768 +     *
769       * @param s the stream
770       */
771      private void readObject(java.io.ObjectInputStream s)
# Line 720 | Line 773 | public class LinkedTransferQueue<E> exte
773          s.defaultReadObject();
774          resetHeadAndTail();
775          for (;;) {
776 <            E item = (E)s.readObject();
776 >            E item = (E) s.readObject();
777              if (item == null)
778                  break;
779              else
# Line 730 | Line 783 | public class LinkedTransferQueue<E> exte
783  
784  
785      // Support for resetting head/tail while deserializing
786 +    private void resetHeadAndTail() {
787 +        QNode dummy = new QNode(null, false);
788 +        UNSAFE.putObjectVolatile(this, headOffset,
789 +                                  new PaddedAtomicReference<QNode>(dummy));
790 +        UNSAFE.putObjectVolatile(this, tailOffset,
791 +                                  new PaddedAtomicReference<QNode>(dummy));
792 +        UNSAFE.putObjectVolatile(this, cleanMeOffset,
793 +                                  new PaddedAtomicReference<QNode>(null));
794 +    }
795  
796      // Temporary Unsafe mechanics for preliminary release
797 <    private static final Unsafe _unsafe;
797 >    private static Unsafe getUnsafe() throws Throwable {
798 >        try {
799 >            return Unsafe.getUnsafe();
800 >        } catch (SecurityException se) {
801 >            try {
802 >                return java.security.AccessController.doPrivileged
803 >                    (new java.security.PrivilegedExceptionAction<Unsafe>() {
804 >                        public Unsafe run() throws Exception {
805 >                            return getUnsafePrivileged();
806 >                        }});
807 >            } catch (java.security.PrivilegedActionException e) {
808 >                throw e.getCause();
809 >            }
810 >        }
811 >    }
812 >
813 >    private static Unsafe getUnsafePrivileged()
814 >            throws NoSuchFieldException, IllegalAccessException {
815 >        Field f = Unsafe.class.getDeclaredField("theUnsafe");
816 >        f.setAccessible(true);
817 >        return (Unsafe) f.get(null);
818 >    }
819 >
820 >    private static long fieldOffset(String fieldName)
821 >            throws NoSuchFieldException {
822 >        return UNSAFE.objectFieldOffset
823 >            (LinkedTransferQueue.class.getDeclaredField(fieldName));
824 >    }
825 >
826 >    private static final Unsafe UNSAFE;
827      private static final long headOffset;
828      private static final long tailOffset;
829      private static final long cleanMeOffset;
830      static {
831          try {
832 <            if (LinkedTransferQueue.class.getClassLoader() != null) {
833 <                Field f = Unsafe.class.getDeclaredField("theUnsafe");
834 <                f.setAccessible(true);
835 <                _unsafe = (Unsafe)f.get(null);
836 <            }
746 <            else
747 <                _unsafe = Unsafe.getUnsafe();
748 <            headOffset = _unsafe.objectFieldOffset
749 <                (LinkedTransferQueue.class.getDeclaredField("head"));
750 <            tailOffset = _unsafe.objectFieldOffset
751 <                (LinkedTransferQueue.class.getDeclaredField("tail"));
752 <            cleanMeOffset = _unsafe.objectFieldOffset
753 <                (LinkedTransferQueue.class.getDeclaredField("cleanMe"));
754 <        } catch (Exception e) {
832 >            UNSAFE = getUnsafe();
833 >            headOffset = fieldOffset("head");
834 >            tailOffset = fieldOffset("tail");
835 >            cleanMeOffset = fieldOffset("cleanMe");
836 >        } catch (Throwable e) {
837              throw new RuntimeException("Could not initialize intrinsics", e);
838          }
839      }
840  
759    private void resetHeadAndTail() {
760        QNode dummy = new QNode(null, false);
761        _unsafe.putObjectVolatile(this, headOffset,
762                                  new PaddedAtomicReference<QNode>(dummy));
763        _unsafe.putObjectVolatile(this, tailOffset,
764                                  new PaddedAtomicReference<QNode>(dummy));
765        _unsafe.putObjectVolatile(this, cleanMeOffset,
766                                  new PaddedAtomicReference<QNode>(null));
767
768    }
769
841   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines