15 |
|
import java.util.function.BiConsumer; |
16 |
|
import java.util.function.BiPredicate; |
17 |
|
import java.util.function.Consumer; |
18 |
< |
import static java.util.concurrent.Flow.*; // improve code readablitly |
18 |
> |
import static java.util.concurrent.Flow.Publisher; |
19 |
> |
import static java.util.concurrent.Flow.Subscriber; |
20 |
> |
import static java.util.concurrent.Flow.Subscription; |
21 |
|
|
22 |
|
/** |
23 |
|
* A {@link Flow.Publisher} that asynchronously issues submitted |
198 |
|
* publishing. Unsubscribing occurs only during traversal loops, |
199 |
|
* when BufferedSubscription methods return negative values |
200 |
|
* signifying that they have been closed. To reduce |
201 |
< |
* head-of-line blocking, submit and offer methods first call |
201 |
> |
* head-of-line blocking, submit and offer methods first call |
202 |
|
* BufferedSubscription.offer on each subscriber, and place |
203 |
|
* saturated ones in retries list (using nextRetry field), and |
204 |
|
* retry, possibly blocking or dropping. |
354 |
|
|
355 |
|
/** |
356 |
|
* Common implementation for all three forms of submit and offer. |
357 |
< |
* Acts as submit if nanos == Long.MAX_VALUE, else offer |
357 |
> |
* Acts as submit if nanos == Long.MAX_VALUE, else offer. |
358 |
|
*/ |
359 |
|
private int doOffer(T item, long nanos, |
360 |
|
BiPredicate<Subscriber<? super T>, ? super T> onDrop) { |
397 |
|
|
398 |
|
/** |
399 |
|
* Helps, (timed) waits for, and/or drops buffers on list; returns |
400 |
< |
* lag or negative drops (for use in offer) |
400 |
> |
* lag or negative drops (for use in offer). |
401 |
|
*/ |
402 |
|
private int retryOffer(T item, long nanos, |
403 |
|
BiPredicate<Subscriber<? super T>, ? super T> onDrop, |
951 |
|
* subscribers per publisher). We additionally segregate some |
952 |
|
* fields that would otherwise nearly always encounter cache line |
953 |
|
* contention among producers and consumers. To reduce contention |
954 |
< |
* acrosss time (vs space), consumers only periodically update |
954 |
> |
* across time (vs space), consumers only periodically update |
955 |
|
* other fields (see method takeItems), at the expense of possibly |
956 |
|
* staler reporting of lags and demand (bounded at 12.5% == 1/8 |
957 |
|
* capacity) and possibly more atomic operations. |