62 |
|
* them. For example here is a class that periodically publishes the |
63 |
|
* items generated from a supplier. (In practice you might add methods |
64 |
|
* to independently start and stop generation, to share schedulers |
65 |
< |
* among publishers, and so on, or instead use a SubmissionPublisher |
66 |
< |
* as a component rather than a superclass.) |
65 |
> |
* among publishers, and so on, or use a SubmissionPublisher as a |
66 |
> |
* component rather than a superclass.) |
67 |
|
* |
68 |
|
* <pre> {@code |
69 |
|
* class PeriodicPublisher<T> extends SubmissionPublisher<T> { |
199 |
|
|
200 |
|
/** |
201 |
|
* Adds the given Subscriber unless already subscribed. If |
202 |
< |
* already subscribed, the Subscriber's onError method is invoked |
203 |
< |
* on the existing subscription with an IllegalStateException. |
204 |
< |
* Otherwise, upon success, the Subscriber's onSubscribe method is |
205 |
< |
* invoked asynchronously with a new Subscription. If onSubscribe |
206 |
< |
* throws an exception, the subscription is cancelled. Otherwise, |
207 |
< |
* if this SubmissionPublisher is closed, the subscriber's |
208 |
< |
* onComplete method is then invoked. Subscribers may enable |
209 |
< |
* receiving items by invoking the {@code request} method of the |
210 |
< |
* new Subscription, and may unsubscribe by invoking its cancel |
211 |
< |
* method. |
202 |
> |
* already subscribed, the Subscriber's {@code onError} method is |
203 |
> |
* invoked on the existing subscription with an {@link |
204 |
> |
* IllegalStateException}. Otherwise, upon success, the |
205 |
> |
* Subscriber's {@code onSubscribe} method is invoked |
206 |
> |
* asynchronously with a new {@link Flow.Subscription}. If {@code |
207 |
> |
* onSubscribe} throws an exception, the subscription is |
208 |
> |
* cancelled. Otherwise, if this SubmissionPublisher is closed, |
209 |
> |
* the subscriber's {@code onComplete} method is then invoked. |
210 |
> |
* Subscribers may enable receiving items by invoking the {@code |
211 |
> |
* request} method of the new Subscription, and may unsubscribe by |
212 |
> |
* invoking its {@code cancel} method. |
213 |
|
* |
214 |
|
* @param subscriber the subscriber |
215 |
|
* @throws NullPointerException if subscriber is null |
251 |
|
|
252 |
|
/** |
253 |
|
* Publishes the given item to each current subscriber by |
254 |
< |
* asynchronously invoking its onNext method, blocking |
255 |
< |
* uninterruptibly while resources for any subscriber are |
256 |
< |
* unavailable. This method returns an estimate of the maximum lag |
257 |
< |
* (number of items submitted but not yet consumed) among all |
258 |
< |
* current subscribers. This value is at least one (accounting for |
259 |
< |
* this submitted item) if there are any subscribers, else zero. |
254 |
> |
* asynchronously invoking its {@link Flow.Subscriber#onNext} |
255 |
> |
* method, blocking uninterruptibly while resources for any |
256 |
> |
* subscriber are unavailable. This method returns an estimate of |
257 |
> |
* the maximum lag (number of items submitted but not yet |
258 |
> |
* consumed) among all current subscribers. This value is at least |
259 |
> |
* one (accounting for this submitted item) if there are any |
260 |
> |
* subscribers, else zero. |
261 |
|
* |
262 |
|
* <p>If the Executor for this publisher throws a |
263 |
|
* RejectedExecutionException (or any other RuntimeException or |
264 |
|
* Error) when attempting to asynchronously notify subscribers, |
265 |
< |
* then this exception is rethrown. |
265 |
> |
* then this exception is rethrown, in which case some but not all |
266 |
> |
* subscribers may have received this item. |
267 |
|
* |
268 |
|
* @param item the (non-null) item to publish |
269 |
|
* @return the estimated maximum lag among subscribers |
325 |
|
|
326 |
|
/** |
327 |
|
* Publishes the given item, if possible, to each current |
328 |
< |
* subscriber by asynchronously invoking its onNext method. The |
329 |
< |
* item may be dropped by one or more subscribers if resource |
330 |
< |
* limits are exceeded, in which case the given handler (if |
331 |
< |
* non-null) is invoked, and if it returns true, retried once. |
332 |
< |
* Other calls to methods in this class by other threads are |
333 |
< |
* blocked while the handler is invoked. Unless recovery is |
334 |
< |
* assured, options are usually limited to logging the error |
335 |
< |
* and/or issuing an onError signal to the subscriber. |
328 |
> |
* subscriber by asynchronously invoking its {@link |
329 |
> |
* Flow.Subscriber#onNext} method. The item may be dropped by one |
330 |
> |
* or more subscribers if resource limits are exceeded, in which |
331 |
> |
* case the given handler (if non-null) is invoked, and if it |
332 |
> |
* returns true, retried once. Other calls to methods in this |
333 |
> |
* class by other threads are blocked while the handler is |
334 |
> |
* invoked. Unless recovery is assured, options are usually |
335 |
> |
* limited to logging the error and/or issuing an {@code onError} |
336 |
> |
* signal to the subscriber. |
337 |
|
* |
338 |
|
* <p>This method returns a status indicator: If negative, it |
339 |
|
* represents the (negative) number of drops (failed attempts to |
366 |
|
|
367 |
|
/** |
368 |
|
* Publishes the given item, if possible, to each current |
369 |
< |
* subscriber by asynchronously invoking its onNext method, |
370 |
< |
* blocking while resources for any subscription are unavailable, |
371 |
< |
* up to the specified timeout or the caller thread is |
372 |
< |
* interrupted, at which point the given handler (if non-null) is |
373 |
< |
* invoked, and if it returns true, retried once. (The drop |
374 |
< |
* handler may distinguish timeouts from interrupts by checking |
375 |
< |
* whether the current thread is interrupted.) Other calls to |
376 |
< |
* methods in this class by other threads are blocked while the |
377 |
< |
* handler is invoked. Unless recovery is assured, options are |
378 |
< |
* usually limited to logging the error and/or issuing an onError |
379 |
< |
* signal to the subscriber. |
369 |
> |
* subscriber by asynchronously invoking its {@link |
370 |
> |
* Flow.Subscriber#onNext} method, blocking while resources for |
371 |
> |
* any subscription are unavailable, up to the specified timeout |
372 |
> |
* or the caller thread is interrupted, at which point the given |
373 |
> |
* handler (if non-null) is invoked, and if it returns true, |
374 |
> |
* retried once. (The drop handler may distinguish timeouts from |
375 |
> |
* interrupts by checking whether the current thread is |
376 |
> |
* interrupted.) Other calls to methods in this class by other |
377 |
> |
* threads are blocked while the handler is invoked. Unless |
378 |
> |
* recovery is assured, options are usually limited to logging the |
379 |
> |
* error and/or issuing an {@code onError} signal to the |
380 |
> |
* subscriber. |
381 |
|
* |
382 |
|
* <p>This method returns a status indicator: If negative, it |
383 |
|
* represents the (negative) number of drops (failed attempts to |
475 |
|
} |
476 |
|
|
477 |
|
/** |
478 |
< |
* Unless already closed, issues onComplete signals to current |
479 |
< |
* subscribers, and disallows subsequent attempts to publish. |
478 |
> |
* Unless already closed, issues {@link |
479 |
> |
* Flow.Subscriber#onComplete} signals to current subscribers, and |
480 |
> |
* disallows subsequent attempts to publish. Upon return, this |
481 |
> |
* method does <em>NOT</em> guarantee that all subscribers have |
482 |
> |
* yet completed. |
483 |
|
*/ |
484 |
|
public void close() { |
485 |
|
if (!closed) { |
498 |
|
} |
499 |
|
|
500 |
|
/** |
501 |
< |
* Unless already closed, issues onError signals to current |
502 |
< |
* subscribers with the given error, and disallows subsequent |
503 |
< |
* attempts to publish. |
501 |
> |
* Unless already closed, issues {@link Flow.Subscriber#onError} |
502 |
> |
* signals to current subscribers with the given error, and |
503 |
> |
* disallows subsequent attempts to publish. Upon return, this |
504 |
> |
* method does <em>NOT</em> guarantee that all subscribers have |
505 |
> |
* yet completed. |
506 |
|
* |
507 |
< |
* @param error the onError argument sent to subscribers |
507 |
> |
* @param error the {@code onError} argument sent to subscribers |
508 |
|
* @throws NullPointerException if error is null |
509 |
|
*/ |
510 |
|
public void closeExceptionally(Throwable error) { |
1052 |
|
e.execute(new ConsumerTask<T>(this)); |
1053 |
|
break; |
1054 |
|
} catch (RuntimeException | Error ex) { // back out |
1055 |
< |
do {} while ((c = ctl) >= 0 && |
1055 |
> |
do {} while (((c = ctl) & DISABLED) == 0 && |
1056 |
|
(c & ACTIVE) != 0 && |
1057 |
|
!U.compareAndSwapInt(this, CTL, c, |
1058 |
|
c & ~ACTIVE)); |