ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Flow.java
Revision: 1.32
Committed: Fri Aug 4 23:05:47 2017 UTC (6 years, 9 months ago) by dl
Branch: MAIN
CVS Tags: HEAD
Changes since 1.31: +2 -2 lines
Log Message:
make javadoc sample code conform to request rule

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8 jsr166 1.9
9 dl 1.1 /**
10     * Interrelated interfaces and static methods for establishing
11     * flow-controlled components in which {@link Publisher Publishers}
12     * produce items consumed by one or more {@link Subscriber
13     * Subscribers}, each managed by a {@link Subscription
14 dl 1.14 * Subscription}.
15 jsr166 1.2 *
16 dl 1.1 * <p>These interfaces correspond to the <a
17     * href="http://www.reactive-streams.org/"> reactive-streams</a>
18 dl 1.14 * specification. They apply in both concurrent and distributed
19     * asynchronous settings: All (seven) methods are defined in {@code
20 jsr166 1.20 * void} "one-way" message style. Communication relies on a simple form
21 dl 1.14 * of flow control (method {@link Subscription#request}) that can be
22     * used to avoid resource management problems that may otherwise occur
23     * in "push" based systems.
24 dl 1.1 *
25 dl 1.10 * <p><b>Examples.</b> A {@link Publisher} usually defines its own
26     * {@link Subscription} implementation; constructing one in method
27     * {@code subscribe} and issuing it to the calling {@link
28     * Subscriber}. It publishes items to the subscriber asynchronously,
29     * normally using an {@link Executor}. For example, here is a very
30     * simple publisher that only issues (when requested) a single {@code
31 dl 1.16 * TRUE} item to a single subscriber. Because the subscriber receives
32     * only a single item, this class does not use buffering and ordering
33     * control required in most implementations (for example {@link
34     * SubmissionPublisher}).
35 dl 1.10 *
36     * <pre> {@code
37     * class OneShotPublisher implements Publisher<Boolean> {
38 dl 1.23 * private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
39 dl 1.26 * private boolean subscribed; // true after first subscribe
40 dl 1.16 * public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
41     * if (subscribed)
42 jsr166 1.30 * subscriber.onError(new IllegalStateException()); // only one allowed
43 dl 1.16 * else {
44     * subscribed = true;
45 dl 1.11 * subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
46 dl 1.16 * }
47 dl 1.10 * }
48     * static class OneShotSubscription implements Subscription {
49 dl 1.23 * private final Subscriber<? super Boolean> subscriber;
50     * private final ExecutorService executor;
51     * private Future<?> future; // to allow cancellation
52 dl 1.26 * private boolean completed;
53 dl 1.10 * OneShotSubscription(Subscriber<? super Boolean> subscriber,
54 dl 1.17 * ExecutorService executor) {
55 dl 1.10 * this.subscriber = subscriber;
56     * this.executor = executor;
57     * }
58     * public synchronized void request(long n) {
59 dl 1.32 * if (!completed) {
60 dl 1.10 * completed = true;
61 dl 1.32 * if (n <= 0) {
62 dl 1.26 * IllegalArgumentException ex = new IllegalArgumentException();
63 dl 1.22 * executor.execute(() -> subscriber.onError(ex));
64 dl 1.26 * } else {
65 dl 1.21 * future = executor.submit(() -> {
66 dl 1.16 * subscriber.onNext(Boolean.TRUE);
67     * subscriber.onComplete();
68 dl 1.21 * });
69     * }
70 dl 1.10 * }
71     * }
72 dl 1.15 * public synchronized void cancel() {
73 dl 1.17 * completed = true;
74     * if (future != null) future.cancel(false);
75 dl 1.15 * }
76 dl 1.10 * }
77     * }}</pre>
78     *
79 jsr166 1.13 * <p>A {@link Subscriber} arranges that items be requested and
80 dl 1.10 * processed. Items (invocations of {@link Subscriber#onNext}) are
81     * not issued unless requested, but multiple items may be requested.
82     * Many Subscriber implementations can arrange this in the style of
83     * the following example, where a buffer size of 1 single-steps, and
84     * larger sizes usually allow for more efficient overlapped processing
85     * with less communication; for example with a value of 64, this keeps
86 jsr166 1.25 * total outstanding requests between 32 and 64.
87 dl 1.14 * Because Subscriber method invocations for a given {@link
88     * Subscription} are strictly ordered, there is no need for these
89     * methods to use locks or volatiles unless a Subscriber maintains
90     * multiple Subscriptions (in which case it is better to instead
91     * define multiple Subscribers, each with its own Subscription).
92 dl 1.10 *
93     * <pre> {@code
94     * class SampleSubscriber<T> implements Subscriber<T> {
95     * final Consumer<? super T> consumer;
96     * Subscription subscription;
97     * final long bufferSize;
98     * long count;
99     * SampleSubscriber(long bufferSize, Consumer<? super T> consumer) {
100     * this.bufferSize = bufferSize;
101     * this.consumer = consumer;
102     * }
103     * public void onSubscribe(Subscription subscription) {
104 dl 1.23 * long initialRequestSize = bufferSize;
105 dl 1.10 * count = bufferSize - bufferSize / 2; // re-request when half consumed
106 dl 1.23 * (this.subscription = subscription).request(initialRequestSize);
107 dl 1.10 * }
108     * public void onNext(T item) {
109     * if (--count <= 0)
110     * subscription.request(count = bufferSize - bufferSize / 2);
111     * consumer.accept(item);
112     * }
113     * public void onError(Throwable ex) { ex.printStackTrace(); }
114     * public void onComplete() {}
115     * }}</pre>
116 dl 1.1 *
117 dl 1.18 * <p>The default value of {@link #defaultBufferSize} may provide a
118     * useful starting point for choosing request sizes and capacities in
119     * Flow components based on expected rates, resources, and usages.
120 dl 1.26 * Or, when flow control is never needed, a subscriber may initially
121     * request an effectively unbounded number of items, as in:
122 dl 1.12 *
123     * <pre> {@code
124     * class UnboundedSubscriber<T> implements Subscriber<T> {
125     * public void onSubscribe(Subscription subscription) {
126     * subscription.request(Long.MAX_VALUE); // effectively unbounded
127     * }
128     * public void onNext(T item) { use(item); }
129     * public void onError(Throwable ex) { ex.printStackTrace(); }
130     * public void onComplete() {}
131     * void use(T item) { ... }
132     * }}</pre>
133 jsr166 1.13 *
134 dl 1.1 * @author Doug Lea
135 jsr166 1.29 * @since 9
136 dl 1.1 */
137     public final class Flow {
138    
139     private Flow() {} // uninstantiable
140    
141     /**
142     * A producer of items (and related control messages) received by
143     * Subscribers. Each current {@link Subscriber} receives the same
144 dl 1.14 * items (via method {@code onNext}) in the same order, unless
145     * drops or errors are encountered. If a Publisher encounters an
146 dl 1.15 * error that does not allow items to be issued to a Subscriber,
147     * that Subscriber receives {@code onError}, and then receives no
148     * further messages. Otherwise, when it is known that no further
149     * messages will be issued to it, a subscriber receives {@code
150     * onComplete}. Publishers ensure that Subscriber method
151 dl 1.14 * invocations for each subscription are strictly ordered in <a
152     * href="package-summary.html#MemoryVisibility"><i>happens-before</i></a>
153     * order.
154     *
155 dl 1.15 * <p>Publishers may vary in policy about whether drops (failures
156     * to issue an item because of resource limitations) are treated
157     * as unrecoverable errors. Publishers may also vary about
158     * whether Subscribers receive items that were produced or
159     * available before they subscribed.
160 dl 1.1 *
161     * @param <T> the published item type
162     */
163     @FunctionalInterface
164     public static interface Publisher<T> {
165     /**
166     * Adds the given Subscriber if possible. If already
167     * subscribed, or the attempt to subscribe fails due to policy
168 dl 1.14 * violations or errors, the Subscriber's {@code onError}
169     * method is invoked with an {@link IllegalStateException}.
170     * Otherwise, the Subscriber's {@code onSubscribe} method is
171     * invoked with a new {@link Subscription}. Subscribers may
172     * enable receiving items by invoking the {@code request}
173     * method of this Subscription, and may unsubscribe by
174     * invoking its {@code cancel} method.
175 dl 1.1 *
176     * @param subscriber the subscriber
177     * @throws NullPointerException if subscriber is null
178     */
179     public void subscribe(Subscriber<? super T> subscriber);
180     }
181    
182     /**
183 dl 1.14 * A receiver of messages. The methods in this interface are
184     * invoked in strict sequential order for each {@link
185     * Subscription}.
186 dl 1.1 *
187     * @param <T> the subscribed item type
188     */
189     public static interface Subscriber<T> {
190     /**
191     * Method invoked prior to invoking any other Subscriber
192     * methods for the given Subscription. If this method throws
193     * an exception, resulting behavior is not guaranteed, but may
194 dl 1.26 * cause the Subscription not to be established or to be cancelled.
195 dl 1.1 *
196 dl 1.15 * <p>Typically, implementations of this method invoke {@code
197     * subscription.request} to enable receiving items.
198 dl 1.1 *
199     * @param subscription a new subscription
200     */
201     public void onSubscribe(Subscription subscription);
202    
203     /**
204     * Method invoked with a Subscription's next item. If this
205     * method throws an exception, resulting behavior is not
206     * guaranteed, but may cause the Subscription to be cancelled.
207     *
208     * @param item the item
209     */
210     public void onNext(T item);
211    
212     /**
213     * Method invoked upon an unrecoverable error encountered by a
214     * Publisher or Subscription, after which no other Subscriber
215     * methods are invoked by the Subscription. If this method
216     * itself throws an exception, resulting behavior is
217     * undefined.
218     *
219     * @param throwable the exception
220     */
221     public void onError(Throwable throwable);
222    
223     /**
224 dl 1.15 * Method invoked when it is known that no additional
225     * Subscriber method invocations will occur for a Subscription
226     * that is not already terminated by error, after which no
227     * other Subscriber methods are invoked by the Subscription.
228     * If this method throws an exception, resulting behavior is
229 dl 1.14 * undefined.
230 dl 1.1 */
231     public void onComplete();
232     }
233    
234     /**
235 dl 1.14 * Message control linking a {@link Publisher} and {@link
236     * Subscriber}. Subscribers receive items only when requested,
237     * and may cancel at any time. The methods in this interface are
238     * intended to be invoked only by their Subscribers; usages in
239     * other contexts have undefined effects.
240 dl 1.1 */
241     public static interface Subscription {
242     /**
243     * Adds the given number {@code n} of items to the current
244     * unfulfilled demand for this subscription. If {@code n} is
245 dl 1.31 * less than or equal to zero, the Subscriber will receive an
246     * {@code onError} signal with an {@link
247     * IllegalArgumentException} argument. Otherwise, the
248     * Subscriber will receive up to {@code n} additional {@code
249     * onNext} invocations (or fewer if terminated).
250 dl 1.1 *
251     * @param n the increment of demand; a value of {@code
252     * Long.MAX_VALUE} may be considered as effectively unbounded
253     */
254     public void request(long n);
255    
256     /**
257 dl 1.14 * Causes the Subscriber to (eventually) stop receiving
258 dl 1.15 * messages. Implementation is best-effort -- additional
259 jsr166 1.27 * messages may be received after invoking this method.
260     * A cancelled subscription need not ever receive an
261     * {@code onComplete} or {@code onError} signal.
262 dl 1.1 */
263     public void cancel();
264     }
265    
266     /**
267 jsr166 1.2 * A component that acts as both a Subscriber and Publisher.
268 dl 1.1 *
269 jsr166 1.2 * @param <T> the subscribed item type
270     * @param <R> the published item type
271     */
272 jsr166 1.5 public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
273 dl 1.1 }
274    
275 dl 1.18 static final int DEFAULT_BUFFER_SIZE = 256;
276    
277     /**
278     * Returns a default value for Publisher or Subscriber buffering,
279     * that may be used in the absence of other constraints.
280     *
281     * @implNote
282     * The current value returned is 256.
283     *
284     * @return the buffer size value
285     */
286     public static int defaultBufferSize() {
287     return DEFAULT_BUFFER_SIZE;
288     }
289 dl 1.1
290     }