[cvs] / jsr166 / src / main / java / util / concurrent / LinkedTransferQueue.java Repository:
ViewVC logotype

Diff of /jsr166/src/main/java/util/concurrent/LinkedTransferQueue.java

Parent Directory Parent Directory | Revision Log Revision Log | View Patch Patch

revision 1.51, Mon Dec 19 19:58:00 2011 UTC revision 1.52, Sun Feb 17 23:36:34 2013 UTC
# Line 7  Line 7 
7  package java.util.concurrent;  package java.util.concurrent;
8    
9  import java.util.AbstractQueue;  import java.util.AbstractQueue;
10    import java.util.Arrays;
11  import java.util.Collection;  import java.util.Collection;
12    import java.util.Collections;
13  import java.util.Iterator;  import java.util.Iterator;
14  import java.util.NoSuchElementException;  import java.util.NoSuchElementException;
15  import java.util.Queue;  import java.util.Queue;
16  import java.util.concurrent.TimeUnit;  import java.util.concurrent.TimeUnit;
17  import java.util.concurrent.locks.LockSupport;  import java.util.concurrent.locks.LockSupport;
18    import java.util.Spliterator;
19    import java.util.stream.Stream;
20    import java.util.stream.Streams;
21    import java.util.function.Consumer;
22    
23  /**  /**
24   * An unbounded {@link TransferQueue} based on linked nodes.   * An unbounded {@link TransferQueue} based on linked nodes.
# Line 748  Line 754 
754      }      }
755    
756      /**      /**
757         * Version of firstOfMode used by Spliterator
758         */
759        final Node firstDataNode() {
760            for (Node p = head; p != null;) {
761                Object item = p.item;
762                if (p.isData) {
763                    if (item != null && item != p)
764                        return p;
765                }
766                else if (item == null)
767                    break;
768                if (p == (p = p.next))
769                    p = head;
770            }
771            return null;
772        }
773    
774        /**
775       * Returns the item in the first unmatched node with isData; or       * Returns the item in the first unmatched node with isData; or
776       * null if none.  Used by peek.       * null if none.  Used by peek.
777       */       */
# Line 881  Line 905 
905          }          }
906      }      }
907    
908        // Very similar to ConcurrentLinkedQueue spliterator
909        static final class LTQSpliterator<E> implements Spliterator<E> {
910            static final int MAX_BATCH = 1 << 10;  // saturate batch size
911            final LinkedTransferQueue<E> queue;
912            Node current;    // current node; null until initialized
913            int batch;          // batch size for splits
914            boolean exhausted;  // true when no more nodes
915            LTQSpliterator(LinkedTransferQueue<E> queue) {
916                this.queue = queue;
917            }
918    
919            /*
920             * Split into arrays of arithmetically increasing batch sizes,
921             * giving up at MAX_BATCH.  Treat the result as a
922             * CopyOnWriteArrayList array snapshot.  This will only
923             * improve parallel performance if per-element forEach actions
924             * are more costly than transfering them into an array. If
925             * not, we limit slowdowns by eventually returning null split.
926             */
927            public Spliterator<E> trySplit() {
928                Node p; int n;
929                final LinkedTransferQueue<E> q = this.queue;
930                if (!exhausted && (n = batch + 1) > 0 && n <= MAX_BATCH &&
931                    ((p = current) != null || (p = q.firstDataNode()) != null)) {
932                    Object[] a = new Object[batch = n];
933                    int i = 0;
934                    do {
935                        if ((a[i] = p.item) != null)
936                            ++i;
937                        if (p == (p = p.next))
938                            p = q.firstDataNode();
939                    } while (p != null && i < n);
940                    if ((current = p) == null)
941                        exhausted = true;
942                    return Collections.arraySnapshotSpliterator
943                        (a, 0, i, Spliterator.ORDERED | Spliterator.NONNULL |
944                         Spliterator.CONCURRENT);
945                }
946                return null;
947            }
948    
949            @SuppressWarnings("unchecked")
950            public void forEach(Consumer<? super E> action) {
951                Node p;
952                if (action == null) throw new NullPointerException();
953                final LinkedTransferQueue<E> q = this.queue;
954                if (!exhausted &&
955                    ((p = current) != null || (p = q.firstDataNode()) != null)) {
956                    exhausted = true;
957                    do {
958                        Object e = p.item;
959                        if (p == (p = p.next))
960                            p = q.firstDataNode();
961                        if (e != null)
962                            action.accept((E)e);
963                    } while (p != null);
964                }
965            }
966    
967            @SuppressWarnings("unchecked")
968            public boolean tryAdvance(Consumer<? super E> action) {
969                Node p;
970                if (action == null) throw new NullPointerException();
971                final LinkedTransferQueue<E> q = this.queue;
972                if (!exhausted &&
973                    ((p = current) != null || (p = q.firstDataNode()) != null)) {
974                    Object e;
975                    do {
976                        e = p.item;
977                        if (p == (p = p.next))
978                            p = q.firstDataNode();
979                    } while (e == null && p != null);
980                    if ((current = p) == null)
981                        exhausted = true;
982                    if (e != null) {
983                        action.accept((E)e);
984                        return true;
985                    }
986                }
987                return false;
988            }
989    
990            public int characteristics() {
991                return Spliterator.ORDERED | Spliterator.NONNULL |
992                    Spliterator.CONCURRENT;
993            }
994        }
995    
996    
997        Spliterator<E> spliterator() {
998            return new LTQSpliterator<E>(this);
999        }
1000    
1001        public Stream<E> stream() {
1002            return Streams.stream(spliterator());
1003        }
1004    
1005        public Stream<E> parallelStream() {
1006            return Streams.parallelStream(spliterator());
1007        }
1008    
1009      /* -------------- Removal methods -------------- */      /* -------------- Removal methods -------------- */
1010    
1011      /**      /**

Legend:
Removed from v.1.51  
changed lines
  Added in v.1.52

Doug Lea
ViewVC Help
Powered by ViewVC 1.0.8