36 |
|
* nearest power of two and/or bounded by the largest value supported |
37 |
|
* by this implementation.) Invocations of {@link |
38 |
|
* Flow.Subscription#request} do not directly result in buffer |
39 |
< |
* expansion, but risk saturation if unfulfilled requests exceed the |
39 |
> |
* expansion, but risk saturation if unfilled requests exceed the |
40 |
|
* maximum capacity. Choices of buffer parameters rely on expected |
41 |
|
* rates, resources, and usages, that usually benefit from empirical |
42 |
< |
* testing. As a first guess, consider a value of 256. |
42 |
> |
* testing. As a first guess, consider a value of 64. |
43 |
|
* |
44 |
|
* <p>Publication methods support different policies about what to do |
45 |
|
* when buffers are saturated. Method {@link #submit} blocks until |
162 |
|
* async delivery to subscribers, and with the given maximum |
163 |
|
* buffer size for each subscriber. In the absence of other |
164 |
|
* constraints, consider using {@code ForkJoinPool.commonPool(), |
165 |
< |
* 256}. |
165 |
> |
* 64}. |
166 |
|
* |
167 |
|
* @param executor the executor to use for async delivery, |
168 |
|
* supporting creation of at least one independent thread |
262 |
|
synchronized (this) { |
263 |
|
if (closed) |
264 |
|
throw new IllegalStateException("Closed"); |
265 |
< |
BufferedSubscription<T> pred = null, next; |
266 |
< |
for (BufferedSubscription<T> b = clients; b != null; b = next) { |
265 |
> |
/* |
266 |
> |
* To reduce head-of-line blocking, try offer() on each, |
267 |
> |
* place saturated ones in retries list, and later wait |
268 |
> |
* them out. |
269 |
> |
*/ |
270 |
> |
BufferedSubscription<T> b = clients, retries = null, |
271 |
> |
rtail = null, pred = null, next; |
272 |
> |
for ( ; b != null; b = next) { |
273 |
|
int stat; |
274 |
|
next = b.next; |
275 |
< |
if ((stat = b.submit(item)) < 0) { |
275 |
> |
if ((stat = b.offer(item)) < 0) { |
276 |
|
if (pred == null) |
277 |
|
clients = next; |
278 |
|
else |
279 |
|
pred.next = next; |
280 |
|
} |
281 |
|
else { |
282 |
< |
pred = b; |
282 |
> |
if (stat == 0) { |
283 |
> |
if (rtail == null) |
284 |
> |
retries = b; |
285 |
> |
else |
286 |
> |
rtail.nextRetry = b; |
287 |
> |
rtail = b; |
288 |
> |
stat = maxBufferCapacity; |
289 |
> |
} |
290 |
|
if (stat > lag) |
291 |
|
lag = stat; |
292 |
+ |
pred = b; |
293 |
|
} |
294 |
|
} |
295 |
+ |
if (retries != null) |
296 |
+ |
retrySubmit(retries, item); |
297 |
|
} |
298 |
|
return lag; |
299 |
|
} |
300 |
|
|
301 |
|
/** |
302 |
+ |
* Calls submit on each subscription on retry list. |
303 |
+ |
*/ |
304 |
+ |
private void retrySubmit(BufferedSubscription<T> retries, T item) { |
305 |
+ |
for (BufferedSubscription<T> r = retries; r != null;) { |
306 |
+ |
BufferedSubscription<T> nextRetry = r.nextRetry; |
307 |
+ |
r.nextRetry = null; |
308 |
+ |
r.submit(item); |
309 |
+ |
r = nextRetry; |
310 |
+ |
} |
311 |
+ |
} |
312 |
+ |
|
313 |
+ |
/** |
314 |
|
* Publishes the given item, if possible, to each current |
315 |
|
* subscriber by asynchronously invoking its onNext method. The |
316 |
|
* item may be dropped by one or more subscribers if resource |
718 |
|
* Execution control is managed using the ACTIVE ctl bit. We |
719 |
|
* ensure that a task is active when consumable items (and |
720 |
|
* usually, SUBSCRIBE, ERROR or COMPLETE signals) are present and |
721 |
< |
* there is demand (unfulfilled requests). This is complicated on |
721 |
> |
* there is demand (unfilled requests). This is complicated on |
722 |
|
* the creation side by the possibility of exceptions when trying |
723 |
|
* to execute tasks. These eventually force DISABLED state, but |
724 |
|
* sometimes not directly. On the task side, termination (clearing |
770 |
|
volatile Thread waiter; // blocked producer thread |
771 |
|
T putItem; // for offer within ManagedBlocker |
772 |
|
BufferedSubscription<T> next; // used only by publisher |
773 |
+ |
BufferedSubscription<T> nextRetry;// used only by publisher |
774 |
|
|
775 |
|
// ctl values |
776 |
|
static final int ACTIVE = 0x01; // consumer task active |
848 |
|
*/ |
849 |
|
final int growAndAdd(Object[] oldArray, T item) { |
850 |
|
int oldLen, newLen; |
851 |
< |
if (oldArray != null) |
823 |
< |
newLen = (oldLen = oldArray.length) << 1; |
824 |
< |
else if (ctl >= 0) { |
851 |
> |
if (oldArray == null) { |
852 |
|
oldLen = 0; |
853 |
< |
newLen = (maxCapacity < MINCAP) ? maxCapacity : MINCAP; |
853 |
> |
newLen = (maxCapacity >= MINCAP ? MINCAP : |
854 |
> |
maxCapacity >= 2 ? maxCapacity : 2); |
855 |
|
} |
856 |
< |
else |
857 |
< |
return -1; // disabled |
830 |
< |
if (oldLen >= maxCapacity || newLen <= 0) |
856 |
> |
else if ((oldLen = oldArray.length) >= maxCapacity || |
857 |
> |
(newLen = oldLen << 1) <= 0) |
858 |
|
return 0; // cannot grow |
859 |
+ |
if (ctl == DISABLED) |
860 |
+ |
return -1; |
861 |
|
Object[] newArray; |
862 |
|
try { |
863 |
|
newArray = new Object[newLen]; |
864 |
|
} catch (Throwable ex) { // try to cope with OOME |
865 |
< |
if (oldLen > 0) // avoid continuous failure |
866 |
< |
maxCapacity = oldLen; |
865 |
> |
if (oldLen > 0) |
866 |
> |
maxCapacity = oldLen; // avoid continuous failure |
867 |
|
return 0; |
868 |
|
} |
869 |
|
array = newArray; |