8 |
|
|
9 |
|
import java.util.AbstractQueue; |
10 |
|
import java.util.Collection; |
11 |
+ |
import java.util.Collections; |
12 |
|
import java.util.Iterator; |
13 |
|
import java.util.NoSuchElementException; |
14 |
|
import java.util.concurrent.locks.Condition; |
15 |
|
import java.util.concurrent.locks.ReentrantLock; |
16 |
+ |
import java.util.Spliterator; |
17 |
+ |
import java.util.stream.Stream; |
18 |
+ |
import java.util.stream.Streams; |
19 |
+ |
import java.util.function.Consumer; |
20 |
|
|
21 |
|
/** |
22 |
|
* An optionally-bounded {@linkplain BlockingDeque blocking deque} based on |
1119 |
|
private class Itr extends AbstractItr { |
1120 |
|
Node<E> firstNode() { return first; } |
1121 |
|
Node<E> nextNode(Node<E> n) { return n.next; } |
1122 |
+ |
// minimal, unsplittable Spliterator implementation |
1123 |
+ |
public boolean tryAdvance(Consumer<? super E> action) { |
1124 |
+ |
if (hasNext()) { |
1125 |
+ |
action.accept(next()); |
1126 |
+ |
return true; |
1127 |
+ |
} |
1128 |
+ |
return false; |
1129 |
+ |
} |
1130 |
+ |
public void forEach(Consumer<? super E> action) { |
1131 |
+ |
while (hasNext()) |
1132 |
+ |
action.accept(next()); |
1133 |
+ |
} |
1134 |
+ |
public int characteristics() { |
1135 |
+ |
return Spliterator.ORDERED | Spliterator.NONNULL | Spliterator.CONCURRENT; |
1136 |
+ |
} |
1137 |
|
} |
1138 |
|
|
1139 |
|
/** Descending iterator */ |
1142 |
|
Node<E> nextNode(Node<E> n) { return n.prev; } |
1143 |
|
} |
1144 |
|
|
1145 |
+ |
static final class LBDSpliterator<E> implements Spliterator<E> { |
1146 |
+ |
// Similar idea to ConcurrentLinkedQueue spliterator |
1147 |
+ |
static final int MAX_BATCH = 1 << 11; // saturate batch size |
1148 |
+ |
final LinkedBlockingDeque<E> queue; |
1149 |
+ |
Node<E> current; // current node; null until initialized |
1150 |
+ |
int batch; // batch size for splits |
1151 |
+ |
boolean exhausted; // true when no more nodes |
1152 |
+ |
long est; // size estimate |
1153 |
+ |
LBDSpliterator(LinkedBlockingDeque<E> queue) { |
1154 |
+ |
this.queue = queue; |
1155 |
+ |
this.est = queue.size(); |
1156 |
+ |
} |
1157 |
+ |
|
1158 |
+ |
public long estimateSize() { return est; } |
1159 |
+ |
|
1160 |
+ |
public Spliterator<E> trySplit() { |
1161 |
+ |
int n; |
1162 |
+ |
final LinkedBlockingDeque<E> q = this.queue; |
1163 |
+ |
final ReentrantLock lock = q.lock; |
1164 |
+ |
if (!exhausted && (n = batch + 1) > 0 && n <= MAX_BATCH) { |
1165 |
+ |
Object[] a = new Object[batch = n]; |
1166 |
+ |
int i = 0; |
1167 |
+ |
Node<E> p = current; |
1168 |
+ |
lock.lock(); |
1169 |
+ |
try { |
1170 |
+ |
if (p != null || (p = q.first) != null) { |
1171 |
+ |
do { |
1172 |
+ |
if ((a[i] = p.item) != null) |
1173 |
+ |
++i; |
1174 |
+ |
} while ((p = p.next) != null && i < n); |
1175 |
+ |
} |
1176 |
+ |
} finally { |
1177 |
+ |
lock.unlock(); |
1178 |
+ |
} |
1179 |
+ |
if ((current = p) == null) { |
1180 |
+ |
est = 0L; |
1181 |
+ |
exhausted = true; |
1182 |
+ |
} |
1183 |
+ |
else if ((est -= i) <= 0L) |
1184 |
+ |
est = 1L; |
1185 |
+ |
return Collections.arraySnapshotSpliterator |
1186 |
+ |
(a, 0, i, Spliterator.ORDERED | Spliterator.NONNULL | |
1187 |
+ |
Spliterator.CONCURRENT); |
1188 |
+ |
} |
1189 |
+ |
return null; |
1190 |
+ |
} |
1191 |
+ |
|
1192 |
+ |
public void forEach(Consumer<? super E> action) { |
1193 |
+ |
if (action == null) throw new NullPointerException(); |
1194 |
+ |
final LinkedBlockingDeque<E> q = this.queue; |
1195 |
+ |
final ReentrantLock lock = q.lock; |
1196 |
+ |
if (!exhausted) { |
1197 |
+ |
exhausted = true; |
1198 |
+ |
Node<E> p = current; |
1199 |
+ |
do { |
1200 |
+ |
E e = null; |
1201 |
+ |
lock.lock(); |
1202 |
+ |
try { |
1203 |
+ |
if (p == null) |
1204 |
+ |
p = q.first; |
1205 |
+ |
while (p != null) { |
1206 |
+ |
e = p.item; |
1207 |
+ |
p = p.next; |
1208 |
+ |
if (e != null) |
1209 |
+ |
break; |
1210 |
+ |
} |
1211 |
+ |
} finally { |
1212 |
+ |
lock.unlock(); |
1213 |
+ |
} |
1214 |
+ |
if (e != null) |
1215 |
+ |
action.accept(e); |
1216 |
+ |
} while (p != null); |
1217 |
+ |
} |
1218 |
+ |
} |
1219 |
+ |
|
1220 |
+ |
public boolean tryAdvance(Consumer<? super E> action) { |
1221 |
+ |
if (action == null) throw new NullPointerException(); |
1222 |
+ |
final LinkedBlockingDeque<E> q = this.queue; |
1223 |
+ |
final ReentrantLock lock = q.lock; |
1224 |
+ |
if (!exhausted) { |
1225 |
+ |
E e = null; |
1226 |
+ |
lock.lock(); |
1227 |
+ |
try { |
1228 |
+ |
if (current == null) |
1229 |
+ |
current = q.first; |
1230 |
+ |
while (current != null) { |
1231 |
+ |
e = current.item; |
1232 |
+ |
current = current.next; |
1233 |
+ |
if (e != null) |
1234 |
+ |
break; |
1235 |
+ |
} |
1236 |
+ |
} finally { |
1237 |
+ |
lock.unlock(); |
1238 |
+ |
} |
1239 |
+ |
if (e != null) { |
1240 |
+ |
action.accept(e); |
1241 |
+ |
return true; |
1242 |
+ |
} |
1243 |
+ |
exhausted = true; |
1244 |
+ |
} |
1245 |
+ |
return false; |
1246 |
+ |
} |
1247 |
+ |
|
1248 |
+ |
public int characteristics() { |
1249 |
+ |
return Spliterator.ORDERED | Spliterator.NONNULL | |
1250 |
+ |
Spliterator.CONCURRENT; |
1251 |
+ |
} |
1252 |
+ |
} |
1253 |
+ |
|
1254 |
+ |
Spliterator<E> spliterator() { |
1255 |
+ |
return new LBDSpliterator<E>(this); |
1256 |
+ |
} |
1257 |
+ |
public Stream<E> stream() { |
1258 |
+ |
return Streams.stream(spliterator()); |
1259 |
+ |
} |
1260 |
+ |
|
1261 |
+ |
public Stream<E> parallelStream() { |
1262 |
+ |
return Streams.parallelStream(spliterator()); |
1263 |
+ |
} |
1264 |
+ |
|
1265 |
|
/** |
1266 |
|
* Saves this deque to a stream (that is, serializes it). |
1267 |
|
* |