ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Flow.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/Flow.java (file contents):
Revision 1.1 by dl, Thu Jan 15 15:58:31 2015 UTC vs.
Revision 1.2 by jsr166, Thu Jan 15 17:07:17 2015 UTC

# Line 17 | Line 17 | import java.util.stream.Stream;
17   * Subscribers}, each managed by a {@link Subscription
18   * Subscription}. The use of flow control helps address common
19   * resource issues in "push" based asynchronous systems.
20 < *
20 > *
21   * <p>These interfaces correspond to the <a
22   * href="http://www.reactive-streams.org/"> reactive-streams</a>
23   * specification. (<b>Preliminary release note:</b> This spec is
# Line 85 | Line 85 | public final class Flow {
85       *   final Consumer<? super T> consumer;
86       *   Subscription subscription;
87       *   final long requestSize;
88 <     *   long count;
88 >     *   long count;
89       *   SampleSubscriber(long requestSize, Consumer<? super T> consumer) {
90       *     this.requestSize = requestSize;
91       *     this.consumer = consumer;
# Line 95 | Line 95 | public final class Flow {
95       *     (this.subscription = subscription).request(requestSize);
96       *   }
97       *   public void onNext(T item) {
98 <     *     if (--count <= 0)
98 >     *     if (--count <= 0)
99       *       subscription.request(count = requestSize);
100       *     consumer.accept(item);
101       *   }
# Line 163 | Line 163 | public final class Flow {
163           * negative, the Subscriber will receive an onError signal
164           * with an IllegalArgumentException argument. Otherwise, the
165           * Subscriber will receive up to {@code n} additional onNext
166 <         * invocations (or fewer if terminated).
166 >         * invocations (or fewer if terminated).
167           *
168           * @param n the increment of demand; a value of {@code
169           * Long.MAX_VALUE} may be considered as effectively unbounded
# Line 178 | Line 178 | public final class Flow {
178      }
179  
180      /**
181 <     * A component that acts as both a Subscriber and Publisher.
181 >     * A component that acts as both a Subscriber and Publisher.
182       *
183 <     * @param <T> the subscribed item type
184 <     * @param <R> the published item type
185 <     */    
183 >     * @param <T> the subscribed item type
184 >     * @param <R> the published item type
185 >     */
186      public static interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
187      }
188  
# Line 190 | Line 190 | public final class Flow {
190  
191      static final long DEFAULT_REQUEST_SIZE = 64L;
192  
193 <    static abstract class CompletableSubscriber<T,U> implements Subscriber<T>,
193 >    static abstract class CompletableSubscriber<T,U> implements Subscriber<T>,
194                                                                  Consumer<T> {
195          final CompletableFuture<U> status;
196          Subscription subscription;
197          final long requestSize;
198 <        long count;
199 <        CompletableSubscriber(long requestSize,
198 >        long count;
199 >        CompletableSubscriber(long requestSize,
200                                CompletableFuture<U> status) {
201              this.status = status;
202              this.requestSize = requestSize;
# Line 206 | Line 206 | public final class Flow {
206              (this.subscription = subscription).request(requestSize);
207              status.exceptionally(ex -> { subscription.cancel(); return null;});
208          }
209 <        public final void onError(Throwable ex) {
210 <            if (ex == null)
209 >        public final void onError(Throwable ex) {
210 >            if (ex == null)
211                  ex = new IllegalStateException("null onError argument");
212              status.completeExceptionally(ex);
213          }
# Line 218 | Line 218 | public final class Flow {
218                      new IllegalStateException("onNext without subscription"));
219              else {
220                  try {
221 <                    if (--count <= 0)
221 >                    if (--count <= 0)
222                          s.request(count = requestSize);
223                      accept(item);
224                  } catch (Throwable ex) {
# Line 227 | Line 227 | public final class Flow {
227              }
228          }
229      }
230 <    
230 >
231      static final class ConsumeSubscriber<T> extends CompletableSubscriber<T,Void> {
232          final Consumer<? super T> consumer;
233 <        ConsumeSubscriber(long requestSize,
233 >        ConsumeSubscriber(long requestSize,
234                            CompletableFuture<Void> status,
235                            Consumer<? super T> consumer) {
236              super(requestSize, status);
# Line 295 | Line 295 | public final class Flow {
295      static final class StreamSubscriber<T,R> extends CompletableSubscriber<T,R> {
296          final Function<? super Stream<T>, ? extends R> fn;
297          final ArrayList<T> items;
298 <        StreamSubscriber(long requestSize,
298 >        StreamSubscriber(long requestSize,
299                           CompletableFuture<R> status,
300                           Function<? super Stream<T>, ? extends R> fn) {
301              super(requestSize, status);
# Line 341 | Line 341 | public final class Flow {
341                                  requestSize, status, streamFunction));
342          return status;
343      }
344 <        
344 >
345      /**
346       * Equivalent to {@link #stream(long, Publisher, Function)}
347       * with a request size of 64.
# Line 356 | Line 356 | public final class Flow {
356       * @param <R> the result type of the stream function
357       */
358      public static <T,R> CompletableFuture<R> stream(
359 <        Publisher<T> publisher,
359 >        Publisher<T> publisher,
360          Function<? super Stream<T>,? extends R> streamFunction) {
361          return stream(DEFAULT_REQUEST_SIZE, publisher, streamFunction);
362      }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines