11 |
|
import java.util.concurrent.locks.ReentrantLock; |
12 |
|
import java.util.AbstractQueue; |
13 |
|
import java.util.Collection; |
14 |
+ |
import java.util.Collections; |
15 |
|
import java.util.Iterator; |
16 |
|
import java.util.NoSuchElementException; |
17 |
+ |
import java.util.Spliterator; |
18 |
+ |
import java.util.stream.Stream; |
19 |
+ |
import java.util.stream.Streams; |
20 |
+ |
import java.util.function.Consumer; |
21 |
|
|
22 |
|
/** |
23 |
|
* An optionally-bounded {@linkplain BlockingQueue blocking queue} based on |
832 |
|
} |
833 |
|
} |
834 |
|
|
835 |
+ |
static final class LBQSpliterator<E> implements Spliterator<E> { |
836 |
+ |
// Similar idea to ConcurrentLinkedQueue spliterator |
837 |
+ |
static final int MAX_BATCH = 1 << 11; // saturate batch size |
838 |
+ |
final LinkedBlockingQueue<E> queue; |
839 |
+ |
Node<E> current; // current node; null until initialized |
840 |
+ |
int batch; // batch size for splits |
841 |
+ |
boolean exhausted; // true when no more nodes |
842 |
+ |
long est; // size estimate |
843 |
+ |
LBQSpliterator(LinkedBlockingQueue<E> queue) { |
844 |
+ |
this.queue = queue; |
845 |
+ |
this.est = queue.size(); |
846 |
+ |
} |
847 |
+ |
|
848 |
+ |
public long estimateSize() { return est; } |
849 |
+ |
|
850 |
+ |
public Spliterator<E> trySplit() { |
851 |
+ |
int n; |
852 |
+ |
final LinkedBlockingQueue<E> q = this.queue; |
853 |
+ |
if (!exhausted && (n = batch + 1) > 0 && n <= MAX_BATCH) { |
854 |
+ |
Object[] a = new Object[batch = n]; |
855 |
+ |
int i = 0; |
856 |
+ |
Node<E> p = current; |
857 |
+ |
q.fullyLock(); |
858 |
+ |
try { |
859 |
+ |
if (p != null || (p = q.head.next) != null) { |
860 |
+ |
do { |
861 |
+ |
if ((a[i] = p.item) != null) |
862 |
+ |
++i; |
863 |
+ |
} while ((p = p.next) != null && i < n); |
864 |
+ |
} |
865 |
+ |
} finally { |
866 |
+ |
q.fullyUnlock(); |
867 |
+ |
} |
868 |
+ |
if ((current = p) == null) { |
869 |
+ |
est = 0L; |
870 |
+ |
exhausted = true; |
871 |
+ |
} |
872 |
+ |
else if ((est -= i) <= 0L) |
873 |
+ |
est = 1L; |
874 |
+ |
return Collections.arraySnapshotSpliterator |
875 |
+ |
(a, 0, i, Spliterator.ORDERED | Spliterator.NONNULL | |
876 |
+ |
Spliterator.CONCURRENT); |
877 |
+ |
} |
878 |
+ |
return null; |
879 |
+ |
} |
880 |
+ |
|
881 |
+ |
public void forEach(Consumer<? super E> action) { |
882 |
+ |
if (action == null) throw new NullPointerException(); |
883 |
+ |
final LinkedBlockingQueue<E> q = this.queue; |
884 |
+ |
if (!exhausted) { |
885 |
+ |
exhausted = true; |
886 |
+ |
Node<E> p = current; |
887 |
+ |
do { |
888 |
+ |
E e = null; |
889 |
+ |
q.fullyLock(); |
890 |
+ |
try { |
891 |
+ |
if (p == null) |
892 |
+ |
p = q.head.next; |
893 |
+ |
while (p != null) { |
894 |
+ |
e = p.item; |
895 |
+ |
p = p.next; |
896 |
+ |
if (e != null) |
897 |
+ |
break; |
898 |
+ |
} |
899 |
+ |
} finally { |
900 |
+ |
q.fullyUnlock(); |
901 |
+ |
} |
902 |
+ |
if (e != null) |
903 |
+ |
action.accept(e); |
904 |
+ |
} while (p != null); |
905 |
+ |
} |
906 |
+ |
} |
907 |
+ |
|
908 |
+ |
public boolean tryAdvance(Consumer<? super E> action) { |
909 |
+ |
if (action == null) throw new NullPointerException(); |
910 |
+ |
final LinkedBlockingQueue<E> q = this.queue; |
911 |
+ |
if (!exhausted) { |
912 |
+ |
E e = null; |
913 |
+ |
q.fullyLock(); |
914 |
+ |
try { |
915 |
+ |
if (current == null) |
916 |
+ |
current = q.head.next; |
917 |
+ |
while (current != null) { |
918 |
+ |
e = current.item; |
919 |
+ |
current = current.next; |
920 |
+ |
if (e != null) |
921 |
+ |
break; |
922 |
+ |
} |
923 |
+ |
} finally { |
924 |
+ |
q.fullyUnlock(); |
925 |
+ |
} |
926 |
+ |
if (e != null) { |
927 |
+ |
action.accept(e); |
928 |
+ |
return true; |
929 |
+ |
} |
930 |
+ |
exhausted = true; |
931 |
+ |
} |
932 |
+ |
return false; |
933 |
+ |
} |
934 |
+ |
|
935 |
+ |
public int characteristics() { |
936 |
+ |
return Spliterator.ORDERED | Spliterator.NONNULL | |
937 |
+ |
Spliterator.CONCURRENT; |
938 |
+ |
} |
939 |
+ |
} |
940 |
+ |
|
941 |
+ |
Spliterator<E> spliterator() { |
942 |
+ |
return new LBQSpliterator<E>(this); |
943 |
+ |
} |
944 |
+ |
|
945 |
+ |
public Stream<E> stream() { |
946 |
+ |
return Streams.stream(spliterator()); |
947 |
+ |
} |
948 |
+ |
|
949 |
+ |
public Stream<E> parallelStream() { |
950 |
+ |
return Streams.parallelStream(spliterator()); |
951 |
+ |
} |
952 |
+ |
|
953 |
|
/** |
954 |
|
* Saves this queue to a stream (that is, serializes it). |
955 |
|
* |