ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Flow.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/Flow.java (file contents):
Revision 1.13 by jsr166, Mon Jan 19 00:49:29 2015 UTC vs.
Revision 1.14 by dl, Mon Jan 19 13:59:35 2015 UTC

# Line 16 | Line 16 | import java.util.stream.Stream;
16   * flow-controlled components in which {@link Publisher Publishers}
17   * produce items consumed by one or more {@link Subscriber
18   * Subscribers}, each managed by a {@link Subscription
19 < * Subscription}. The use of flow control helps address common
20 < * resource issues in "push" based asynchronous systems.
19 > * Subscription}.
20   *
21   * <p>These interfaces correspond to the <a
22   * href="http://www.reactive-streams.org/"> reactive-streams</a>
23 < * specification. (<b>Preliminary release note:</b> This spec is
24 < * not yet finalized, so minor details could change.)
23 > * specification.  They apply in both concurrent and distributed
24 > * asynchronous settings: All (seven) methods are defined in {@code
25 > * void} "oneway" message style. Communication relies on a simple form
26 > * of flow control (method {@link Subscription#request}) that can be
27 > * used to avoid resource management problems that may otherwise occur
28 > * in "push" based systems.
29   *
30   * <p><b>Examples.</b> A {@link Publisher} usually defines its own
31   * {@link Subscription} implementation; constructing one in method
# Line 30 | Line 33 | import java.util.stream.Stream;
33   * Subscriber}. It publishes items to the subscriber asynchronously,
34   * normally using an {@link Executor}.  For example, here is a very
35   * simple publisher that only issues (when requested) a single {@code
36 < * TRUE} item to each subscriber, then completes.  Because each
37 < * subscriber receives only the same single item, this class does not
38 < * need the buffering and ordering control required in most
39 < * implementations.
36 > * TRUE} item to any subscriber.  Because each subscriber receives
37 > * only the same single item, this class does not need the buffering
38 > * and ordering control required in most implementations (for
39 > * example {@link SubmissionPublisher}).
40   *
41   * <pre> {@code
42   * class OneShotPublisher implements Publisher<Boolean> {
# Line 76 | Line 79 | import java.util.stream.Stream;
79   * with less communication; for example with a value of 64, this keeps
80   * total outstanding requests between 32 and 64.  (See also {@link
81   * #consume(long, Publisher, Consumer)} that automates a common case.)
82 + * Because Subscriber method invocations for a given {@link
83 + * Subscription} are strictly ordered, there is no need for these
84 + * methods to use locks or volatiles unless a Subscriber maintains
85 + * multiple Subscriptions (in which case it is better to instead
86 + * define multiple Subscribers, each with its own Subscription).
87   *
88   * <pre> {@code
89   * class SampleSubscriber<T> implements Subscriber<T> {
# Line 100 | Line 108 | import java.util.stream.Stream;
108   *   public void onComplete() {}
109   * }}</pre>
110   *
111 < * <p>If there is no chance that a publisher will produce elements
104 < * faster than they can be consumed, a subscriber may initially
111 > * <p>When flow control is inapplicable, a subscriber may initially
112   * request an effectively unbounded number of items, as in:
113   *
114   * <pre> {@code
# Line 125 | Line 132 | public final class Flow {
132      /**
133       * A producer of items (and related control messages) received by
134       * Subscribers.  Each current {@link Subscriber} receives the same
135 <     * items (via method onNext) in the same order, unless drops or
136 <     * errors are encountered. If a Publisher encounters an error that
137 <     * does not allow further items to be issued to a Subscriber, that
138 <     * Subscriber receives onError, and then receives no further
139 <     * messages.  Otherwise, if it is known that no further items will
140 <     * be produced, each Subscriber receives onComplete.  Publishers
141 <     * may vary in policy about whether drops (failures to issue an
142 <     * item because of resource limitations) are treated as errors.
143 <     * Publishers may also vary about whether Subscribers receive
144 <     * items that were produced or available before they subscribed.
135 >     * items (via method {@code onNext}) in the same order, unless
136 >     * drops or errors are encountered. If a Publisher encounters an
137 >     * error that does not allow further items to be issued to a
138 >     * Subscriber, that Subscriber receives {@code onError}, and then
139 >     * receives no further messages.  Otherwise, if it is known that
140 >     * no further items will be produced, each Subscriber receives
141 >     * {@code onComplete}.  Publishers ensure that Subscriber method
142 >     * invocations for each subscription are strictly ordered in <a
143 >     * href="package-summary.html#MemoryVisibility"><i>happens-before</i></a>
144 >     * order.
145 >     *
146 >     * <p>Publishers may vary in policy about whether drops
147 >     * (failures to issue an item because of resource limitations) are
148 >     * treated as errors.  Publishers may also vary about whether
149 >     * Subscribers receive items that were produced or available
150 >     * before they subscribed.
151       *
152       * @param <T> the published item type
153       */
# Line 143 | Line 156 | public final class Flow {
156          /**
157           * Adds the given Subscriber if possible.  If already
158           * subscribed, or the attempt to subscribe fails due to policy
159 <         * violations or errors, the Subscriber's onError method is
160 <         * invoked with an IllegalStateException.  Otherwise, upon
161 <         * success, the Subscriber's onSubscribe method is invoked
162 <         * with a new Subscription.  Subscribers may enable receiving
163 <         * items by invoking the request method of this Subscription,
164 <         * and may unsubscribe by invoking its cancel method.
159 >         * violations or errors, the Subscriber's {@code onError}
160 >         * method is invoked with an {@link IllegalStateException}.
161 >         * Otherwise, the Subscriber's {@code onSubscribe} method is
162 >         * invoked with a new {@link Subscription}.  Subscribers may
163 >         * enable receiving items by invoking the {@code request}
164 >         * method of this Subscription, and may unsubscribe by
165 >         * invoking its {@code cancel} method.
166           *
167           * @param subscriber the subscriber
168           * @throws NullPointerException if subscriber is null
# Line 157 | Line 171 | public final class Flow {
171      }
172  
173      /**
174 <     * A receiver of messages.  The methods in this interface must be
175 <     * invoked sequentially by each Subscription, so are not required
176 <     * to be thread-safe unless subscribing to multiple publishers.
174 >     * A receiver of messages.  The methods in this interface are
175 >     * invoked in strict sequential order for each {@link
176 >     * Subscription}.
177       *
178       * @param <T> the subscribed item type
179       */
# Line 171 | Line 185 | public final class Flow {
185           * cause the Subscription to be cancelled.
186           *
187           * <p>Typically, implementations of this method invoke the
188 <         * subscription's request method to enable receiving items.
188 >         * subscription's {@code request} method to enable receiving
189 >         * items.
190           *
191           * @param subscription a new subscription
192           */
# Line 198 | Line 213 | public final class Flow {
213          public void onError(Throwable throwable);
214  
215          /**
216 <         * Method invoked when it is known that no additional onNext
217 <         * invocations will occur for a Subscription that is not
218 <         * already terminated by error, after which no other
216 >         * Method invoked when it is known that no additional {@code
217 >         * onNext} invocations will occur for a Subscription that is
218 >         * not already terminated by error, after which no other
219           * Subscriber methods are invoked by the Subscription.  If
220 <         * this method itself throws an exception, resulting behavior
221 <         * is undefined.
220 >         * this method throws an exception, resulting behavior is
221 >         * undefined.
222           */
223          public void onComplete();
224      }
225  
226      /**
227 <     * Message control linking Publishers and Subscribers.
228 <     * Subscribers receive items (via onNext) only when requested, and
229 <     * may cancel at any time. The methods in this interface are
230 <     * intended to be invoked only by their Subscribers.
227 >     * Message control linking a {@link Publisher} and {@link
228 >     * Subscriber}.  Subscribers receive items only when requested,
229 >     * and may cancel at any time. The methods in this interface are
230 >     * intended to be invoked only by their Subscribers; usages in
231 >     * other contexts have undefined effects.
232       */
233      public static interface Subscription {
234          /**
235           * Adds the given number {@code n} of items to the current
236           * unfulfilled demand for this subscription.  If {@code n} is
237 <         * negative, the Subscriber will receive an onError signal
238 <         * with an IllegalArgumentException argument. Otherwise, the
239 <         * Subscriber will receive up to {@code n} additional onNext
240 <         * invocations (or fewer if terminated).
237 >         * negative, the Subscriber will receive an {@code onError}
238 >         * signal with an {@link IllegalArgumentException}
239 >         * argument. Otherwise, the Subscriber will receive up to
240 >         * {@code n} additional {@code onNext} invocations (or fewer
241 >         * if terminated).
242           *
243           * @param n the increment of demand; a value of {@code
244           * Long.MAX_VALUE} may be considered as effectively unbounded
# Line 229 | Line 246 | public final class Flow {
246          public void request(long n);
247  
248          /**
249 <         * Causes the Subscriber to (eventually) stop receiving onNext
250 <         * messages.
249 >         * Causes the Subscriber to (eventually) stop receiving
250 >         * messages.  Implementation is best-effort -- one or more
251 >         * messages may be received after invoking this method.
252           */
253          public void cancel();
254      }
# Line 293 | Line 311 | public final class Flow {
311       * from the given publisher using the given Consumer function, and
312       * using the given bufferSize for buffering. Returns a
313       * CompletableFuture that is completed normally when the publisher
314 <     * signals onComplete, or completed exceptionally upon any error,
315 <     * including an exception thrown by the Consumer (in which case
316 <     * the subscription is cancelled if not already terminated).
314 >     * signals {@code onComplete}, or completed exceptionally upon any
315 >     * error, including an exception thrown by the Consumer (in which
316 >     * case the subscription is cancelled if not already terminated).
317       *
318       * @param <T> the published item type
319       * @param bufferSize the request size for subscriptions
# Line 359 | Line 377 | public final class Flow {
377       * stream operation to items, and uses the given bufferSize for
378       * buffering. Returns a CompletableFuture that is completed
379       * normally with the result of this function when the publisher
380 <     * signals onComplete, or is completed exceptionally upon any
381 <     * error.
380 >     * signals {@code onComplete}, or is completed exceptionally upon
381 >     * any error.
382       *
383       * <p><b>Preliminary release note:</b> Currently, this method
384       * collects all items before executing the stream

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines