7 |
|
package java.util.concurrent; |
8 |
|
|
9 |
|
import java.util.AbstractQueue; |
10 |
+ |
import java.util.Arrays; |
11 |
|
import java.util.Collection; |
12 |
+ |
import java.util.Collections; |
13 |
|
import java.util.Iterator; |
14 |
|
import java.util.NoSuchElementException; |
15 |
|
import java.util.Queue; |
16 |
|
import java.util.concurrent.TimeUnit; |
17 |
|
import java.util.concurrent.locks.LockSupport; |
18 |
+ |
import java.util.Spliterator; |
19 |
+ |
import java.util.stream.Stream; |
20 |
+ |
import java.util.stream.Streams; |
21 |
+ |
import java.util.function.Consumer; |
22 |
|
|
23 |
|
/** |
24 |
|
* An unbounded {@link TransferQueue} based on linked nodes. |
754 |
|
} |
755 |
|
|
756 |
|
/** |
757 |
+ |
* Version of firstOfMode used by Spliterator |
758 |
+ |
*/ |
759 |
+ |
final Node firstDataNode() { |
760 |
+ |
for (Node p = head; p != null;) { |
761 |
+ |
Object item = p.item; |
762 |
+ |
if (p.isData) { |
763 |
+ |
if (item != null && item != p) |
764 |
+ |
return p; |
765 |
+ |
} |
766 |
+ |
else if (item == null) |
767 |
+ |
break; |
768 |
+ |
if (p == (p = p.next)) |
769 |
+ |
p = head; |
770 |
+ |
} |
771 |
+ |
return null; |
772 |
+ |
} |
773 |
+ |
|
774 |
+ |
/** |
775 |
|
* Returns the item in the first unmatched node with isData; or |
776 |
|
* null if none. Used by peek. |
777 |
|
*/ |
904 |
|
unsplice(lastPred, lastRet); |
905 |
|
} |
906 |
|
} |
907 |
+ |
|
908 |
+ |
// Very similar to ConcurrentLinkedQueue spliterator |
909 |
+ |
static final class LTQSpliterator<E> implements Spliterator<E> { |
910 |
+ |
static final int MAX_BATCH = 1 << 10; // saturate batch size |
911 |
+ |
final LinkedTransferQueue<E> queue; |
912 |
+ |
Node current; // current node; null until initialized |
913 |
+ |
int batch; // batch size for splits |
914 |
+ |
boolean exhausted; // true when no more nodes |
915 |
+ |
LTQSpliterator(LinkedTransferQueue<E> queue) { |
916 |
+ |
this.queue = queue; |
917 |
+ |
} |
918 |
+ |
|
919 |
+ |
/* |
920 |
+ |
* Split into arrays of arithmetically increasing batch sizes, |
921 |
+ |
* giving up at MAX_BATCH. Treat the result as a |
922 |
+ |
* CopyOnWriteArrayList array snapshot. This will only |
923 |
+ |
* improve parallel performance if per-element forEach actions |
924 |
+ |
* are more costly than transfering them into an array. If |
925 |
+ |
* not, we limit slowdowns by eventually returning null split. |
926 |
+ |
*/ |
927 |
+ |
public Spliterator<E> trySplit() { |
928 |
+ |
Node p; int n; |
929 |
+ |
final LinkedTransferQueue<E> q = this.queue; |
930 |
+ |
if (!exhausted && (n = batch + 1) > 0 && n <= MAX_BATCH && |
931 |
+ |
((p = current) != null || (p = q.firstDataNode()) != null)) { |
932 |
+ |
Object[] a = new Object[batch = n]; |
933 |
+ |
int i = 0; |
934 |
+ |
do { |
935 |
+ |
if ((a[i] = p.item) != null) |
936 |
+ |
++i; |
937 |
+ |
if (p == (p = p.next)) |
938 |
+ |
p = q.firstDataNode(); |
939 |
+ |
} while (p != null && i < n); |
940 |
+ |
if ((current = p) == null) |
941 |
+ |
exhausted = true; |
942 |
+ |
return Collections.arraySnapshotSpliterator |
943 |
+ |
(a, 0, i, Spliterator.ORDERED | Spliterator.NONNULL | |
944 |
+ |
Spliterator.CONCURRENT); |
945 |
+ |
} |
946 |
+ |
return null; |
947 |
+ |
} |
948 |
+ |
|
949 |
+ |
@SuppressWarnings("unchecked") |
950 |
+ |
public void forEach(Consumer<? super E> action) { |
951 |
+ |
Node p; |
952 |
+ |
if (action == null) throw new NullPointerException(); |
953 |
+ |
final LinkedTransferQueue<E> q = this.queue; |
954 |
+ |
if (!exhausted && |
955 |
+ |
((p = current) != null || (p = q.firstDataNode()) != null)) { |
956 |
+ |
exhausted = true; |
957 |
+ |
do { |
958 |
+ |
Object e = p.item; |
959 |
+ |
if (p == (p = p.next)) |
960 |
+ |
p = q.firstDataNode(); |
961 |
+ |
if (e != null) |
962 |
+ |
action.accept((E)e); |
963 |
+ |
} while (p != null); |
964 |
+ |
} |
965 |
+ |
} |
966 |
+ |
|
967 |
+ |
@SuppressWarnings("unchecked") |
968 |
+ |
public boolean tryAdvance(Consumer<? super E> action) { |
969 |
+ |
Node p; |
970 |
+ |
if (action == null) throw new NullPointerException(); |
971 |
+ |
final LinkedTransferQueue<E> q = this.queue; |
972 |
+ |
if (!exhausted && |
973 |
+ |
((p = current) != null || (p = q.firstDataNode()) != null)) { |
974 |
+ |
Object e; |
975 |
+ |
do { |
976 |
+ |
e = p.item; |
977 |
+ |
if (p == (p = p.next)) |
978 |
+ |
p = q.firstDataNode(); |
979 |
+ |
} while (e == null && p != null); |
980 |
+ |
if ((current = p) == null) |
981 |
+ |
exhausted = true; |
982 |
+ |
if (e != null) { |
983 |
+ |
action.accept((E)e); |
984 |
+ |
return true; |
985 |
+ |
} |
986 |
+ |
} |
987 |
+ |
return false; |
988 |
+ |
} |
989 |
+ |
|
990 |
+ |
public int characteristics() { |
991 |
+ |
return Spliterator.ORDERED | Spliterator.NONNULL | |
992 |
+ |
Spliterator.CONCURRENT; |
993 |
+ |
} |
994 |
+ |
} |
995 |
+ |
|
996 |
+ |
|
997 |
+ |
Spliterator<E> spliterator() { |
998 |
+ |
return new LTQSpliterator<E>(this); |
999 |
+ |
} |
1000 |
+ |
|
1001 |
+ |
public Stream<E> stream() { |
1002 |
+ |
return Streams.stream(spliterator()); |
1003 |
+ |
} |
1004 |
+ |
|
1005 |
+ |
public Stream<E> parallelStream() { |
1006 |
+ |
return Streams.parallelStream(spliterator()); |
1007 |
+ |
} |
1008 |
|
|
1009 |
|
/* -------------- Removal methods -------------- */ |
1010 |
|
|