ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java (file contents):
Revision 1.73 by jsr166, Tue Feb 5 19:54:06 2013 UTC vs.
Revision 1.74 by dl, Sun Feb 17 23:36:34 2013 UTC

# Line 11 | Line 11 | import java.util.concurrent.locks.Condit
11   import java.util.concurrent.locks.ReentrantLock;
12   import java.util.AbstractQueue;
13   import java.util.Collection;
14 + import java.util.Collections;
15   import java.util.Iterator;
16   import java.util.NoSuchElementException;
17 + import java.util.Spliterator;
18 + import java.util.stream.Stream;
19 + import java.util.stream.Streams;
20 + import java.util.function.Consumer;
21  
22   /**
23   * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
# Line 827 | Line 832 | public class LinkedBlockingQueue<E> exte
832          }
833      }
834  
835 +    static final class LBQSpliterator<E> implements Spliterator<E> {
836 +        // Similar idea to ConcurrentLinkedQueue spliterator
837 +        static final int MAX_BATCH = 1 << 11;  // saturate batch size
838 +        final LinkedBlockingQueue<E> queue;
839 +        Node<E> current;    // current node; null until initialized
840 +        int batch;          // batch size for splits
841 +        boolean exhausted;  // true when no more nodes
842 +        long est;           // size estimate
843 +        LBQSpliterator(LinkedBlockingQueue<E> queue) {
844 +            this.queue = queue;
845 +            this.est = queue.size();
846 +        }
847 +
848 +        public long estimateSize() { return est; }
849 +
850 +        public Spliterator<E> trySplit() {
851 +            int n;
852 +            final LinkedBlockingQueue<E> q = this.queue;
853 +            if (!exhausted && (n = batch + 1) > 0 && n <= MAX_BATCH) {
854 +                Object[] a = new Object[batch = n];
855 +                int i = 0;
856 +                Node<E> p = current;
857 +                q.fullyLock();
858 +                try {
859 +                    if (p != null || (p = q.head.next) != null) {
860 +                        do {
861 +                            if ((a[i] = p.item) != null)
862 +                                ++i;
863 +                        } while ((p = p.next) != null && i < n);
864 +                    }
865 +                } finally {
866 +                    q.fullyUnlock();
867 +                }
868 +                if ((current = p) == null) {
869 +                    est = 0L;
870 +                    exhausted = true;
871 +                }
872 +                else if ((est -= i) <= 0L)
873 +                    est = 1L;
874 +                return Collections.arraySnapshotSpliterator
875 +                    (a, 0, i, Spliterator.ORDERED | Spliterator.NONNULL |
876 +                     Spliterator.CONCURRENT);
877 +            }
878 +            return null;
879 +        }
880 +
881 +        public void forEach(Consumer<? super E> action) {
882 +            if (action == null) throw new NullPointerException();
883 +            final LinkedBlockingQueue<E> q = this.queue;
884 +            if (!exhausted) {
885 +                exhausted = true;
886 +                Node<E> p = current;
887 +                do {
888 +                    E e = null;
889 +                    q.fullyLock();
890 +                    try {
891 +                        if (p == null)
892 +                            p = q.head.next;
893 +                        while (p != null) {
894 +                            e = p.item;
895 +                            p = p.next;
896 +                            if (e != null)
897 +                                break;
898 +                        }
899 +                    } finally {
900 +                        q.fullyUnlock();
901 +                    }
902 +                    if (e != null)
903 +                        action.accept(e);
904 +                } while (p != null);
905 +            }
906 +        }
907 +
908 +        public boolean tryAdvance(Consumer<? super E> action) {
909 +            if (action == null) throw new NullPointerException();
910 +            final LinkedBlockingQueue<E> q = this.queue;
911 +            if (!exhausted) {
912 +                E e = null;
913 +                q.fullyLock();
914 +                try {
915 +                    if (current == null)
916 +                        current = q.head.next;
917 +                    while (current != null) {
918 +                        e = current.item;
919 +                        current = current.next;
920 +                        if (e != null)
921 +                            break;
922 +                    }
923 +                } finally {
924 +                    q.fullyUnlock();
925 +                }
926 +                if (e != null) {
927 +                    action.accept(e);
928 +                    return true;
929 +                }
930 +                exhausted = true;
931 +            }
932 +            return false;
933 +        }
934 +
935 +        public int characteristics() {
936 +            return Spliterator.ORDERED | Spliterator.NONNULL |
937 +                Spliterator.CONCURRENT;
938 +        }
939 +    }
940 +
941 +    Spliterator<E> spliterator() {
942 +        return new LBQSpliterator<E>(this);
943 +    }
944 +
945 +    public Stream<E> stream() {
946 +        return Streams.stream(spliterator());
947 +    }
948 +
949 +    public Stream<E> parallelStream() {
950 +        return Streams.parallelStream(spliterator());
951 +    }
952 +
953      /**
954       * Saves this queue to a stream (that is, serializes it).
955       *

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines