17 |
|
* Subscribers}, each managed by a {@link Subscription |
18 |
|
* Subscription}. The use of flow control helps address common |
19 |
|
* resource issues in "push" based asynchronous systems. |
20 |
< |
* |
20 |
> |
* |
21 |
|
* <p>These interfaces correspond to the <a |
22 |
|
* href="http://www.reactive-streams.org/"> reactive-streams</a> |
23 |
|
* specification. (<b>Preliminary release note:</b> This spec is |
85 |
|
* final Consumer<? super T> consumer; |
86 |
|
* Subscription subscription; |
87 |
|
* final long requestSize; |
88 |
< |
* long count; |
88 |
> |
* long count; |
89 |
|
* SampleSubscriber(long requestSize, Consumer<? super T> consumer) { |
90 |
|
* this.requestSize = requestSize; |
91 |
|
* this.consumer = consumer; |
95 |
|
* (this.subscription = subscription).request(requestSize); |
96 |
|
* } |
97 |
|
* public void onNext(T item) { |
98 |
< |
* if (--count <= 0) |
98 |
> |
* if (--count <= 0) |
99 |
|
* subscription.request(count = requestSize); |
100 |
|
* consumer.accept(item); |
101 |
|
* } |
163 |
|
* negative, the Subscriber will receive an onError signal |
164 |
|
* with an IllegalArgumentException argument. Otherwise, the |
165 |
|
* Subscriber will receive up to {@code n} additional onNext |
166 |
< |
* invocations (or fewer if terminated). |
166 |
> |
* invocations (or fewer if terminated). |
167 |
|
* |
168 |
|
* @param n the increment of demand; a value of {@code |
169 |
|
* Long.MAX_VALUE} may be considered as effectively unbounded |
178 |
|
} |
179 |
|
|
180 |
|
/** |
181 |
< |
* A component that acts as both a Subscriber and Publisher. |
181 |
> |
* A component that acts as both a Subscriber and Publisher. |
182 |
|
* |
183 |
< |
* @param <T> the subscribed item type |
184 |
< |
* @param <R> the published item type |
185 |
< |
*/ |
183 |
> |
* @param <T> the subscribed item type |
184 |
> |
* @param <R> the published item type |
185 |
> |
*/ |
186 |
|
public static interface Processor<T, R> extends Subscriber<T>, Publisher<R> { |
187 |
|
} |
188 |
|
|
190 |
|
|
191 |
|
static final long DEFAULT_REQUEST_SIZE = 64L; |
192 |
|
|
193 |
< |
static abstract class CompletableSubscriber<T,U> implements Subscriber<T>, |
193 |
> |
static abstract class CompletableSubscriber<T,U> implements Subscriber<T>, |
194 |
|
Consumer<T> { |
195 |
|
final CompletableFuture<U> status; |
196 |
|
Subscription subscription; |
197 |
|
final long requestSize; |
198 |
< |
long count; |
199 |
< |
CompletableSubscriber(long requestSize, |
198 |
> |
long count; |
199 |
> |
CompletableSubscriber(long requestSize, |
200 |
|
CompletableFuture<U> status) { |
201 |
|
this.status = status; |
202 |
|
this.requestSize = requestSize; |
206 |
|
(this.subscription = subscription).request(requestSize); |
207 |
|
status.exceptionally(ex -> { subscription.cancel(); return null;}); |
208 |
|
} |
209 |
< |
public final void onError(Throwable ex) { |
210 |
< |
if (ex == null) |
209 |
> |
public final void onError(Throwable ex) { |
210 |
> |
if (ex == null) |
211 |
|
ex = new IllegalStateException("null onError argument"); |
212 |
|
status.completeExceptionally(ex); |
213 |
|
} |
218 |
|
new IllegalStateException("onNext without subscription")); |
219 |
|
else { |
220 |
|
try { |
221 |
< |
if (--count <= 0) |
221 |
> |
if (--count <= 0) |
222 |
|
s.request(count = requestSize); |
223 |
|
accept(item); |
224 |
|
} catch (Throwable ex) { |
227 |
|
} |
228 |
|
} |
229 |
|
} |
230 |
< |
|
230 |
> |
|
231 |
|
static final class ConsumeSubscriber<T> extends CompletableSubscriber<T,Void> { |
232 |
|
final Consumer<? super T> consumer; |
233 |
< |
ConsumeSubscriber(long requestSize, |
233 |
> |
ConsumeSubscriber(long requestSize, |
234 |
|
CompletableFuture<Void> status, |
235 |
|
Consumer<? super T> consumer) { |
236 |
|
super(requestSize, status); |
295 |
|
static final class StreamSubscriber<T,R> extends CompletableSubscriber<T,R> { |
296 |
|
final Function<? super Stream<T>, ? extends R> fn; |
297 |
|
final ArrayList<T> items; |
298 |
< |
StreamSubscriber(long requestSize, |
298 |
> |
StreamSubscriber(long requestSize, |
299 |
|
CompletableFuture<R> status, |
300 |
|
Function<? super Stream<T>, ? extends R> fn) { |
301 |
|
super(requestSize, status); |
341 |
|
requestSize, status, streamFunction)); |
342 |
|
return status; |
343 |
|
} |
344 |
< |
|
344 |
> |
|
345 |
|
/** |
346 |
|
* Equivalent to {@link #stream(long, Publisher, Function)} |
347 |
|
* with a request size of 64. |
356 |
|
* @param <R> the result type of the stream function |
357 |
|
*/ |
358 |
|
public static <T,R> CompletableFuture<R> stream( |
359 |
< |
Publisher<T> publisher, |
359 |
> |
Publisher<T> publisher, |
360 |
|
Function<? super Stream<T>,? extends R> streamFunction) { |
361 |
|
return stream(DEFAULT_REQUEST_SIZE, publisher, streamFunction); |
362 |
|
} |