ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingDeque.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/LinkedBlockingDeque.java (file contents):
Revision 1.35 by jsr166, Sun Nov 18 18:03:11 2012 UTC vs.
Revision 1.36 by dl, Sun Feb 17 23:36:34 2013 UTC

# Line 8 | Line 8 | package java.util.concurrent;
8  
9   import java.util.AbstractQueue;
10   import java.util.Collection;
11 + import java.util.Collections;
12   import java.util.Iterator;
13   import java.util.NoSuchElementException;
14   import java.util.concurrent.locks.Condition;
15   import java.util.concurrent.locks.ReentrantLock;
16 + import java.util.Spliterator;
17 + import java.util.stream.Stream;
18 + import java.util.stream.Streams;
19 + import java.util.function.Consumer;
20  
21   /**
22   * An optionally-bounded {@linkplain BlockingDeque blocking deque} based on
# Line 1114 | Line 1119 | public class LinkedBlockingDeque<E>
1119      private class Itr extends AbstractItr {
1120          Node<E> firstNode() { return first; }
1121          Node<E> nextNode(Node<E> n) { return n.next; }
1122 +        // minimal, unsplittable Spliterator implementation
1123 +        public boolean tryAdvance(Consumer<? super E> action) {
1124 +            if (hasNext()) {
1125 +                action.accept(next());
1126 +                return true;
1127 +            }
1128 +            return false;
1129 +        }
1130 +        public void forEach(Consumer<? super E> action) {
1131 +            while (hasNext())
1132 +                action.accept(next());
1133 +        }
1134 +        public int characteristics() {
1135 +            return Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.CONCURRENT;
1136 +        }
1137      }
1138  
1139      /** Descending iterator */
# Line 1122 | Line 1142 | public class LinkedBlockingDeque<E>
1142          Node<E> nextNode(Node<E> n) { return n.prev; }
1143      }
1144  
1145 +    static final class LBDSpliterator<E> implements Spliterator<E> {
1146 +        // Similar idea to ConcurrentLinkedQueue spliterator
1147 +        static final int MAX_BATCH = 1 << 11;  // saturate batch size
1148 +        final LinkedBlockingDeque<E> queue;
1149 +        Node<E> current;    // current node; null until initialized
1150 +        int batch;          // batch size for splits
1151 +        boolean exhausted;  // true when no more nodes
1152 +        long est;           // size estimate
1153 +        LBDSpliterator(LinkedBlockingDeque<E> queue) {
1154 +            this.queue = queue;
1155 +            this.est = queue.size();
1156 +        }
1157 +
1158 +        public long estimateSize() { return est; }
1159 +
1160 +        public Spliterator<E> trySplit() {
1161 +            int n;
1162 +            final LinkedBlockingDeque<E> q = this.queue;
1163 +            final ReentrantLock lock = q.lock;
1164 +            if (!exhausted && (n = batch + 1) > 0 && n <= MAX_BATCH) {
1165 +                Object[] a = new Object[batch = n];
1166 +                int i = 0;
1167 +                Node<E> p = current;
1168 +                lock.lock();
1169 +                try {
1170 +                    if (p != null || (p = q.first) != null) {
1171 +                        do {
1172 +                            if ((a[i] = p.item) != null)
1173 +                                ++i;
1174 +                        } while ((p = p.next) != null && i < n);
1175 +                    }
1176 +                } finally {
1177 +                    lock.unlock();
1178 +                }
1179 +                if ((current = p) == null) {
1180 +                    est = 0L;
1181 +                    exhausted = true;
1182 +                }
1183 +                else if ((est -= i) <= 0L)
1184 +                    est = 1L;
1185 +                return Collections.arraySnapshotSpliterator
1186 +                    (a, 0, i, Spliterator.ORDERED | Spliterator.NONNULL |
1187 +                     Spliterator.CONCURRENT);
1188 +            }
1189 +            return null;
1190 +        }
1191 +
1192 +        public void forEach(Consumer<? super E> action) {
1193 +            if (action == null) throw new NullPointerException();
1194 +            final LinkedBlockingDeque<E> q = this.queue;
1195 +            final ReentrantLock lock = q.lock;
1196 +            if (!exhausted) {
1197 +                exhausted = true;
1198 +                Node<E> p = current;
1199 +                do {
1200 +                    E e = null;
1201 +                    lock.lock();
1202 +                    try {
1203 +                        if (p == null)
1204 +                            p = q.first;
1205 +                        while (p != null) {
1206 +                            e = p.item;
1207 +                            p = p.next;
1208 +                            if (e != null)
1209 +                                break;
1210 +                        }
1211 +                    } finally {
1212 +                        lock.unlock();
1213 +                    }
1214 +                    if (e != null)
1215 +                        action.accept(e);
1216 +                } while (p != null);
1217 +            }
1218 +        }
1219 +
1220 +        public boolean tryAdvance(Consumer<? super E> action) {
1221 +            if (action == null) throw new NullPointerException();
1222 +            final LinkedBlockingDeque<E> q = this.queue;
1223 +            final ReentrantLock lock = q.lock;
1224 +            if (!exhausted) {
1225 +                E e = null;
1226 +                lock.lock();
1227 +                try {
1228 +                    if (current == null)
1229 +                        current = q.first;
1230 +                    while (current != null) {
1231 +                        e = current.item;
1232 +                        current = current.next;
1233 +                        if (e != null)
1234 +                            break;
1235 +                    }
1236 +                } finally {
1237 +                    lock.unlock();
1238 +                }
1239 +                if (e != null) {
1240 +                    action.accept(e);
1241 +                    return true;
1242 +                }
1243 +                exhausted = true;
1244 +            }
1245 +            return false;
1246 +        }
1247 +
1248 +        public int characteristics() {
1249 +            return Spliterator.ORDERED | Spliterator.NONNULL |
1250 +                Spliterator.CONCURRENT;
1251 +        }
1252 +    }
1253 +
1254 +    Spliterator<E> spliterator() {
1255 +        return new LBDSpliterator<E>(this);
1256 +    }
1257 +    public Stream<E> stream() {
1258 +        return Streams.stream(spliterator());
1259 +    }
1260 +
1261 +    public Stream<E> parallelStream() {
1262 +        return Streams.parallelStream(spliterator());
1263 +    }
1264 +
1265      /**
1266       * Saves this deque to a stream (that is, serializes it).
1267       *

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines