--- jsr166/src/jdk8/java/util/concurrent/LinkedBlockingQueue.java 2016/03/26 06:22:50 1.1 +++ jsr166/src/jdk8/java/util/concurrent/LinkedBlockingQueue.java 2016/12/17 21:56:54 1.2 @@ -10,6 +10,7 @@ import java.util.AbstractQueue; import java.util.Collection; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.Spliterator; import java.util.Spliterators; import java.util.concurrent.atomic.AtomicInteger; @@ -205,14 +206,6 @@ public class LinkedBlockingQueue exte putLock.unlock(); } -// /** -// * Tells whether both locks are held by current thread. -// */ -// boolean isFullyLocked() { -// return (putLock.isHeldByCurrentThread() && -// takeLock.isHeldByCurrentThread()); -// } - /** * Creates a {@code LinkedBlockingQueue} with a capacity of * {@link Integer#MAX_VALUE}. @@ -488,7 +481,8 @@ public class LinkedBlockingQueue exte * Unlinks interior Node p with predecessor trail. */ void unlink(Node p, Node trail) { - // assert isFullyLocked(); + // assert putLock.isHeldByCurrentThread(); + // assert takeLock.isHeldByCurrentThread(); // p.next is not changed, to allow iterators that are // traversing p to maintain their weak-consistency guarantee. p.item = null; @@ -672,8 +666,7 @@ public class LinkedBlockingQueue exte * @throws IllegalArgumentException {@inheritDoc} */ public int drainTo(Collection c, int maxElements) { - if (c == null) - throw new NullPointerException(); + Objects.requireNonNull(c); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) @@ -712,6 +705,16 @@ public class LinkedBlockingQueue exte } /** + * Used for any element traversal that is not entirely under lock. + * Such traversals must handle both: + * - dequeued nodes (p.next == p) + * - (possibly multiple) interior removed nodes (p.item == null) + */ + Node succ(Node p) { + return (p == (p = p.next)) ? head.next : p; + } + + /** * Returns an iterator over the elements in this queue in proper sequence. * The elements will be returned in order from first (head) to last (tail). * @@ -738,8 +741,7 @@ public class LinkedBlockingQueue exte Itr() { fullyLock(); try { - current = head.next; - if (current != null) + if ((current = head.next) != null) currentElement = current.item; } finally { fullyUnlock(); @@ -751,28 +753,61 @@ public class LinkedBlockingQueue exte } public E next() { + Node p; + if ((p = current) == null) + throw new NoSuchElementException(); + E ret = currentElement, e = null; + lastRet = p; fullyLock(); try { - if (current == null) - throw new NoSuchElementException(); - lastRet = current; - E item = null; - // Unlike other traversal methods, iterators must handle both: - // - dequeued nodes (p.next == p) - // - (possibly multiple) interior removed nodes (p.item == null) - for (Node p = current, q;; p = q) { - if ((q = p.next) == p) - q = head.next; - if (q == null || (item = q.item) != null) { - current = q; - E x = currentElement; - currentElement = item; - return x; - } - } + for (p = p.next; p != null; p = succ(p)) + if ((e = p.item) != null) + break; } finally { fullyUnlock(); } + current = p; + currentElement = e; + return ret; + } + + public void forEachRemaining(Consumer action) { + // A variant of forEachFrom + Objects.requireNonNull(action); + Node p; + if ((p = current) == null) return; + lastRet = current; + current = null; + final int batchSize = 32; + Object[] es = null; + int n, len = 1; + do { + fullyLock(); + try { + if (es == null) { + p = p.next; + for (Node q = p; q != null; q = succ(q)) + if (q.item != null && ++len == batchSize) + break; + es = new Object[len]; + es[0] = currentElement; + currentElement = null; + n = 1; + } else + n = 0; + for (; p != null && n < len; p = succ(p)) + if ((es[n] = p.item) != null) { + lastRet = p; + n++; + } + } finally { + fullyUnlock(); + } + for (int i = 0; i < n; i++) { + @SuppressWarnings("unchecked") E e = (E) es[i]; + action.accept(e); + } + } while (n > 0 && p != null); } public void remove() { @@ -796,42 +831,39 @@ public class LinkedBlockingQueue exte } } - /** A customized variant of Spliterators.IteratorSpliterator */ - static final class LBQSpliterator implements Spliterator { + /** + * A customized variant of Spliterators.IteratorSpliterator. + * Keep this class in sync with (very similar) LBDSpliterator. + */ + private final class LBQSpliterator implements Spliterator { static final int MAX_BATCH = 1 << 25; // max batch array size; - final LinkedBlockingQueue queue; Node current; // current node; null until initialized int batch; // batch size for splits boolean exhausted; // true when no more nodes - long est; // size estimate - LBQSpliterator(LinkedBlockingQueue queue) { - this.queue = queue; - this.est = queue.size(); - } + long est = size(); // size estimate + + LBQSpliterator() {} public long estimateSize() { return est; } public Spliterator trySplit() { Node h; - final LinkedBlockingQueue q = this.queue; int b = batch; int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1; if (!exhausted && - ((h = current) != null || (h = q.head.next) != null) && - h.next != null) { + ((h = current) != null || (h = head.next) != null) + && h.next != null) { Object[] a = new Object[n]; int i = 0; Node p = current; - q.fullyLock(); + fullyLock(); try { - if (p != null || (p = q.head.next) != null) { - do { + if (p != null || (p = head.next) != null) + for (; p != null && i < n; p = succ(p)) if ((a[i] = p.item) != null) - ++i; - } while ((p = p.next) != null && i < n); - } + i++; } finally { - q.fullyUnlock(); + fullyUnlock(); } if ((current = p) == null) { est = 0L; @@ -850,53 +882,22 @@ public class LinkedBlockingQueue exte return null; } - public void forEachRemaining(Consumer action) { - if (action == null) throw new NullPointerException(); - final LinkedBlockingQueue q = this.queue; - if (!exhausted) { - exhausted = true; - Node p = current; - do { - E e = null; - q.fullyLock(); - try { - if (p == null) - p = q.head.next; - while (p != null) { - e = p.item; - p = p.next; - if (e != null) - break; - } - } finally { - q.fullyUnlock(); - } - if (e != null) - action.accept(e); - } while (p != null); - } - } - public boolean tryAdvance(Consumer action) { - if (action == null) throw new NullPointerException(); - final LinkedBlockingQueue q = this.queue; + Objects.requireNonNull(action); if (!exhausted) { + Node p = current; E e = null; - q.fullyLock(); + fullyLock(); try { - if (current == null) - current = q.head.next; - while (current != null) { - e = current.item; - current = current.next; - if (e != null) - break; - } + if (p != null || (p = head.next) != null) + do { + e = p.item; + p = succ(p); + } while (e == null && p != null); } finally { - q.fullyUnlock(); + fullyUnlock(); } - if (current == null) - exhausted = true; + exhausted = ((current = p) == null); if (e != null) { action.accept(e); return true; @@ -905,9 +906,20 @@ public class LinkedBlockingQueue exte return false; } + public void forEachRemaining(Consumer action) { + Objects.requireNonNull(action); + if (!exhausted) { + exhausted = true; + Node p = current; + current = null; + forEachFrom(action, p); + } + } + public int characteristics() { - return Spliterator.ORDERED | Spliterator.NONNULL | - Spliterator.CONCURRENT; + return (Spliterator.ORDERED | + Spliterator.NONNULL | + Spliterator.CONCURRENT); } } @@ -928,7 +940,48 @@ public class LinkedBlockingQueue exte * @since 1.8 */ public Spliterator spliterator() { - return new LBQSpliterator(this); + return new LBQSpliterator(); + } + + /** + * @throws NullPointerException {@inheritDoc} + */ + public void forEach(Consumer action) { + Objects.requireNonNull(action); + forEachFrom(action, null); + } + + /** + * Runs action on each element found during a traversal starting at p. + * If p is null, traversal starts at head. + */ + void forEachFrom(Consumer action, Node p) { + // Extract batches of elements while holding the lock; then + // run the action on the elements while not + final int batchSize = 32; // max number of elements per batch + Object[] es = null; // container for batch of elements + int n, len = 0; + do { + fullyLock(); + try { + if (es == null) { + if (p == null) p = head.next; + for (Node q = p; q != null; q = succ(q)) + if (q.item != null && ++len == batchSize) + break; + es = new Object[len]; + } + for (n = 0; p != null && n < len; p = succ(p)) + if ((es[n] = p.item) != null) + n++; + } finally { + fullyUnlock(); + } + for (int i = 0; i < n; i++) { + @SuppressWarnings("unchecked") E e = (E) es[i]; + action.accept(e); + } + } while (n > 0 && p != null); } /**