53 |
|
* RejectedExecutionException} (or any other RuntimeException or |
54 |
|
* Error) when attempting to execute a task, or a drop handler throws |
55 |
|
* an exception when processing a dropped item, then the exception is |
56 |
< |
* rethrown. In these cases, some but not all subscribers may have |
57 |
< |
* received the published item. It is usually good practice to {@link |
56 |
> |
* rethrown. In these cases, not all subscribers will have been issued |
57 |
> |
* the published item. It is usually good practice to {@link |
58 |
|
* #closeExceptionally closeExceptionally} in these cases. |
59 |
|
* |
60 |
|
* <p>This class may also serve as a convenient base for subclasses |
262 |
|
* <p>If the Executor for this publisher throws a |
263 |
|
* RejectedExecutionException (or any other RuntimeException or |
264 |
|
* Error) when attempting to asynchronously notify subscribers, |
265 |
< |
* then this exception is rethrown, in which case some but not all |
266 |
< |
* subscribers may have received this item. |
265 |
> |
* then this exception is rethrown, in which case not all |
266 |
> |
* subscribers will have been issued this item. |
267 |
|
* |
268 |
|
* @param item the (non-null) item to publish |
269 |
|
* @return the estimated maximum lag among subscribers |
543 |
|
boolean nonEmpty = false; |
544 |
|
if (!closed) { |
545 |
|
synchronized (this) { |
546 |
< |
BufferedSubscription<T> pred = null, next; |
547 |
< |
for (BufferedSubscription<T> b = clients; b != null; b = next) { |
548 |
< |
next = b.next; |
546 |
> |
for (BufferedSubscription<T> b = clients; b != null;) { |
547 |
> |
BufferedSubscription<T> next = b.next; |
548 |
|
if (b.isDisabled()) { |
549 |
|
b.next = null; |
550 |
< |
if (pred == null) |
552 |
< |
clients = next; |
553 |
< |
else |
554 |
< |
pred.next = next; |
550 |
> |
b = clients = next; |
551 |
|
} |
552 |
|
else { |
553 |
|
nonEmpty = true; |
792 |
|
* helping is currently supported. |
793 |
|
* |
794 |
|
* This class uses @Contended and heuristic field declaration |
795 |
< |
* ordering to reduce memory contention on BufferedSubscription |
796 |
< |
* itself, but it does not currently attempt to avoid memory |
797 |
< |
* contention (especially including card-marks) among buffer |
798 |
< |
* elements, that can significantly slow down some usages. |
795 |
> |
* ordering to reduce false-sharing-based memory contention among |
796 |
> |
* instances of BufferedSubscription, but it does not currently |
797 |
> |
* attempt to avoid memory contention among buffers. This field |
798 |
> |
* and element packing can hurt performance especially when each |
799 |
> |
* publisher has only one client operating at a high rate. |
800 |
|
* Addressing this may require allocating substantially more space |
801 |
|
* than users expect. |
802 |
|
*/ |
1182 |
|
if ((d = prev + n) < prev) // saturate |
1183 |
|
d = Long.MAX_VALUE; |
1184 |
|
if (U.compareAndSwapLong(this, DEMAND, prev, d)) { |
1185 |
< |
while (d != 0L) { |
1189 |
< |
int c, h; |
1185 |
> |
for (int c, h;;) { |
1186 |
|
if ((c = ctl) == DISABLED) |
1187 |
|
break; |
1188 |
|
else if ((c & ACTIVE) != 0) { |
1200 |
|
} |
1201 |
|
else if (head == h && tail == h) |
1202 |
|
break; // else stale |
1203 |
< |
d = demand; |
1203 |
> |
if (demand == 0L) |
1204 |
> |
break; |
1205 |
|
} |
1206 |
|
break; |
1207 |
|
} |