ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java (file contents):
Revision 1.100 by jsr166, Sun Dec 11 19:59:51 2016 UTC vs.
Revision 1.101 by jsr166, Tue Dec 13 18:55:57 2016 UTC

# Line 10 | Line 10 | import java.util.AbstractQueue;
10   import java.util.Collection;
11   import java.util.Iterator;
12   import java.util.NoSuchElementException;
13 + import java.util.Objects;
14   import java.util.Spliterator;
15   import java.util.Spliterators;
16   import java.util.concurrent.atomic.AtomicInteger;
# Line 665 | Line 666 | public class LinkedBlockingQueue<E> exte
666       * @throws IllegalArgumentException      {@inheritDoc}
667       */
668      public int drainTo(Collection<? super E> c, int maxElements) {
669 <        if (c == null)
669 <            throw new NullPointerException();
669 >        Objects.requireNonNull(c);
670          if (c == this)
671              throw new IllegalArgumentException();
672          if (maxElements <= 0)
# Line 705 | Line 705 | public class LinkedBlockingQueue<E> exte
705      }
706  
707      /**
708 +     * Used for any element traversal that is not entirely under lock.
709 +     * Such traversals must handle both:
710 +     * - dequeued nodes (p.next == p)
711 +     * - (possibly multiple) interior removed nodes (p.item == null)
712 +     */
713 +    Node<E> succ(Node<E> p) {
714 +        return (p == (p = p.next)) ? head.next : p;
715 +    }
716 +
717 +    /**
718       * Returns an iterator over the elements in this queue in proper sequence.
719       * The elements will be returned in order from first (head) to last (tail).
720       *
# Line 743 | Line 753 | public class LinkedBlockingQueue<E> exte
753          }
754  
755          public E next() {
756 <            if (current == null)
756 >            Node<E> p;
757 >            if ((p = current) == null)
758                  throw new NoSuchElementException();
759 +            E ret = currentElement, e = null;
760 +            lastRet = p;
761              fullyLock();
762              try {
763 <                lastRet = current;
764 <                E item = null;
765 <                // Unlike other traversal methods, iterators must handle both:
753 <                // - dequeued nodes (p.next == p)
754 <                // - (possibly multiple) interior removed nodes (p.item == null)
755 <                for (Node<E> p = current, q;; p = q) {
756 <                    if ((q = p.next) == p)
757 <                        q = head.next;
758 <                    if (q == null || (item = q.item) != null) {
759 <                        current = q;
760 <                        E x = currentElement;
761 <                        currentElement = item;
762 <                        return x;
763 <                    }
764 <                }
763 >                for (p = p.next; p != null; p = succ(p))
764 >                    if ((e = p.item) != null)
765 >                        break;
766              } finally {
767                  fullyUnlock();
768              }
769 +            current = p;
770 +            currentElement = e;
771 +            return ret;
772 +        }
773 +
774 +        public void forEachRemaining(Consumer<? super E> action) {
775 +            // A variant of forEachFrom
776 +            Objects.requireNonNull(action);
777 +            Node<E> p;
778 +            if ((p = current) == null) return;
779 +            lastRet = current;
780 +            current = null;
781 +            final int batchSize = 32;
782 +            Object[] es = null;
783 +            int n, len = 1;
784 +            do {
785 +                fullyLock();
786 +                try {
787 +                    if (es == null) {
788 +                        p = p.next;
789 +                        for (Node<E> q = p; q != null; q = succ(q))
790 +                            if (q.item != null && ++len == batchSize)
791 +                                break;
792 +                        es = new Object[len];
793 +                        es[0] = currentElement;
794 +                        currentElement = null;
795 +                        n = 1;
796 +                    } else
797 +                        n = 0;
798 +                    for (; p != null && n < len; p = succ(p))
799 +                        if ((es[n] = p.item) != null) {
800 +                            lastRet = p;
801 +                            n++;
802 +                        }
803 +                } finally {
804 +                    fullyUnlock();
805 +                }
806 +                for (int i = 0; i < n; i++) {
807 +                    @SuppressWarnings("unchecked") E e = (E) es[i];
808 +                    action.accept(e);
809 +                }
810 +            } while (n > 0 && p != null);
811          }
812  
813          public void remove() {
# Line 801 | Line 844 | public class LinkedBlockingQueue<E> exte
844  
845          LBQSpliterator() {}
846  
804        private Node<E> succ(Node<E> p) {
805            return (p == (p = p.next)) ? head.next : p;
806        }
807
847          public long estimateSize() { return est; }
848  
849          public Spliterator<E> trySplit() {
# Line 843 | Line 882 | public class LinkedBlockingQueue<E> exte
882              return null;
883          }
884  
846        public void forEachRemaining(Consumer<? super E> action) {
847            if (action == null) throw new NullPointerException();
848            if (!exhausted) {
849                exhausted = true;
850                Node<E> p = current;
851                current = null;
852                do {
853                    E e = null;
854                    fullyLock();
855                    try {
856                        if (p != null || (p = head.next) != null)
857                            do {
858                                e = p.item;
859                                p = succ(p);
860                            } while (e == null && p != null);
861                    } finally {
862                        fullyUnlock();
863                    }
864                    if (e != null)
865                        action.accept(e);
866                } while (p != null);
867            }
868        }
869
885          public boolean tryAdvance(Consumer<? super E> action) {
886 <            if (action == null) throw new NullPointerException();
886 >            Objects.requireNonNull(action);
887              if (!exhausted) {
888                  Node<E> p = current;
889                  E e = null;
# Line 891 | Line 906 | public class LinkedBlockingQueue<E> exte
906              return false;
907          }
908  
909 +        public void forEachRemaining(Consumer<? super E> action) {
910 +            Objects.requireNonNull(action);
911 +            if (!exhausted) {
912 +                exhausted = true;
913 +                Node<E> p = current;
914 +                current = null;
915 +                forEachFrom(action, p);
916 +            }
917 +        }
918 +
919          public int characteristics() {
920              return (Spliterator.ORDERED |
921                      Spliterator.NONNULL |
# Line 919 | Line 944 | public class LinkedBlockingQueue<E> exte
944      }
945  
946      /**
947 +     * @throws NullPointerException {@inheritDoc}
948 +     */
949 +    public void forEach(Consumer<? super E> action) {
950 +        Objects.requireNonNull(action);
951 +        forEachFrom(action, null);
952 +    }
953 +
954 +    /**
955 +     * Runs action on each element found during a traversal starting at p.
956 +     * If p is null, traversal starts at head.
957 +     */
958 +    void forEachFrom(Consumer<? super E> action, Node<E> p) {
959 +        // Extract batches of elements while holding the lock; then
960 +        // run the action on the elements while not
961 +        final int batchSize = 32;       // max number of elements per batch
962 +        Object[] es = null;             // container for batch of elements
963 +        int n, len = 0;
964 +        do {
965 +            fullyLock();
966 +            try {
967 +                if (es == null) {
968 +                    if (p == null) p = head.next;
969 +                    for (Node<E> q = p; q != null; q = succ(q))
970 +                        if (q.item != null && ++len == batchSize)
971 +                            break;
972 +                    es = new Object[len];
973 +                }
974 +                for (n = 0; p != null && n < len; p = succ(p))
975 +                    if ((es[n] = p.item) != null)
976 +                        n++;
977 +            } finally {
978 +                fullyUnlock();
979 +            }
980 +            for (int i = 0; i < n; i++) {
981 +                @SuppressWarnings("unchecked") E e = (E) es[i];
982 +                action.accept(e);
983 +            }
984 +        } while (n > 0 && p != null);
985 +    }
986 +
987 +    /**
988       * Saves this queue to a stream (that is, serializes it).
989       *
990       * @param s the stream

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines