ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SubmissionPublisher.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/SubmissionPublisher.java (file contents):
Revision 1.29 by dl, Sun Jan 25 15:19:40 2015 UTC vs.
Revision 1.30 by dl, Sun Jan 25 23:37:31 2015 UTC

# Line 62 | Line 62 | import java.util.function.Supplier;
62   * them.  For example here is a class that periodically publishes the
63   * items generated from a supplier. (In practice you might add methods
64   * to independently start and stop generation, to share schedulers
65 < * among publishers, and so on, or instead use a SubmissionPublisher
66 < * as a component rather than a superclass.)
65 > * among publishers, and so on, or use a SubmissionPublisher as a
66 > * component rather than a superclass.)
67   *
68   * <pre> {@code
69   * class PeriodicPublisher<T> extends SubmissionPublisher<T> {
# Line 199 | Line 199 | public class SubmissionPublisher<T> impl
199  
200      /**
201       * Adds the given Subscriber unless already subscribed.  If
202 <     * already subscribed, the Subscriber's onError method is invoked
203 <     * on the existing subscription with an IllegalStateException.
204 <     * Otherwise, upon success, the Subscriber's onSubscribe method is
205 <     * invoked asynchronously with a new Subscription. If onSubscribe
206 <     * throws an exception, the subscription is cancelled. Otherwise,
207 <     * if this SubmissionPublisher is closed, the subscriber's
208 <     * onComplete method is then invoked.  Subscribers may enable
209 <     * receiving items by invoking the {@code request} method of the
210 <     * new Subscription, and may unsubscribe by invoking its cancel
211 <     * method.
202 >     * already subscribed, the Subscriber's {@code onError} method is
203 >     * invoked on the existing subscription with an {@link
204 >     * IllegalStateException}.  Otherwise, upon success, the
205 >     * Subscriber's {@code onSubscribe} method is invoked
206 >     * asynchronously with a new {@link Flow.Subscription}. If {@code
207 >     * onSubscribe} throws an exception, the subscription is
208 >     * cancelled. Otherwise, if this SubmissionPublisher is closed,
209 >     * the subscriber's {@code onComplete} method is then invoked.
210 >     * Subscribers may enable receiving items by invoking the {@code
211 >     * request} method of the new Subscription, and may unsubscribe by
212 >     * invoking its {@code cancel} method.
213       *
214       * @param subscriber the subscriber
215       * @throws NullPointerException if subscriber is null
# Line 250 | Line 251 | public class SubmissionPublisher<T> impl
251  
252      /**
253       * Publishes the given item to each current subscriber by
254 <     * asynchronously invoking its onNext method, blocking
255 <     * uninterruptibly while resources for any subscriber are
256 <     * unavailable. This method returns an estimate of the maximum lag
257 <     * (number of items submitted but not yet consumed) among all
258 <     * current subscribers. This value is at least one (accounting for
259 <     * this submitted item) if there are any subscribers, else zero.
254 >     * asynchronously invoking its {@link Flow.Subscriber#onNext}
255 >     * method, blocking uninterruptibly while resources for any
256 >     * subscriber are unavailable. This method returns an estimate of
257 >     * the maximum lag (number of items submitted but not yet
258 >     * consumed) among all current subscribers. This value is at least
259 >     * one (accounting for this submitted item) if there are any
260 >     * subscribers, else zero.
261       *
262       * <p>If the Executor for this publisher throws a
263       * RejectedExecutionException (or any other RuntimeException or
264       * Error) when attempting to asynchronously notify subscribers,
265 <     * then this exception is rethrown.
265 >     * then this exception is rethrown, in which case some but not all
266 >     * subscribers may have received this item.
267       *
268       * @param item the (non-null) item to publish
269       * @return the estimated maximum lag among subscribers
# Line 322 | Line 325 | public class SubmissionPublisher<T> impl
325  
326      /**
327       * Publishes the given item, if possible, to each current
328 <     * subscriber by asynchronously invoking its onNext method. The
329 <     * item may be dropped by one or more subscribers if resource
330 <     * limits are exceeded, in which case the given handler (if
331 <     * non-null) is invoked, and if it returns true, retried once.
332 <     * Other calls to methods in this class by other threads are
333 <     * blocked while the handler is invoked.  Unless recovery is
334 <     * assured, options are usually limited to logging the error
335 <     * and/or issuing an onError signal to the subscriber.
328 >     * subscriber by asynchronously invoking its {@link
329 >     * Flow.Subscriber#onNext} method. The item may be dropped by one
330 >     * or more subscribers if resource limits are exceeded, in which
331 >     * case the given handler (if non-null) is invoked, and if it
332 >     * returns true, retried once.  Other calls to methods in this
333 >     * class by other threads are blocked while the handler is
334 >     * invoked.  Unless recovery is assured, options are usually
335 >     * limited to logging the error and/or issuing an {@code onError}
336 >     * signal to the subscriber.
337       *
338       * <p>This method returns a status indicator: If negative, it
339       * represents the (negative) number of drops (failed attempts to
# Line 362 | Line 366 | public class SubmissionPublisher<T> impl
366  
367      /**
368       * Publishes the given item, if possible, to each current
369 <     * subscriber by asynchronously invoking its onNext method,
370 <     * blocking while resources for any subscription are unavailable,
371 <     * up to the specified timeout or the caller thread is
372 <     * interrupted, at which point the given handler (if non-null) is
373 <     * invoked, and if it returns true, retried once. (The drop
374 <     * handler may distinguish timeouts from interrupts by checking
375 <     * whether the current thread is interrupted.) Other calls to
376 <     * methods in this class by other threads are blocked while the
377 <     * handler is invoked.  Unless recovery is assured, options are
378 <     * usually limited to logging the error and/or issuing an onError
379 <     * signal to the subscriber.
369 >     * subscriber by asynchronously invoking its {@link
370 >     * Flow.Subscriber#onNext} method, blocking while resources for
371 >     * any subscription are unavailable, up to the specified timeout
372 >     * or the caller thread is interrupted, at which point the given
373 >     * handler (if non-null) is invoked, and if it returns true,
374 >     * retried once. (The drop handler may distinguish timeouts from
375 >     * interrupts by checking whether the current thread is
376 >     * interrupted.) Other calls to methods in this class by other
377 >     * threads are blocked while the handler is invoked.  Unless
378 >     * recovery is assured, options are usually limited to logging the
379 >     * error and/or issuing an {@code onError} signal to the
380 >     * subscriber.
381       *
382       * <p>This method returns a status indicator: If negative, it
383       * represents the (negative) number of drops (failed attempts to
# Line 470 | Line 475 | public class SubmissionPublisher<T> impl
475      }
476  
477      /**
478 <     * Unless already closed, issues onComplete signals to current
479 <     * subscribers, and disallows subsequent attempts to publish.
478 >     * Unless already closed, issues {@link
479 >     * Flow.Subscriber#onComplete} signals to current subscribers, and
480 >     * disallows subsequent attempts to publish. Upon return, this
481 >     * method does <em>NOT</em> guarantee that all subscribers have
482 >     * yet completed.
483       */
484      public void close() {
485          if (!closed) {
# Line 490 | Line 498 | public class SubmissionPublisher<T> impl
498      }
499  
500      /**
501 <     * Unless already closed, issues onError signals to current
502 <     * subscribers with the given error, and disallows subsequent
503 <     * attempts to publish.
501 >     * Unless already closed, issues {@link Flow.Subscriber#onError}
502 >     * signals to current subscribers with the given error, and
503 >     * disallows subsequent attempts to publish. Upon return, this
504 >     * method does <em>NOT</em> guarantee that all subscribers have
505 >     * yet completed.
506       *
507 <     * @param error the onError argument sent to subscribers
507 >     * @param error the {@code onError} argument sent to subscribers
508       * @throws NullPointerException if error is null
509       */
510      public void closeExceptionally(Throwable error) {
# Line 1042 | Line 1052 | public class SubmissionPublisher<T> impl
1052                          e.execute(new ConsumerTask<T>(this));
1053                          break;
1054                      } catch (RuntimeException | Error ex) { // back out
1055 <                        do {} while ((c = ctl) >= 0 &&
1055 >                        do {} while (((c = ctl) & DISABLED) == 0 &&
1056                                       (c & ACTIVE) != 0 &&
1057                                       !U.compareAndSwapInt(this, CTL, c,
1058                                                            c & ~ACTIVE));

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines