89 |
|
* the following example, where a buffer size of 1 single-steps, and |
90 |
|
* larger sizes usually allow for more efficient overlapped processing |
91 |
|
* with less communication; for example with a value of 64, this keeps |
92 |
< |
* total outstanding requests between 32 and 64. (See also {@link |
93 |
< |
* #consume(long, Publisher, Consumer)} that automates a common case.) |
92 |
> |
* total outstanding requests between 32 and 64. |
93 |
|
* Because Subscriber method invocations for a given {@link |
94 |
|
* Subscription} are strictly ordered, there is no need for these |
95 |
|
* methods to use locks or volatiles unless a Subscriber maintains |
279 |
|
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> { |
280 |
|
} |
281 |
|
|
283 |
– |
// Support for static methods |
284 |
– |
|
282 |
|
static final int DEFAULT_BUFFER_SIZE = 256; |
283 |
|
|
284 |
|
/** |
294 |
|
return DEFAULT_BUFFER_SIZE; |
295 |
|
} |
296 |
|
|
300 |
– |
abstract static class CompletableSubscriber<T,U> |
301 |
– |
implements Subscriber<T>, Consumer<T> |
302 |
– |
{ |
303 |
– |
final CompletableFuture<U> status; |
304 |
– |
Subscription subscription; |
305 |
– |
long requestSize; |
306 |
– |
long count; |
307 |
– |
CompletableSubscriber(long bufferSize, CompletableFuture<U> status) { |
308 |
– |
this.status = status; |
309 |
– |
this.requestSize = bufferSize; |
310 |
– |
} |
311 |
– |
public final void onSubscribe(Subscription subscription) { |
312 |
– |
long rs = requestSize; |
313 |
– |
count = requestSize -= (rs >>> 1); |
314 |
– |
(this.subscription = subscription).request(rs); |
315 |
– |
} |
316 |
– |
public final void onError(Throwable ex) { |
317 |
– |
status.completeExceptionally(ex); |
318 |
– |
} |
319 |
– |
public void onNext(T item) { |
320 |
– |
try { |
321 |
– |
if (--count <= 0) |
322 |
– |
subscription.request(count = requestSize); |
323 |
– |
accept(item); |
324 |
– |
} catch (Throwable ex) { |
325 |
– |
subscription.cancel(); |
326 |
– |
status.completeExceptionally(ex); |
327 |
– |
} |
328 |
– |
} |
329 |
– |
} |
330 |
– |
|
331 |
– |
static final class ConsumeSubscriber<T> extends CompletableSubscriber<T,Void> { |
332 |
– |
final Consumer<? super T> consumer; |
333 |
– |
ConsumeSubscriber(long bufferSize, |
334 |
– |
CompletableFuture<Void> status, |
335 |
– |
Consumer<? super T> consumer) { |
336 |
– |
super(bufferSize, status); |
337 |
– |
this.consumer = consumer; |
338 |
– |
} |
339 |
– |
public void accept(T item) { consumer.accept(item); } |
340 |
– |
public void onComplete() { status.complete(null); } |
341 |
– |
} |
342 |
– |
|
343 |
– |
/** |
344 |
– |
* Creates and subscribes a Subscriber that consumes all items |
345 |
– |
* from the given publisher using the given Consumer function, and |
346 |
– |
* using the given bufferSize for buffering. Returns a |
347 |
– |
* CompletableFuture that is completed normally when the publisher |
348 |
– |
* signals {@code onComplete}, or completed exceptionally upon any |
349 |
– |
* error, including an exception thrown by the Consumer (in which |
350 |
– |
* case the subscription is cancelled if not already terminated). |
351 |
– |
* Other attempts to cancel the CompletableFuture need not |
352 |
– |
* cause the computation to terminate. |
353 |
– |
* |
354 |
– |
* @param <T> the published item type |
355 |
– |
* @param bufferSize the request size for subscriptions |
356 |
– |
* @param publisher the publisher |
357 |
– |
* @param consumer the function applied to each onNext item |
358 |
– |
* @return a CompletableFuture that is completed normally |
359 |
– |
* when the publisher signals onComplete, and exceptionally |
360 |
– |
* upon any error |
361 |
– |
* @throws NullPointerException if publisher or consumer are null |
362 |
– |
* @throws IllegalArgumentException if bufferSize not positive |
363 |
– |
*/ |
364 |
– |
public static <T> CompletableFuture<Void> consume( |
365 |
– |
long bufferSize, Publisher<T> publisher, Consumer<? super T> consumer) { |
366 |
– |
if (bufferSize <= 0L) |
367 |
– |
throw new IllegalArgumentException("bufferSize must be positive"); |
368 |
– |
if (publisher == null || consumer == null) |
369 |
– |
throw new NullPointerException(); |
370 |
– |
CompletableFuture<Void> status = new CompletableFuture<>(); |
371 |
– |
publisher.subscribe(new ConsumeSubscriber<T>( |
372 |
– |
bufferSize, status, consumer)); |
373 |
– |
return status; |
374 |
– |
} |
375 |
– |
|
376 |
– |
/** |
377 |
– |
* Equivalent to {@link #consume(long, Publisher, Consumer)} |
378 |
– |
* with {@link #defaultBufferSize}. |
379 |
– |
* |
380 |
– |
* @param <T> the published item type |
381 |
– |
* @param publisher the publisher |
382 |
– |
* @param consumer the function applied to each onNext item |
383 |
– |
* @return a CompletableFuture that is completed normally |
384 |
– |
* when the publisher signals onComplete, and exceptionally |
385 |
– |
* upon any error |
386 |
– |
* @throws NullPointerException if publisher or consumer are null |
387 |
– |
*/ |
388 |
– |
public static <T> CompletableFuture<Void> consume( |
389 |
– |
Publisher<T> publisher, Consumer<? super T> consumer) { |
390 |
– |
return consume(defaultBufferSize(), publisher, consumer); |
391 |
– |
} |
392 |
– |
|
393 |
– |
/** |
394 |
– |
* Temporary implementation for Stream, collecting all items |
395 |
– |
* and then applying stream operation. |
396 |
– |
*/ |
397 |
– |
static final class StreamSubscriber<T,R> extends CompletableSubscriber<T,R> { |
398 |
– |
final Function<? super Stream<T>, ? extends R> fn; |
399 |
– |
final ArrayList<T> items; |
400 |
– |
StreamSubscriber(long bufferSize, |
401 |
– |
CompletableFuture<R> status, |
402 |
– |
Function<? super Stream<T>, ? extends R> fn) { |
403 |
– |
super(bufferSize, status); |
404 |
– |
this.fn = fn; |
405 |
– |
this.items = new ArrayList<T>(); |
406 |
– |
} |
407 |
– |
public void accept(T item) { items.add(item); } |
408 |
– |
public void onComplete() { status.complete(fn.apply(items.stream())); } |
409 |
– |
} |
410 |
– |
|
411 |
– |
/** |
412 |
– |
* Creates and subscribes a Subscriber that applies the given |
413 |
– |
* stream operation to items, and uses the given bufferSize for |
414 |
– |
* buffering. Returns a CompletableFuture that is completed |
415 |
– |
* normally with the result of this function when the publisher |
416 |
– |
* signals {@code onComplete}, or is completed exceptionally upon |
417 |
– |
* any error. Other attempts to cancel the CompletableFuture need |
418 |
– |
* not cause the computation to terminate. |
419 |
– |
* |
420 |
– |
* <p><b>Preliminary release note:</b> Currently, this method |
421 |
– |
* collects all items before executing the stream |
422 |
– |
* computation. Improvements are pending Stream integration. |
423 |
– |
* |
424 |
– |
* @param <T> the published item type |
425 |
– |
* @param <R> the result type of the stream function |
426 |
– |
* @param bufferSize the request size for subscriptions |
427 |
– |
* @param publisher the publisher |
428 |
– |
* @param streamFunction the operation on elements |
429 |
– |
* @return a CompletableFuture that is completed normally with the |
430 |
– |
* result of the given function as result when the publisher signals |
431 |
– |
* onComplete, and exceptionally upon any error |
432 |
– |
* @throws NullPointerException if publisher or function are null |
433 |
– |
* @throws IllegalArgumentException if bufferSize not positive |
434 |
– |
*/ |
435 |
– |
public static <T,R> CompletableFuture<R> stream( |
436 |
– |
long bufferSize, Publisher<T> publisher, |
437 |
– |
Function<? super Stream<T>, ? extends R> streamFunction) { |
438 |
– |
if (bufferSize <= 0L) |
439 |
– |
throw new IllegalArgumentException("bufferSize must be positive"); |
440 |
– |
if (publisher == null || streamFunction == null) |
441 |
– |
throw new NullPointerException(); |
442 |
– |
CompletableFuture<R> status = new CompletableFuture<>(); |
443 |
– |
publisher.subscribe(new StreamSubscriber<T,R>( |
444 |
– |
bufferSize, status, streamFunction)); |
445 |
– |
return status; |
446 |
– |
} |
447 |
– |
|
448 |
– |
/** |
449 |
– |
* Equivalent to {@link #stream(long, Publisher, Function)} |
450 |
– |
* with {@link #defaultBufferSize}. |
451 |
– |
* |
452 |
– |
* @param <T> the published item type |
453 |
– |
* @param <R> the result type of the stream function |
454 |
– |
* @param publisher the publisher |
455 |
– |
* @param streamFunction the operation on elements |
456 |
– |
* @return a CompletableFuture that is completed normally with the |
457 |
– |
* result of the given function as result when the publisher signals |
458 |
– |
* onComplete, and exceptionally upon any error |
459 |
– |
* @throws NullPointerException if publisher or function are null |
460 |
– |
*/ |
461 |
– |
public static <T,R> CompletableFuture<R> stream( |
462 |
– |
Publisher<T> publisher, |
463 |
– |
Function<? super Stream<T>,? extends R> streamFunction) { |
464 |
– |
return stream(defaultBufferSize(), publisher, streamFunction); |
465 |
– |
} |
297 |
|
} |