7 |
package java.util.concurrent; |
package java.util.concurrent; |
8 |
|
|
9 |
import java.util.AbstractQueue; |
import java.util.AbstractQueue; |
10 |
|
import java.util.Arrays; |
11 |
import java.util.Collection; |
import java.util.Collection; |
12 |
|
import java.util.Collections; |
13 |
import java.util.Iterator; |
import java.util.Iterator; |
14 |
import java.util.NoSuchElementException; |
import java.util.NoSuchElementException; |
15 |
import java.util.Queue; |
import java.util.Queue; |
16 |
import java.util.concurrent.TimeUnit; |
import java.util.concurrent.TimeUnit; |
17 |
import java.util.concurrent.locks.LockSupport; |
import java.util.concurrent.locks.LockSupport; |
18 |
|
import java.util.Spliterator; |
19 |
|
import java.util.stream.Stream; |
20 |
|
import java.util.stream.Streams; |
21 |
|
import java.util.function.Consumer; |
22 |
|
|
23 |
/** |
/** |
24 |
* An unbounded {@link TransferQueue} based on linked nodes. |
* An unbounded {@link TransferQueue} based on linked nodes. |
754 |
} |
} |
755 |
|
|
756 |
/** |
/** |
757 |
|
* Version of firstOfMode used by Spliterator |
758 |
|
*/ |
759 |
|
final Node firstDataNode() { |
760 |
|
for (Node p = head; p != null;) { |
761 |
|
Object item = p.item; |
762 |
|
if (p.isData) { |
763 |
|
if (item != null && item != p) |
764 |
|
return p; |
765 |
|
} |
766 |
|
else if (item == null) |
767 |
|
break; |
768 |
|
if (p == (p = p.next)) |
769 |
|
p = head; |
770 |
|
} |
771 |
|
return null; |
772 |
|
} |
773 |
|
|
774 |
|
/** |
775 |
* Returns the item in the first unmatched node with isData; or |
* Returns the item in the first unmatched node with isData; or |
776 |
* null if none. Used by peek. |
* null if none. Used by peek. |
777 |
*/ |
*/ |
905 |
} |
} |
906 |
} |
} |
907 |
|
|
908 |
|
// Very similar to ConcurrentLinkedQueue spliterator |
909 |
|
static final class LTQSpliterator<E> implements Spliterator<E> { |
910 |
|
static final int MAX_BATCH = 1 << 10; // saturate batch size |
911 |
|
final LinkedTransferQueue<E> queue; |
912 |
|
Node current; // current node; null until initialized |
913 |
|
int batch; // batch size for splits |
914 |
|
boolean exhausted; // true when no more nodes |
915 |
|
LTQSpliterator(LinkedTransferQueue<E> queue) { |
916 |
|
this.queue = queue; |
917 |
|
} |
918 |
|
|
919 |
|
/* |
920 |
|
* Split into arrays of arithmetically increasing batch sizes, |
921 |
|
* giving up at MAX_BATCH. Treat the result as a |
922 |
|
* CopyOnWriteArrayList array snapshot. This will only |
923 |
|
* improve parallel performance if per-element forEach actions |
924 |
|
* are more costly than transfering them into an array. If |
925 |
|
* not, we limit slowdowns by eventually returning null split. |
926 |
|
*/ |
927 |
|
public Spliterator<E> trySplit() { |
928 |
|
Node p; int n; |
929 |
|
final LinkedTransferQueue<E> q = this.queue; |
930 |
|
if (!exhausted && (n = batch + 1) > 0 && n <= MAX_BATCH && |
931 |
|
((p = current) != null || (p = q.firstDataNode()) != null)) { |
932 |
|
Object[] a = new Object[batch = n]; |
933 |
|
int i = 0; |
934 |
|
do { |
935 |
|
if ((a[i] = p.item) != null) |
936 |
|
++i; |
937 |
|
if (p == (p = p.next)) |
938 |
|
p = q.firstDataNode(); |
939 |
|
} while (p != null && i < n); |
940 |
|
if ((current = p) == null) |
941 |
|
exhausted = true; |
942 |
|
return Collections.arraySnapshotSpliterator |
943 |
|
(a, 0, i, Spliterator.ORDERED | Spliterator.NONNULL | |
944 |
|
Spliterator.CONCURRENT); |
945 |
|
} |
946 |
|
return null; |
947 |
|
} |
948 |
|
|
949 |
|
@SuppressWarnings("unchecked") |
950 |
|
public void forEach(Consumer<? super E> action) { |
951 |
|
Node p; |
952 |
|
if (action == null) throw new NullPointerException(); |
953 |
|
final LinkedTransferQueue<E> q = this.queue; |
954 |
|
if (!exhausted && |
955 |
|
((p = current) != null || (p = q.firstDataNode()) != null)) { |
956 |
|
exhausted = true; |
957 |
|
do { |
958 |
|
Object e = p.item; |
959 |
|
if (p == (p = p.next)) |
960 |
|
p = q.firstDataNode(); |
961 |
|
if (e != null) |
962 |
|
action.accept((E)e); |
963 |
|
} while (p != null); |
964 |
|
} |
965 |
|
} |
966 |
|
|
967 |
|
@SuppressWarnings("unchecked") |
968 |
|
public boolean tryAdvance(Consumer<? super E> action) { |
969 |
|
Node p; |
970 |
|
if (action == null) throw new NullPointerException(); |
971 |
|
final LinkedTransferQueue<E> q = this.queue; |
972 |
|
if (!exhausted && |
973 |
|
((p = current) != null || (p = q.firstDataNode()) != null)) { |
974 |
|
Object e; |
975 |
|
do { |
976 |
|
e = p.item; |
977 |
|
if (p == (p = p.next)) |
978 |
|
p = q.firstDataNode(); |
979 |
|
} while (e == null && p != null); |
980 |
|
if ((current = p) == null) |
981 |
|
exhausted = true; |
982 |
|
if (e != null) { |
983 |
|
action.accept((E)e); |
984 |
|
return true; |
985 |
|
} |
986 |
|
} |
987 |
|
return false; |
988 |
|
} |
989 |
|
|
990 |
|
public int characteristics() { |
991 |
|
return Spliterator.ORDERED | Spliterator.NONNULL | |
992 |
|
Spliterator.CONCURRENT; |
993 |
|
} |
994 |
|
} |
995 |
|
|
996 |
|
|
997 |
|
Spliterator<E> spliterator() { |
998 |
|
return new LTQSpliterator<E>(this); |
999 |
|
} |
1000 |
|
|
1001 |
|
public Stream<E> stream() { |
1002 |
|
return Streams.stream(spliterator()); |
1003 |
|
} |
1004 |
|
|
1005 |
|
public Stream<E> parallelStream() { |
1006 |
|
return Streams.parallelStream(spliterator()); |
1007 |
|
} |
1008 |
|
|
1009 |
/* -------------- Removal methods -------------- */ |
/* -------------- Removal methods -------------- */ |
1010 |
|
|
1011 |
/** |
/** |