ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedTransferQueue.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/LinkedTransferQueue.java (file contents):
Revision 1.51 by jsr166, Mon Dec 19 19:58:00 2011 UTC vs.
Revision 1.52 by dl, Sun Feb 17 23:36:34 2013 UTC

# Line 7 | Line 7
7   package java.util.concurrent;
8  
9   import java.util.AbstractQueue;
10 + import java.util.Arrays;
11   import java.util.Collection;
12 + import java.util.Collections;
13   import java.util.Iterator;
14   import java.util.NoSuchElementException;
15   import java.util.Queue;
16   import java.util.concurrent.TimeUnit;
17   import java.util.concurrent.locks.LockSupport;
18 + import java.util.Spliterator;
19 + import java.util.stream.Stream;
20 + import java.util.stream.Streams;
21 + import java.util.function.Consumer;
22  
23   /**
24   * An unbounded {@link TransferQueue} based on linked nodes.
# Line 748 | Line 754 | public class LinkedTransferQueue<E> exte
754      }
755  
756      /**
757 +     * Version of firstOfMode used by Spliterator
758 +     */
759 +    final Node firstDataNode() {
760 +        for (Node p = head; p != null;) {
761 +            Object item = p.item;
762 +            if (p.isData) {
763 +                if (item != null && item != p)
764 +                    return p;
765 +            }
766 +            else if (item == null)
767 +                break;
768 +            if (p == (p = p.next))
769 +                p = head;
770 +        }
771 +        return null;
772 +    }
773 +
774 +    /**
775       * Returns the item in the first unmatched node with isData; or
776       * null if none.  Used by peek.
777       */
# Line 880 | Line 904 | public class LinkedTransferQueue<E> exte
904                  unsplice(lastPred, lastRet);
905          }
906      }
907 +    
908 +    // Very similar to ConcurrentLinkedQueue spliterator
909 +    static final class LTQSpliterator<E> implements Spliterator<E> {
910 +        static final int MAX_BATCH = 1 << 10;  // saturate batch size
911 +        final LinkedTransferQueue<E> queue;
912 +        Node current;    // current node; null until initialized
913 +        int batch;          // batch size for splits
914 +        boolean exhausted;  // true when no more nodes
915 +        LTQSpliterator(LinkedTransferQueue<E> queue) {
916 +            this.queue = queue;
917 +        }
918 +
919 +        /*
920 +         * Split into arrays of arithmetically increasing batch sizes,
921 +         * giving up at MAX_BATCH.  Treat the result as a
922 +         * CopyOnWriteArrayList array snapshot.  This will only
923 +         * improve parallel performance if per-element forEach actions
924 +         * are more costly than transfering them into an array. If
925 +         * not, we limit slowdowns by eventually returning null split.
926 +         */
927 +        public Spliterator<E> trySplit() {
928 +            Node p; int n;
929 +            final LinkedTransferQueue<E> q = this.queue;
930 +            if (!exhausted && (n = batch + 1) > 0 && n <= MAX_BATCH &&
931 +                ((p = current) != null || (p = q.firstDataNode()) != null)) {
932 +                Object[] a = new Object[batch = n];
933 +                int i = 0;
934 +                do {
935 +                    if ((a[i] = p.item) != null)
936 +                        ++i;
937 +                    if (p == (p = p.next))
938 +                        p = q.firstDataNode();
939 +                } while (p != null && i < n);
940 +                if ((current = p) == null)
941 +                    exhausted = true;
942 +                return Collections.arraySnapshotSpliterator
943 +                    (a, 0, i, Spliterator.ORDERED | Spliterator.NONNULL |
944 +                     Spliterator.CONCURRENT);
945 +            }
946 +            return null;
947 +        }
948 +
949 +        @SuppressWarnings("unchecked")
950 +        public void forEach(Consumer<? super E> action) {
951 +            Node p;
952 +            if (action == null) throw new NullPointerException();
953 +            final LinkedTransferQueue<E> q = this.queue;
954 +            if (!exhausted &&
955 +                ((p = current) != null || (p = q.firstDataNode()) != null)) {
956 +                exhausted = true;
957 +                do {
958 +                    Object e = p.item;
959 +                    if (p == (p = p.next))
960 +                        p = q.firstDataNode();
961 +                    if (e != null)
962 +                        action.accept((E)e);
963 +                } while (p != null);
964 +            }
965 +        }
966 +
967 +        @SuppressWarnings("unchecked")
968 +        public boolean tryAdvance(Consumer<? super E> action) {
969 +            Node p;
970 +            if (action == null) throw new NullPointerException();
971 +            final LinkedTransferQueue<E> q = this.queue;
972 +            if (!exhausted &&
973 +                ((p = current) != null || (p = q.firstDataNode()) != null)) {
974 +                Object e;
975 +                do {
976 +                    e = p.item;
977 +                    if (p == (p = p.next))
978 +                        p = q.firstDataNode();
979 +                } while (e == null && p != null);
980 +                if ((current = p) == null)
981 +                    exhausted = true;
982 +                if (e != null) {
983 +                    action.accept((E)e);
984 +                    return true;
985 +                }
986 +            }
987 +            return false;
988 +        }
989 +
990 +        public int characteristics() {
991 +            return Spliterator.ORDERED | Spliterator.NONNULL |
992 +                Spliterator.CONCURRENT;
993 +        }
994 +    }
995 +
996 +
997 +    Spliterator<E> spliterator() {
998 +        return new LTQSpliterator<E>(this);
999 +    }
1000 +
1001 +    public Stream<E> stream() {
1002 +        return Streams.stream(spliterator());
1003 +    }
1004 +
1005 +    public Stream<E> parallelStream() {
1006 +        return Streams.parallelStream(spliterator());
1007 +    }
1008  
1009      /* -------------- Removal methods -------------- */
1010  

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines