ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jdk8/java/util/concurrent/Flow.java
Revision: 1.1
Committed: Sat Mar 26 06:22:50 2016 UTC (8 years, 1 month ago) by jsr166
Branch: MAIN
Log Message:
fork jdk8 maintenance branch for source and jtreg tests

File Contents

# User Rev Content
1 jsr166 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8    
9     /**
10     * Interrelated interfaces and static methods for establishing
11     * flow-controlled components in which {@link Publisher Publishers}
12     * produce items consumed by one or more {@link Subscriber
13     * Subscribers}, each managed by a {@link Subscription
14     * Subscription}.
15     *
16     * <p>These interfaces correspond to the <a
17     * href="http://www.reactive-streams.org/"> reactive-streams</a>
18     * specification. They apply in both concurrent and distributed
19     * asynchronous settings: All (seven) methods are defined in {@code
20     * void} "one-way" message style. Communication relies on a simple form
21     * of flow control (method {@link Subscription#request}) that can be
22     * used to avoid resource management problems that may otherwise occur
23     * in "push" based systems.
24     *
25     * <p><b>Examples.</b> A {@link Publisher} usually defines its own
26     * {@link Subscription} implementation; constructing one in method
27     * {@code subscribe} and issuing it to the calling {@link
28     * Subscriber}. It publishes items to the subscriber asynchronously,
29     * normally using an {@link Executor}. For example, here is a very
30     * simple publisher that only issues (when requested) a single {@code
31     * TRUE} item to a single subscriber. Because the subscriber receives
32     * only a single item, this class does not use buffering and ordering
33     * control required in most implementations (for example {@link
34     * SubmissionPublisher}).
35     *
36     * <pre> {@code
37     * class OneShotPublisher implements Publisher<Boolean> {
38     * private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
39     * private boolean subscribed; // true after first subscribe
40     * public synchronized void subscribe(Subscriber<? super Boolean> subscriber) {
41     * if (subscribed)
42     * subscriber.onError(new IllegalStateException()); // only one allowed
43     * else {
44     * subscribed = true;
45     * subscriber.onSubscribe(new OneShotSubscription(subscriber, executor));
46     * }
47     * }
48     * static class OneShotSubscription implements Subscription {
49     * private final Subscriber<? super Boolean> subscriber;
50     * private final ExecutorService executor;
51     * private Future<?> future; // to allow cancellation
52     * private boolean completed;
53     * OneShotSubscription(Subscriber<? super Boolean> subscriber,
54     * ExecutorService executor) {
55     * this.subscriber = subscriber;
56     * this.executor = executor;
57     * }
58     * public synchronized void request(long n) {
59     * if (n != 0 && !completed) {
60     * completed = true;
61     * if (n < 0) {
62     * IllegalArgumentException ex = new IllegalArgumentException();
63     * executor.execute(() -> subscriber.onError(ex));
64     * } else {
65     * future = executor.submit(() -> {
66     * subscriber.onNext(Boolean.TRUE);
67     * subscriber.onComplete();
68     * });
69     * }
70     * }
71     * }
72     * public synchronized void cancel() {
73     * completed = true;
74     * if (future != null) future.cancel(false);
75     * }
76     * }
77     * }}</pre>
78     *
79     * <p>A {@link Subscriber} arranges that items be requested and
80     * processed. Items (invocations of {@link Subscriber#onNext}) are
81     * not issued unless requested, but multiple items may be requested.
82     * Many Subscriber implementations can arrange this in the style of
83     * the following example, where a buffer size of 1 single-steps, and
84     * larger sizes usually allow for more efficient overlapped processing
85     * with less communication; for example with a value of 64, this keeps
86     * total outstanding requests between 32 and 64.
87     * Because Subscriber method invocations for a given {@link
88     * Subscription} are strictly ordered, there is no need for these
89     * methods to use locks or volatiles unless a Subscriber maintains
90     * multiple Subscriptions (in which case it is better to instead
91     * define multiple Subscribers, each with its own Subscription).
92     *
93     * <pre> {@code
94     * class SampleSubscriber<T> implements Subscriber<T> {
95     * final Consumer<? super T> consumer;
96     * Subscription subscription;
97     * final long bufferSize;
98     * long count;
99     * SampleSubscriber(long bufferSize, Consumer<? super T> consumer) {
100     * this.bufferSize = bufferSize;
101     * this.consumer = consumer;
102     * }
103     * public void onSubscribe(Subscription subscription) {
104     * long initialRequestSize = bufferSize;
105     * count = bufferSize - bufferSize / 2; // re-request when half consumed
106     * (this.subscription = subscription).request(initialRequestSize);
107     * }
108     * public void onNext(T item) {
109     * if (--count <= 0)
110     * subscription.request(count = bufferSize - bufferSize / 2);
111     * consumer.accept(item);
112     * }
113     * public void onError(Throwable ex) { ex.printStackTrace(); }
114     * public void onComplete() {}
115     * }}</pre>
116     *
117     * <p>The default value of {@link #defaultBufferSize} may provide a
118     * useful starting point for choosing request sizes and capacities in
119     * Flow components based on expected rates, resources, and usages.
120     * Or, when flow control is never needed, a subscriber may initially
121     * request an effectively unbounded number of items, as in:
122     *
123     * <pre> {@code
124     * class UnboundedSubscriber<T> implements Subscriber<T> {
125     * public void onSubscribe(Subscription subscription) {
126     * subscription.request(Long.MAX_VALUE); // effectively unbounded
127     * }
128     * public void onNext(T item) { use(item); }
129     * public void onError(Throwable ex) { ex.printStackTrace(); }
130     * public void onComplete() {}
131     * void use(T item) { ... }
132     * }}</pre>
133     *
134     * @author Doug Lea
135     * @since 9
136     */
public final class Flow {

    /** Buffer size reported by {@link #defaultBufferSize()}. */
    static final int DEFAULT_BUFFER_SIZE = 256;

    /** Not instantiable: this class only hosts nested types and a static method. */
    private Flow() {}

    /**
     * A source of items, and of the control messages that accompany
     * them, delivered to Subscribers. Barring drops or errors, every
     * current {@link Subscriber} is handed the same items, in the same
     * order, through {@code onNext}. When a Publisher hits an error
     * that prevents it from issuing items to a Subscriber, that
     * Subscriber gets {@code onError} and nothing after it. Otherwise,
     * once it is known that nothing further will be issued to a
     * subscriber, that subscriber gets {@code onComplete}. For each
     * subscription, a Publisher guarantees that Subscriber method
     * invocations are strictly ordered in <a
     * href="package-summary.html#MemoryVisibility"><i>happens-before</i></a>
     * order.
     *
     * <p>Whether a drop (an item that could not be issued because of
     * resource limitations) counts as an unrecoverable error is a
     * matter of Publisher policy. Publishers may likewise differ on
     * whether a Subscriber sees items that were produced or available
     * before it subscribed.
     *
     * @param <T> the published item type
     */
    @FunctionalInterface
    public interface Publisher<T> {
        /**
         * Registers the given Subscriber, if that is possible. When
         * the Subscriber is already subscribed, or subscribing fails
         * because of a policy violation or an error, its
         * {@code onError} method is called with an
         * {@link IllegalStateException}. In every other case its
         * {@code onSubscribe} method is called with a new
         * {@link Subscription}; calling that Subscription's
         * {@code request} method enables delivery of items, and
         * calling its {@code cancel} method unsubscribes.
         *
         * @param subscriber the subscriber
         * @throws NullPointerException if subscriber is null
         */
        void subscribe(Subscriber<? super T> subscriber);
    }

    /**
     * A recipient of messages. For each {@link Subscription}, the
     * methods of this interface are called in strict sequential
     * order.
     *
     * @param <T> the subscribed item type
     */
    public interface Subscriber<T> {
        /**
         * Called before any other Subscriber method is called for the
         * given Subscription. Behavior after this method throws an
         * exception is not guaranteed; the Subscription may end up
         * not being established, or being cancelled.
         *
         * <p>An implementation typically calls {@code
         * subscription.request} here so that items start arriving.
         *
         * @param subscription a new subscription
         */
        void onSubscribe(Subscription subscription);

        /**
         * Called with the next item of a Subscription. Behavior after
         * this method throws an exception is not guaranteed; the
         * Subscription may end up being cancelled.
         *
         * @param item the item
         */
        void onNext(T item);

        /**
         * Called when a Publisher or Subscription encounters an
         * unrecoverable error. The Subscription calls no other
         * Subscriber methods afterwards. Behavior is undefined if
         * this method itself throws an exception.
         *
         * @param throwable the exception
         */
        void onError(Throwable throwable);

        /**
         * Called once it is known that a Subscription which has not
         * already been terminated by error will produce no further
         * Subscriber method invocations. The Subscription calls no
         * other Subscriber methods afterwards. Behavior is undefined
         * if this method throws an exception.
         */
        void onComplete();
    }

    /**
     * The message-control link between a {@link Publisher} and a
     * {@link Subscriber}. A Subscriber is sent items only after
     * requesting them and is free to cancel at any time. Only the
     * owning Subscriber is meant to call these methods; calling them
     * from any other context has undefined effects.
     */
    public interface Subscription {
        /**
         * Increases this subscription's current unfulfilled demand by
         * {@code n} items. A negative {@code n} results in the
         * Subscriber receiving an {@code onError} signal carrying an
         * {@link IllegalArgumentException}. Otherwise the Subscriber
         * will be sent at most {@code n} further {@code onNext}
         * invocations (fewer if the subscription terminates first).
         *
         * @param n the increment of demand; a value of {@code
         * Long.MAX_VALUE} may be considered as effectively unbounded
         */
        void request(long n);

        /**
         * Asks that the Subscriber (eventually) stop being sent
         * messages. This is best-effort only: further messages may
         * still arrive after the call. Once cancelled, a subscription
         * need not ever be sent an {@code onComplete} or
         * {@code onError} signal.
         */
        void cancel();
    }

    /**
     * A component playing the role of Subscriber and Publisher at
     * the same time.
     *
     * @param <T> the subscribed item type
     * @param <R> the published item type
     */
    public interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
    }

    /**
     * Supplies a default Publisher or Subscriber buffer size, for use
     * when no other constraints dictate one.
     *
     * @implNote
     * The current value returned is 256.
     *
     * @return the buffer size value
     */
    public static int defaultBufferSize() {
        return DEFAULT_BUFFER_SIZE;
    }

}