ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SubmissionPublisher.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/SubmissionPublisher.java (file contents):
Revision 1.23 by jsr166, Thu Jan 22 02:44:39 2015 UTC vs.
Revision 1.24 by dl, Thu Jan 22 14:27:07 2015 UTC

# Line 36 | Line 36 | import java.util.function.Supplier;
36   * nearest power of two and/or bounded by the largest value supported
37   * by this implementation.)  Invocations of {@link
38   * Flow.Subscription#request} do not directly result in buffer
39 < * expansion, but risk saturation if unfulfilled requests exceed the
39 > * expansion, but risk saturation if unfilled requests exceed the
40   * maximum capacity.  Choices of buffer parameters rely on expected
41   * rates, resources, and usages, that usually benefit from empirical
42 < * testing.  As a first guess, consider a value of 256.
42 > * testing.  As a first guess, consider a value of 64.
43   *
44   * <p>Publication methods support different policies about what to do
45   * when buffers are saturated. Method {@link #submit} blocks until
# Line 162 | Line 162 | public class SubmissionPublisher<T> impl
162       * async delivery to subscribers, and with the given maximum
163       * buffer size for each subscriber. In the absence of other
164       * constraints, consider using {@code ForkJoinPool.commonPool(),
165 <     * 256}.
165 >     * 64}.
166       *
167       * @param executor the executor to use for async delivery,
168       * supporting creation of at least one independent thread
# Line 262 | Line 262 | public class SubmissionPublisher<T> impl
262          synchronized (this) {
263              if (closed)
264                  throw new IllegalStateException("Closed");
265 <            BufferedSubscription<T> pred = null, next;
266 <            for (BufferedSubscription<T> b = clients; b != null; b = next) {
265 >            /*
266 >             * To reduce head-of-line blocking, try offer() on each,
267 >             * place saturated ones in retries list, and later wait
268 >             * them out.
269 >             */
270 >            BufferedSubscription<T> b = clients, retries = null,
271 >                rtail = null, pred = null, next;
272 >            for ( ; b != null; b = next) {
273                  int stat;
274                  next = b.next;
275 <                if ((stat = b.submit(item)) < 0) {
275 >                if ((stat = b.offer(item)) < 0) {
276                      if (pred == null)
277                          clients = next;
278                      else
279                          pred.next = next;
280                  }
281                  else {
282 <                    pred = b;
282 >                    if (stat == 0) {
283 >                        if (rtail == null)
284 >                            retries = b;
285 >                        else
286 >                            rtail.nextRetry = b;
287 >                        rtail = b;
288 >                        stat = maxBufferCapacity;
289 >                    }
290                      if (stat > lag)
291                          lag = stat;
292 +                    pred = b;
293                  }
294              }
295 +            if (retries != null)
296 +                retrySubmit(retries, item);
297          }
298          return lag;
299      }
300  
301      /**
302 +     * Calls submit on each subscription on retry list.
303 +     */
304 +    private void retrySubmit(BufferedSubscription<T> retries, T item) {
305 +        for (BufferedSubscription<T> r = retries; r != null;) {
306 +            BufferedSubscription<T> nextRetry = r.nextRetry;
307 +            r.nextRetry = null;
308 +            r.submit(item);
309 +            r = nextRetry;
310 +        }
311 +    }
312 +
313 +    /**
314       * Publishes the given item, if possible, to each current
315       * subscriber by asynchronously invoking its onNext method. The
316       * item may be dropped by one or more subscribers if resource
# Line 690 | Line 718 | public class SubmissionPublisher<T> impl
718       * Execution control is managed using the ACTIVE ctl bit. We
719       * ensure that a task is active when consumable items (and
720       * usually, SUBSCRIBE, ERROR or COMPLETE signals) are present and
721 <     * there is demand (unfulfilled requests).  This is complicated on
721 >     * there is demand (unfilled requests).  This is complicated on
722       * the creation side by the possibility of exceptions when trying
723       * to execute tasks. These eventually force DISABLED state, but
724       * sometimes not directly. On the task side, termination (clearing
# Line 742 | Line 770 | public class SubmissionPublisher<T> impl
770          volatile Thread waiter;           // blocked producer thread
771          T putItem;                        // for offer within ManagedBlocker
772          BufferedSubscription<T> next;     // used only by publisher
773 +        BufferedSubscription<T> nextRetry;// used only by publisher
774  
775          // ctl values
776          static final int ACTIVE    = 0x01; // consumer task active
# Line 819 | Line 848 | public class SubmissionPublisher<T> impl
848           */
849          final int growAndAdd(Object[] oldArray, T item) {
850              int oldLen, newLen;
851 <            if (oldArray != null)
823 <                newLen = (oldLen = oldArray.length) << 1;
824 <            else if (ctl >= 0) {
851 >            if (oldArray == null) {
852                  oldLen = 0;
853 <                newLen = (maxCapacity < MINCAP) ? maxCapacity : MINCAP;
853 >                newLen = (maxCapacity >= MINCAP ? MINCAP :
854 >                          maxCapacity >= 2 ? maxCapacity : 2);
855              }
856 <            else
857 <                return -1;                        // disabled
830 <            if (oldLen >= maxCapacity || newLen <= 0)
856 >            else if ((oldLen = oldArray.length) >= maxCapacity ||
857 >                     (newLen = oldLen << 1) <= 0)
858                  return 0;                         // cannot grow
859 +            if (ctl == DISABLED)
860 +                return -1;
861              Object[] newArray;
862              try {
863                  newArray = new Object[newLen];
864              } catch (Throwable ex) {              // try to cope with OOME
865 <                if (oldLen > 0)                   // avoid continuous failure
866 <                    maxCapacity = oldLen;
865 >                if (oldLen > 0)
866 >                    maxCapacity = oldLen;        // avoid continuous failure
867                  return 0;
868              }
869              array = newArray;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines