10 |
|
import java.util.Collection; |
11 |
|
import java.util.Iterator; |
12 |
|
import java.util.NoSuchElementException; |
13 |
+ |
import java.util.Objects; |
14 |
|
import java.util.Spliterator; |
15 |
|
import java.util.Spliterators; |
16 |
|
import java.util.concurrent.atomic.AtomicInteger; |
206 |
|
putLock.unlock(); |
207 |
|
} |
208 |
|
|
208 |
– |
// /** |
209 |
– |
// * Tells whether both locks are held by current thread. |
210 |
– |
// */ |
211 |
– |
// boolean isFullyLocked() { |
212 |
– |
// return (putLock.isHeldByCurrentThread() && |
213 |
– |
// takeLock.isHeldByCurrentThread()); |
214 |
– |
// } |
215 |
– |
|
209 |
|
/** |
210 |
|
* Creates a {@code LinkedBlockingQueue} with a capacity of |
211 |
|
* {@link Integer#MAX_VALUE}. |
481 |
|
* Unlinks interior Node p with predecessor trail. |
482 |
|
*/ |
483 |
|
void unlink(Node<E> p, Node<E> trail) { |
484 |
< |
// assert isFullyLocked(); |
484 |
> |
// assert putLock.isHeldByCurrentThread(); |
485 |
> |
// assert takeLock.isHeldByCurrentThread(); |
486 |
|
// p.next is not changed, to allow iterators that are |
487 |
|
// traversing p to maintain their weak-consistency guarantee. |
488 |
|
p.item = null; |
666 |
|
* @throws IllegalArgumentException {@inheritDoc} |
667 |
|
*/ |
668 |
|
public int drainTo(Collection<? super E> c, int maxElements) { |
669 |
< |
if (c == null) |
676 |
< |
throw new NullPointerException(); |
669 |
> |
Objects.requireNonNull(c); |
670 |
|
if (c == this) |
671 |
|
throw new IllegalArgumentException(); |
672 |
|
if (maxElements <= 0) |
705 |
|
} |
706 |
|
|
707 |
|
/** |
708 |
+ |
* Used for any element traversal that is not entirely under lock. |
709 |
+ |
* Such traversals must handle both: |
710 |
+ |
* - dequeued nodes (p.next == p) |
711 |
+ |
* - (possibly multiple) interior removed nodes (p.item == null) |
712 |
+ |
*/ |
713 |
+ |
Node<E> succ(Node<E> p) { |
714 |
+ |
return (p == (p = p.next)) ? head.next : p; |
715 |
+ |
} |
716 |
+ |
|
717 |
+ |
/** |
718 |
|
* Returns an iterator over the elements in this queue in proper sequence. |
719 |
|
* The elements will be returned in order from first (head) to last (tail). |
720 |
|
* |
741 |
|
Itr() { |
742 |
|
fullyLock(); |
743 |
|
try { |
744 |
< |
current = head.next; |
742 |
< |
if (current != null) |
744 |
> |
if ((current = head.next) != null) |
745 |
|
currentElement = current.item; |
746 |
|
} finally { |
747 |
|
fullyUnlock(); |
753 |
|
} |
754 |
|
|
755 |
|
public E next() { |
756 |
+ |
Node<E> p; |
757 |
+ |
if ((p = current) == null) |
758 |
+ |
throw new NoSuchElementException(); |
759 |
+ |
E ret = currentElement, e = null; |
760 |
+ |
lastRet = p; |
761 |
|
fullyLock(); |
762 |
|
try { |
763 |
< |
if (current == null) |
764 |
< |
throw new NoSuchElementException(); |
765 |
< |
lastRet = current; |
759 |
< |
E item = null; |
760 |
< |
// Unlike other traversal methods, iterators must handle both: |
761 |
< |
// - dequeued nodes (p.next == p) |
762 |
< |
// - (possibly multiple) interior removed nodes (p.item == null) |
763 |
< |
for (Node<E> p = current, q;; p = q) { |
764 |
< |
if ((q = p.next) == p) |
765 |
< |
q = head.next; |
766 |
< |
if (q == null || (item = q.item) != null) { |
767 |
< |
current = q; |
768 |
< |
E x = currentElement; |
769 |
< |
currentElement = item; |
770 |
< |
return x; |
771 |
< |
} |
772 |
< |
} |
763 |
> |
for (p = p.next; p != null; p = succ(p)) |
764 |
> |
if ((e = p.item) != null) |
765 |
> |
break; |
766 |
|
} finally { |
767 |
|
fullyUnlock(); |
768 |
|
} |
769 |
+ |
current = p; |
770 |
+ |
currentElement = e; |
771 |
+ |
return ret; |
772 |
+ |
} |
773 |
+ |
|
774 |
+ |
public void forEachRemaining(Consumer<? super E> action) { |
775 |
+ |
// A variant of forEachFrom |
776 |
+ |
Objects.requireNonNull(action); |
777 |
+ |
Node<E> p; |
778 |
+ |
if ((p = current) == null) return; |
779 |
+ |
lastRet = current; |
780 |
+ |
current = null; |
781 |
+ |
final int batchSize = 32; |
782 |
+ |
Object[] es = null; |
783 |
+ |
int n, len = 1; |
784 |
+ |
do { |
785 |
+ |
fullyLock(); |
786 |
+ |
try { |
787 |
+ |
if (es == null) { |
788 |
+ |
p = p.next; |
789 |
+ |
for (Node<E> q = p; q != null; q = succ(q)) |
790 |
+ |
if (q.item != null && ++len == batchSize) |
791 |
+ |
break; |
792 |
+ |
es = new Object[len]; |
793 |
+ |
es[0] = currentElement; |
794 |
+ |
currentElement = null; |
795 |
+ |
n = 1; |
796 |
+ |
} else |
797 |
+ |
n = 0; |
798 |
+ |
for (; p != null && n < len; p = succ(p)) |
799 |
+ |
if ((es[n] = p.item) != null) { |
800 |
+ |
lastRet = p; |
801 |
+ |
n++; |
802 |
+ |
} |
803 |
+ |
} finally { |
804 |
+ |
fullyUnlock(); |
805 |
+ |
} |
806 |
+ |
for (int i = 0; i < n; i++) { |
807 |
+ |
@SuppressWarnings("unchecked") E e = (E) es[i]; |
808 |
+ |
action.accept(e); |
809 |
+ |
} |
810 |
+ |
} while (n > 0 && p != null); |
811 |
|
} |
812 |
|
|
813 |
|
public void remove() { |
831 |
|
} |
832 |
|
} |
833 |
|
|
834 |
< |
/** A customized variant of Spliterators.IteratorSpliterator */ |
835 |
< |
static final class LBQSpliterator<E> implements Spliterator<E> { |
834 |
> |
/** |
835 |
> |
* A customized variant of Spliterators.IteratorSpliterator. |
836 |
> |
* Keep this class in sync with (very similar) LBDSpliterator. |
837 |
> |
*/ |
838 |
> |
private final class LBQSpliterator implements Spliterator<E> { |
839 |
|
static final int MAX_BATCH = 1 << 25; // max batch array size; |
802 |
– |
final LinkedBlockingQueue<E> queue; |
840 |
|
Node<E> current; // current node; null until initialized |
841 |
|
int batch; // batch size for splits |
842 |
|
boolean exhausted; // true when no more nodes |
843 |
< |
long est; // size estimate |
844 |
< |
LBQSpliterator(LinkedBlockingQueue<E> queue) { |
845 |
< |
this.queue = queue; |
809 |
< |
this.est = queue.size(); |
810 |
< |
} |
843 |
> |
long est = size(); // size estimate |
844 |
> |
|
845 |
> |
LBQSpliterator() {} |
846 |
|
|
847 |
|
public long estimateSize() { return est; } |
848 |
|
|
849 |
|
public Spliterator<E> trySplit() { |
850 |
|
Node<E> h; |
816 |
– |
final LinkedBlockingQueue<E> q = this.queue; |
851 |
|
int b = batch; |
852 |
|
int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1; |
853 |
|
if (!exhausted && |
854 |
< |
((h = current) != null || (h = q.head.next) != null) && |
855 |
< |
h.next != null) { |
854 |
> |
((h = current) != null || (h = head.next) != null) |
855 |
> |
&& h.next != null) { |
856 |
|
Object[] a = new Object[n]; |
857 |
|
int i = 0; |
858 |
|
Node<E> p = current; |
859 |
< |
q.fullyLock(); |
859 |
> |
fullyLock(); |
860 |
|
try { |
861 |
< |
if (p != null || (p = q.head.next) != null) { |
862 |
< |
do { |
861 |
> |
if (p != null || (p = head.next) != null) |
862 |
> |
for (; p != null && i < n; p = succ(p)) |
863 |
|
if ((a[i] = p.item) != null) |
864 |
< |
++i; |
831 |
< |
} while ((p = p.next) != null && i < n); |
832 |
< |
} |
864 |
> |
i++; |
865 |
|
} finally { |
866 |
< |
q.fullyUnlock(); |
866 |
> |
fullyUnlock(); |
867 |
|
} |
868 |
|
if ((current = p) == null) { |
869 |
|
est = 0L; |
882 |
|
return null; |
883 |
|
} |
884 |
|
|
853 |
– |
public void forEachRemaining(Consumer<? super E> action) { |
854 |
– |
if (action == null) throw new NullPointerException(); |
855 |
– |
final LinkedBlockingQueue<E> q = this.queue; |
856 |
– |
if (!exhausted) { |
857 |
– |
exhausted = true; |
858 |
– |
Node<E> p = current; |
859 |
– |
do { |
860 |
– |
E e = null; |
861 |
– |
q.fullyLock(); |
862 |
– |
try { |
863 |
– |
if (p == null) |
864 |
– |
p = q.head.next; |
865 |
– |
while (p != null) { |
866 |
– |
e = p.item; |
867 |
– |
p = p.next; |
868 |
– |
if (e != null) |
869 |
– |
break; |
870 |
– |
} |
871 |
– |
} finally { |
872 |
– |
q.fullyUnlock(); |
873 |
– |
} |
874 |
– |
if (e != null) |
875 |
– |
action.accept(e); |
876 |
– |
} while (p != null); |
877 |
– |
} |
878 |
– |
} |
879 |
– |
|
885 |
|
public boolean tryAdvance(Consumer<? super E> action) { |
886 |
< |
if (action == null) throw new NullPointerException(); |
882 |
< |
final LinkedBlockingQueue<E> q = this.queue; |
886 |
> |
Objects.requireNonNull(action); |
887 |
|
if (!exhausted) { |
888 |
+ |
Node<E> p = current; |
889 |
|
E e = null; |
890 |
< |
q.fullyLock(); |
890 |
> |
fullyLock(); |
891 |
|
try { |
892 |
< |
if (current == null) |
893 |
< |
current = q.head.next; |
894 |
< |
while (current != null) { |
895 |
< |
e = current.item; |
896 |
< |
current = current.next; |
892 |
< |
if (e != null) |
893 |
< |
break; |
894 |
< |
} |
892 |
> |
if (p != null || (p = head.next) != null) |
893 |
> |
do { |
894 |
> |
e = p.item; |
895 |
> |
p = succ(p); |
896 |
> |
} while (e == null && p != null); |
897 |
|
} finally { |
898 |
< |
q.fullyUnlock(); |
898 |
> |
fullyUnlock(); |
899 |
|
} |
900 |
< |
if (current == null) |
899 |
< |
exhausted = true; |
900 |
> |
exhausted = ((current = p) == null); |
901 |
|
if (e != null) { |
902 |
|
action.accept(e); |
903 |
|
return true; |
906 |
|
return false; |
907 |
|
} |
908 |
|
|
909 |
+ |
public void forEachRemaining(Consumer<? super E> action) { |
910 |
+ |
Objects.requireNonNull(action); |
911 |
+ |
if (!exhausted) { |
912 |
+ |
exhausted = true; |
913 |
+ |
Node<E> p = current; |
914 |
+ |
current = null; |
915 |
+ |
forEachFrom(action, p); |
916 |
+ |
} |
917 |
+ |
} |
918 |
+ |
|
919 |
|
public int characteristics() { |
920 |
< |
return Spliterator.ORDERED | Spliterator.NONNULL | |
921 |
< |
Spliterator.CONCURRENT; |
920 |
> |
return (Spliterator.ORDERED | |
921 |
> |
Spliterator.NONNULL | |
922 |
> |
Spliterator.CONCURRENT); |
923 |
|
} |
924 |
|
} |
925 |
|
|
940 |
|
* @since 1.8 |
941 |
|
*/ |
942 |
|
public Spliterator<E> spliterator() { |
943 |
< |
return new LBQSpliterator<E>(this); |
943 |
> |
return new LBQSpliterator(); |
944 |
> |
} |
945 |
> |
|
946 |
> |
/** |
947 |
> |
* @throws NullPointerException {@inheritDoc} |
948 |
> |
*/ |
949 |
> |
public void forEach(Consumer<? super E> action) { |
950 |
> |
Objects.requireNonNull(action); |
951 |
> |
forEachFrom(action, null); |
952 |
> |
} |
953 |
> |
|
954 |
> |
/** |
955 |
> |
* Runs action on each element found during a traversal starting at p. |
956 |
> |
* If p is null, traversal starts at head. |
957 |
> |
*/ |
958 |
> |
void forEachFrom(Consumer<? super E> action, Node<E> p) { |
959 |
> |
// Extract batches of elements while holding the lock; then |
960 |
> |
// run the action on the elements while not |
961 |
> |
final int batchSize = 32; // max number of elements per batch |
962 |
> |
Object[] es = null; // container for batch of elements |
963 |
> |
int n, len = 0; |
964 |
> |
do { |
965 |
> |
fullyLock(); |
966 |
> |
try { |
967 |
> |
if (es == null) { |
968 |
> |
if (p == null) p = head.next; |
969 |
> |
for (Node<E> q = p; q != null; q = succ(q)) |
970 |
> |
if (q.item != null && ++len == batchSize) |
971 |
> |
break; |
972 |
> |
es = new Object[len]; |
973 |
> |
} |
974 |
> |
for (n = 0; p != null && n < len; p = succ(p)) |
975 |
> |
if ((es[n] = p.item) != null) |
976 |
> |
n++; |
977 |
> |
} finally { |
978 |
> |
fullyUnlock(); |
979 |
> |
} |
980 |
> |
for (int i = 0; i < n; i++) { |
981 |
> |
@SuppressWarnings("unchecked") E e = (E) es[i]; |
982 |
> |
action.accept(e); |
983 |
> |
} |
984 |
> |
} while (n > 0 && p != null); |
985 |
|
} |
986 |
|
|
987 |
|
/** |