ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingDeque.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/LinkedBlockingDeque.java (file contents):
Revision 1.66 by jsr166, Sun Dec 11 19:59:51 2016 UTC vs.
Revision 1.67 by jsr166, Tue Dec 13 18:55:57 2016 UTC

# Line 10 | Line 10 | import java.util.AbstractQueue;
10   import java.util.Collection;
11   import java.util.Iterator;
12   import java.util.NoSuchElementException;
13 + import java.util.Objects;
14   import java.util.Spliterator;
15   import java.util.Spliterators;
16   import java.util.concurrent.locks.Condition;
# Line 729 | Line 730 | public class LinkedBlockingDeque<E>
730       * @throws IllegalArgumentException      {@inheritDoc}
731       */
732      public int drainTo(Collection<? super E> c, int maxElements) {
733 <        if (c == null)
733 <            throw new NullPointerException();
733 >        Objects.requireNonNull(c);
734          if (c == this)
735              throw new IllegalArgumentException();
736          if (maxElements <= 0)
# Line 981 | Line 981 | public class LinkedBlockingDeque<E>
981      }
982  
983      /**
984 +     * Used for any element traversal that is not entirely under lock.
985 +     * Such traversals must handle both:
986 +     * - dequeued nodes (p.next == p)
987 +     * - (possibly multiple) interior removed nodes (p.item == null)
988 +     */
989 +    Node<E> succ(Node<E> p) {
990 +        return (p == (p = p.next)) ? first : p;
991 +    }
992 +
993 +    /**
994       * Returns an iterator over the elements in this deque in proper sequence.
995       * The elements will be returned in order from first (head) to last (tail).
996       *
# Line 1019 | Line 1029 | public class LinkedBlockingDeque<E>
1029          /**
1030           * nextItem holds on to item fields because once we claim that
1031           * an element exists in hasNext(), we must return item read
1032 <         * under lock (in advance()) even if it was in the process of
1033 <         * being removed when hasNext() was called.
1032 >         * under lock even if it was in the process of being removed
1033 >         * when hasNext() was called.
1034           */
1035          E nextItem;
1036  
# Line 1050 | Line 1060 | public class LinkedBlockingDeque<E>
1060              }
1061          }
1062  
1063 <        /**
1064 <         * Advances next.
1065 <         */
1066 <        void advance() {
1067 <            // assert next != null;
1063 >        public boolean hasNext() {
1064 >            return next != null;
1065 >        }
1066 >
1067 >        public E next() {
1068 >            Node<E> p;
1069 >            if ((p = next) == null)
1070 >                throw new NoSuchElementException();
1071 >            E ret = nextItem, e = null;
1072 >            lastRet = p;
1073              final ReentrantLock lock = LinkedBlockingDeque.this.lock;
1074              lock.lock();
1075              try {
1076 <                // Chains of deleted nodes ending in null or self-links
1077 <                // are possible if multiple interior nodes are removed.
1063 <                for (next = nextNode(next);; next = succ(next)) {
1064 <                    if (next == null) {
1065 <                        nextItem = null;
1066 <                        break;
1067 <                    } else if ((nextItem = next.item) != null)
1076 >                for (p = nextNode(p); p != null; p = succ(p))
1077 >                    if ((e = p.item) != null)
1078                          break;
1069                }
1079              } finally {
1080                  // checkInvariants();
1081                  lock.unlock();
1082              }
1083 +            next = p;
1084 +            nextItem = e;
1085 +            return ret;
1086          }
1087  
1088 <        public boolean hasNext() {
1089 <            return next != null;
1090 <        }
1091 <
1092 <        public E next() {
1081 <            if (next == null)
1082 <                throw new NoSuchElementException();
1088 >        public void forEachRemaining(Consumer<? super E> action) {
1089 >            // A variant of forEachFrom
1090 >            Objects.requireNonNull(action);
1091 >            Node<E> p;
1092 >            if ((p = next) == null) return;
1093              lastRet = next;
1094 <            E x = nextItem;
1095 <            advance();
1096 <            return x;
1094 >            next = null;
1095 >            final ReentrantLock lock = LinkedBlockingDeque.this.lock;
1096 >            final int batchSize = 32;
1097 >            Object[] es = null;
1098 >            int n, len = 1;
1099 >            do {
1100 >                lock.lock();
1101 >                try {
1102 >                    if (es == null) {
1103 >                        p = nextNode(p);
1104 >                        for (Node<E> q = p; q != null; q = succ(q))
1105 >                            if (q.item != null && ++len == batchSize)
1106 >                                break;
1107 >                        es = new Object[len];
1108 >                        es[0] = nextItem;
1109 >                        nextItem = null;
1110 >                        n = 1;
1111 >                    } else
1112 >                        n = 0;
1113 >                    for (; p != null && n < len; p = succ(p))
1114 >                        if ((es[n] = p.item) != null) {
1115 >                            lastRet = p;
1116 >                            n++;
1117 >                        }
1118 >                } finally {
1119 >                    // checkInvariants();
1120 >                    lock.unlock();
1121 >                }
1122 >                for (int i = 0; i < n; i++) {
1123 >                    @SuppressWarnings("unchecked") E e = (E) es[i];
1124 >                    action.accept(e);
1125 >                }
1126 >            } while (n > 0 && p != null);
1127          }
1128  
1129          public void remove() {
# Line 1128 | Line 1168 | public class LinkedBlockingDeque<E>
1168  
1169          LBDSpliterator() {}
1170  
1131        private Node<E> succ(Node<E> p) {
1132            return (p == (p = p.next)) ? first : p;
1133        }
1134
1171          public long estimateSize() { return est; }
1172  
1173          public Spliterator<E> trySplit() {
# Line 1172 | Line 1208 | public class LinkedBlockingDeque<E>
1208              return null;
1209          }
1210  
1175        public void forEachRemaining(Consumer<? super E> action) {
1176            if (action == null) throw new NullPointerException();
1177            if (!exhausted) {
1178                exhausted = true;
1179                final ReentrantLock lock = LinkedBlockingDeque.this.lock;
1180                Node<E> p = current;
1181                current = null;
1182                do {
1183                    E e = null;
1184                    lock.lock();
1185                    try {
1186                        if (p != null || (p = first) != null)
1187                            do {
1188                                e = p.item;
1189                                p = succ(p);
1190                            } while (e == null && p != null);
1191                    } finally {
1192                        // checkInvariants();
1193                        lock.unlock();
1194                    }
1195                    if (e != null)
1196                        action.accept(e);
1197                } while (p != null);
1198            }
1199        }
1200
1211          public boolean tryAdvance(Consumer<? super E> action) {
1212 <            if (action == null) throw new NullPointerException();
1212 >            Objects.requireNonNull(action);
1213              if (!exhausted) {
1214                  final ReentrantLock lock = LinkedBlockingDeque.this.lock;
1215                  Node<E> p = current;
# Line 1224 | Line 1234 | public class LinkedBlockingDeque<E>
1234              return false;
1235          }
1236  
1237 +        public void forEachRemaining(Consumer<? super E> action) {
1238 +            Objects.requireNonNull(action);
1239 +            if (!exhausted) {
1240 +                exhausted = true;
1241 +                Node<E> p = current;
1242 +                current = null;
1243 +                forEachFrom(action, p);
1244 +            }
1245 +        }
1246 +
1247          public int characteristics() {
1248              return (Spliterator.ORDERED |
1249                      Spliterator.NONNULL |
# Line 1252 | Line 1272 | public class LinkedBlockingDeque<E>
1272      }
1273  
1274      /**
1275 +     * @throws NullPointerException {@inheritDoc}
1276 +     */
1277 +    public void forEach(Consumer<? super E> action) {
1278 +        Objects.requireNonNull(action);
1279 +        forEachFrom(action, null);
1280 +    }
1281 +
1282 +    /**
1283 +     * Runs action on each element found during a traversal starting at p.
1284 +     * If p is null, traversal starts at head.
1285 +     */
1286 +    void forEachFrom(Consumer<? super E> action, Node<E> p) {
1287 +        // Extract batches of elements while holding the lock; then
1288 +        // run the action on the elements while not
1289 +        final ReentrantLock lock = this.lock;
1290 +        final int batchSize = 32;       // max number of elements per batch
1291 +        Object[] es = null;             // container for batch of elements
1292 +        int n, len = 0;
1293 +        do {
1294 +            lock.lock();
1295 +            try {
1296 +                if (es == null) {
1297 +                    if (p == null) p = first;
1298 +                    for (Node<E> q = p; q != null; q = succ(q))
1299 +                        if (q.item != null && ++len == batchSize)
1300 +                            break;
1301 +                    es = new Object[len];
1302 +                }
1303 +                for (n = 0; p != null && n < len; p = succ(p))
1304 +                    if ((es[n] = p.item) != null)
1305 +                        n++;
1306 +            } finally {
1307 +                // checkInvariants();
1308 +                lock.unlock();
1309 +            }
1310 +            for (int i = 0; i < n; i++) {
1311 +                @SuppressWarnings("unchecked") E e = (E) es[i];
1312 +                action.accept(e);
1313 +            }
1314 +        } while (n > 0 && p != null);
1315 +    }
1316 +
1317 +    /**
1318       * Saves this deque to a stream (that is, serializes it).
1319       *
1320       * @param s the stream
# Line 1292 | Line 1355 | public class LinkedBlockingDeque<E>
1355          last = null;
1356          // Read in all elements and place in queue
1357          for (;;) {
1358 <            @SuppressWarnings("unchecked")
1296 <            E item = (E)s.readObject();
1358 >            @SuppressWarnings("unchecked") E item = (E)s.readObject();
1359              if (item == null)
1360                  break;
1361              add(item);

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines