ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jdk8/java/util/concurrent/LinkedBlockingQueue.java
(Generate patch)

Comparing jsr166/src/jdk8/java/util/concurrent/LinkedBlockingQueue.java (file contents):
Revision 1.1 by jsr166, Sat Mar 26 06:22:50 2016 UTC vs.
Revision 1.2 by jsr166, Sat Dec 17 21:56:54 2016 UTC

# Line 10 | Line 10 | import java.util.AbstractQueue;
10   import java.util.Collection;
11   import java.util.Iterator;
12   import java.util.NoSuchElementException;
13 + import java.util.Objects;
14   import java.util.Spliterator;
15   import java.util.Spliterators;
16   import java.util.concurrent.atomic.AtomicInteger;
# Line 205 | Line 206 | public class LinkedBlockingQueue<E> exte
206          putLock.unlock();
207      }
208  
208 //     /**
209 //      * Tells whether both locks are held by current thread.
210 //      */
211 //     boolean isFullyLocked() {
212 //         return (putLock.isHeldByCurrentThread() &&
213 //                 takeLock.isHeldByCurrentThread());
214 //     }
215
209      /**
210       * Creates a {@code LinkedBlockingQueue} with a capacity of
211       * {@link Integer#MAX_VALUE}.
# Line 488 | Line 481 | public class LinkedBlockingQueue<E> exte
481       * Unlinks interior Node p with predecessor trail.
482       */
483      void unlink(Node<E> p, Node<E> trail) {
484 <        // assert isFullyLocked();
484 >        // assert putLock.isHeldByCurrentThread();
485 >        // assert takeLock.isHeldByCurrentThread();
486          // p.next is not changed, to allow iterators that are
487          // traversing p to maintain their weak-consistency guarantee.
488          p.item = null;
# Line 672 | Line 666 | public class LinkedBlockingQueue<E> exte
666       * @throws IllegalArgumentException      {@inheritDoc}
667       */
668      public int drainTo(Collection<? super E> c, int maxElements) {
669 <        if (c == null)
676 <            throw new NullPointerException();
669 >        Objects.requireNonNull(c);
670          if (c == this)
671              throw new IllegalArgumentException();
672          if (maxElements <= 0)
# Line 712 | Line 705 | public class LinkedBlockingQueue<E> exte
705      }
706  
707      /**
708 +     * Used for any element traversal that is not entirely under lock.
709 +     * Such traversals must handle both:
710 +     * - dequeued nodes (p.next == p)
711 +     * - (possibly multiple) interior removed nodes (p.item == null)
712 +     */
713 +    Node<E> succ(Node<E> p) {
714 +        return (p == (p = p.next)) ? head.next : p;
715 +    }
716 +
717 +    /**
718       * Returns an iterator over the elements in this queue in proper sequence.
719       * The elements will be returned in order from first (head) to last (tail).
720       *
# Line 738 | Line 741 | public class LinkedBlockingQueue<E> exte
741          Itr() {
742              fullyLock();
743              try {
744 <                current = head.next;
742 <                if (current != null)
744 >                if ((current = head.next) != null)
745                      currentElement = current.item;
746              } finally {
747                  fullyUnlock();
# Line 751 | Line 753 | public class LinkedBlockingQueue<E> exte
753          }
754  
755          public E next() {
756 +            Node<E> p;
757 +            if ((p = current) == null)
758 +                throw new NoSuchElementException();
759 +            E ret = currentElement, e = null;
760 +            lastRet = p;
761              fullyLock();
762              try {
763 <                if (current == null)
764 <                    throw new NoSuchElementException();
765 <                lastRet = current;
759 <                E item = null;
760 <                // Unlike other traversal methods, iterators must handle both:
761 <                // - dequeued nodes (p.next == p)
762 <                // - (possibly multiple) interior removed nodes (p.item == null)
763 <                for (Node<E> p = current, q;; p = q) {
764 <                    if ((q = p.next) == p)
765 <                        q = head.next;
766 <                    if (q == null || (item = q.item) != null) {
767 <                        current = q;
768 <                        E x = currentElement;
769 <                        currentElement = item;
770 <                        return x;
771 <                    }
772 <                }
763 >                for (p = p.next; p != null; p = succ(p))
764 >                    if ((e = p.item) != null)
765 >                        break;
766              } finally {
767                  fullyUnlock();
768              }
769 +            current = p;
770 +            currentElement = e;
771 +            return ret;
772 +        }
773 +
774 +        public void forEachRemaining(Consumer<? super E> action) {
775 +            // A variant of forEachFrom
776 +            Objects.requireNonNull(action);
777 +            Node<E> p;
778 +            if ((p = current) == null) return;
779 +            lastRet = current;
780 +            current = null;
781 +            final int batchSize = 32;
782 +            Object[] es = null;
783 +            int n, len = 1;
784 +            do {
785 +                fullyLock();
786 +                try {
787 +                    if (es == null) {
788 +                        p = p.next;
789 +                        for (Node<E> q = p; q != null; q = succ(q))
790 +                            if (q.item != null && ++len == batchSize)
791 +                                break;
792 +                        es = new Object[len];
793 +                        es[0] = currentElement;
794 +                        currentElement = null;
795 +                        n = 1;
796 +                    } else
797 +                        n = 0;
798 +                    for (; p != null && n < len; p = succ(p))
799 +                        if ((es[n] = p.item) != null) {
800 +                            lastRet = p;
801 +                            n++;
802 +                        }
803 +                } finally {
804 +                    fullyUnlock();
805 +                }
806 +                for (int i = 0; i < n; i++) {
807 +                    @SuppressWarnings("unchecked") E e = (E) es[i];
808 +                    action.accept(e);
809 +                }
810 +            } while (n > 0 && p != null);
811          }
812  
813          public void remove() {
# Line 796 | Line 831 | public class LinkedBlockingQueue<E> exte
831          }
832      }
833  
834 <    /** A customized variant of Spliterators.IteratorSpliterator */
835 <    static final class LBQSpliterator<E> implements Spliterator<E> {
834 >    /**
835 >     * A customized variant of Spliterators.IteratorSpliterator.
836 >     * Keep this class in sync with (very similar) LBDSpliterator.
837 >     */
838 >    private final class LBQSpliterator implements Spliterator<E> {
839          static final int MAX_BATCH = 1 << 25;  // max batch array size;
802        final LinkedBlockingQueue<E> queue;
840          Node<E> current;    // current node; null until initialized
841          int batch;          // batch size for splits
842          boolean exhausted;  // true when no more nodes
843 <        long est;           // size estimate
844 <        LBQSpliterator(LinkedBlockingQueue<E> queue) {
845 <            this.queue = queue;
809 <            this.est = queue.size();
810 <        }
843 >        long est = size();  // size estimate
844 >
845 >        LBQSpliterator() {}
846  
847          public long estimateSize() { return est; }
848  
849          public Spliterator<E> trySplit() {
850              Node<E> h;
816            final LinkedBlockingQueue<E> q = this.queue;
851              int b = batch;
852              int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
853              if (!exhausted &&
854 <                ((h = current) != null || (h = q.head.next) != null) &&
855 <                h.next != null) {
854 >                ((h = current) != null || (h = head.next) != null)
855 >                && h.next != null) {
856                  Object[] a = new Object[n];
857                  int i = 0;
858                  Node<E> p = current;
859 <                q.fullyLock();
859 >                fullyLock();
860                  try {
861 <                    if (p != null || (p = q.head.next) != null) {
862 <                        do {
861 >                    if (p != null || (p = head.next) != null)
862 >                        for (; p != null && i < n; p = succ(p))
863                              if ((a[i] = p.item) != null)
864 <                                ++i;
831 <                        } while ((p = p.next) != null && i < n);
832 <                    }
864 >                                i++;
865                  } finally {
866 <                    q.fullyUnlock();
866 >                    fullyUnlock();
867                  }
868                  if ((current = p) == null) {
869                      est = 0L;
# Line 850 | Line 882 | public class LinkedBlockingQueue<E> exte
882              return null;
883          }
884  
853        public void forEachRemaining(Consumer<? super E> action) {
854            if (action == null) throw new NullPointerException();
855            final LinkedBlockingQueue<E> q = this.queue;
856            if (!exhausted) {
857                exhausted = true;
858                Node<E> p = current;
859                do {
860                    E e = null;
861                    q.fullyLock();
862                    try {
863                        if (p == null)
864                            p = q.head.next;
865                        while (p != null) {
866                            e = p.item;
867                            p = p.next;
868                            if (e != null)
869                                break;
870                        }
871                    } finally {
872                        q.fullyUnlock();
873                    }
874                    if (e != null)
875                        action.accept(e);
876                } while (p != null);
877            }
878        }
879
885          public boolean tryAdvance(Consumer<? super E> action) {
886 <            if (action == null) throw new NullPointerException();
882 <            final LinkedBlockingQueue<E> q = this.queue;
886 >            Objects.requireNonNull(action);
887              if (!exhausted) {
888 +                Node<E> p = current;
889                  E e = null;
890 <                q.fullyLock();
890 >                fullyLock();
891                  try {
892 <                    if (current == null)
893 <                        current = q.head.next;
894 <                    while (current != null) {
895 <                        e = current.item;
896 <                        current = current.next;
892 <                        if (e != null)
893 <                            break;
894 <                    }
892 >                    if (p != null || (p = head.next) != null)
893 >                        do {
894 >                            e = p.item;
895 >                            p = succ(p);
896 >                        } while (e == null && p != null);
897                  } finally {
898 <                    q.fullyUnlock();
898 >                    fullyUnlock();
899                  }
900 <                if (current == null)
899 <                    exhausted = true;
900 >                exhausted = ((current = p) == null);
901                  if (e != null) {
902                      action.accept(e);
903                      return true;
# Line 905 | Line 906 | public class LinkedBlockingQueue<E> exte
906              return false;
907          }
908  
909 +        public void forEachRemaining(Consumer<? super E> action) {
910 +            Objects.requireNonNull(action);
911 +            if (!exhausted) {
912 +                exhausted = true;
913 +                Node<E> p = current;
914 +                current = null;
915 +                forEachFrom(action, p);
916 +            }
917 +        }
918 +
919          public int characteristics() {
920 <            return Spliterator.ORDERED | Spliterator.NONNULL |
921 <                Spliterator.CONCURRENT;
920 >            return (Spliterator.ORDERED |
921 >                    Spliterator.NONNULL |
922 >                    Spliterator.CONCURRENT);
923          }
924      }
925  
# Line 928 | Line 940 | public class LinkedBlockingQueue<E> exte
940       * @since 1.8
941       */
942      public Spliterator<E> spliterator() {
943 <        return new LBQSpliterator<E>(this);
943 >        return new LBQSpliterator();
944 >    }
945 >
946 >    /**
947 >     * @throws NullPointerException {@inheritDoc}
948 >     */
949 >    public void forEach(Consumer<? super E> action) {
950 >        Objects.requireNonNull(action);
951 >        forEachFrom(action, null);
952 >    }
953 >
954 >    /**
955 >     * Runs action on each element found during a traversal starting at p.
956 >     * If p is null, traversal starts at head.
957 >     */
958 >    void forEachFrom(Consumer<? super E> action, Node<E> p) {
959 >        // Extract batches of elements while holding the lock; then
960 >        // run the action on the elements while not
961 >        final int batchSize = 32;       // max number of elements per batch
962 >        Object[] es = null;             // container for batch of elements
963 >        int n, len = 0;
964 >        do {
965 >            fullyLock();
966 >            try {
967 >                if (es == null) {
968 >                    if (p == null) p = head.next;
969 >                    for (Node<E> q = p; q != null; q = succ(q))
970 >                        if (q.item != null && ++len == batchSize)
971 >                            break;
972 >                    es = new Object[len];
973 >                }
974 >                for (n = 0; p != null && n < len; p = succ(p))
975 >                    if ((es[n] = p.item) != null)
976 >                        n++;
977 >            } finally {
978 >                fullyUnlock();
979 >            }
980 >            for (int i = 0; i < n; i++) {
981 >                @SuppressWarnings("unchecked") E e = (E) es[i];
982 >                action.accept(e);
983 >            }
984 >        } while (n > 0 && p != null);
985      }
986  
987      /**

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines