ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SubmissionPublisher.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/SubmissionPublisher.java (file contents):
Revision 1.28 by jsr166, Sat Jan 24 23:07:47 2015 UTC vs.
Revision 1.29 by dl, Sun Jan 25 15:19:40 2015 UTC

# Line 147 | Line 147 | public class SubmissionPublisher<T> impl
147       * but we expect that subscribing is much less common than
148       * publishing. Unsubscribing occurs only during traversal loops,
149       * when BufferedSubscription methods return negative values
150 <     * signifying that they have been disabled.
150 >     * signifying that they have been disabled.  To reduce
151 >     * head-of-line blocking, submit and offer methods first call
152 >     * BufferedSubscription.offer on each subscriber, and place
153 >     * saturated ones in retries list (using nextRetry field), and
154 >     * retry, possibly blocking or dropping.
155       */
156      BufferedSubscription<T> clients;
157  
# Line 196 | Line 200 | public class SubmissionPublisher<T> impl
200      /**
201       * Adds the given Subscriber unless already subscribed.  If
202       * already subscribed, the Subscriber's onError method is invoked
203 <     * with an IllegalStateException.  Otherwise, upon success, the
204 <     * Subscriber's onSubscribe method is invoked asynchronously with
205 <     * a new Subscription. If onSubscribe throws an exception, the
206 <     * subscription is cancelled. Otherwise, if this
207 <     * SubmissionPublisher is closed, the subscriber's onComplete
208 <     * method is then invoked.  Subscribers may enable receiving items
209 <     * by invoking the {@code request} method of the new Subscription,
210 <     * and may unsubscribe by invoking its cancel method.
203 >     * on the existing subscription with an IllegalStateException.
204 >     * Otherwise, upon success, the Subscriber's onSubscribe method is
205 >     * invoked asynchronously with a new Subscription. If onSubscribe
206 >     * throws an exception, the subscription is cancelled. Otherwise,
207 >     * if this SubmissionPublisher is closed, the subscriber's
208 >     * onComplete method is then invoked.  Subscribers may enable
209 >     * receiving items by invoking the {@code request} method of the
210 >     * new Subscription, and may unsubscribe by invoking its cancel
211 >     * method.
212       *
213       * @param subscriber the subscriber
214       * @throws NullPointerException if subscriber is null
# Line 212 | Line 217 | public class SubmissionPublisher<T> impl
217          if (subscriber == null) throw new NullPointerException();
218          BufferedSubscription<T> subscription =
219              new BufferedSubscription<T>(subscriber, executor, maxBufferCapacity);
215        boolean present = false, complete;
220          synchronized (this) {
221 <            complete = closed;
222 <            BufferedSubscription<T> b = clients, pred = null;
223 <            if (!complete) {
224 <                while (b != null) {
225 <                    BufferedSubscription<T> next = b.next;
226 <                    if (b.isDisabled()) { // remove
227 <                        if (pred == null)
224 <                            clients = next;
225 <                        else
226 <                            pred.next = next;
227 <                    }
228 <                    else if (subscriber.equals(b.subscriber)) {
229 <                        present = true;
230 <                        break;
231 <                    }
221 >            for (BufferedSubscription<T> b = clients, pred = null;;) {
222 >                if (b == null) {
223 >                    subscription.onSubscribe();
224 >                    if (closed)
225 >                        subscription.onComplete();
226 >                    else if (pred == null)
227 >                        clients = subscription;
228                      else
229 <                        pred = b;
230 <                    b = next;
229 >                        pred.next = subscription;
230 >                    break;
231                  }
232 <                if (!present) {
232 >                BufferedSubscription<T> next = b.next;
233 >                if (b.isDisabled()) { // remove
234 >                    b.next = null;    // detach
235                      if (pred == null)
236 <                        clients = subscription;
236 >                        clients = next;
237                      else
238 <                        pred.next = subscription;
239 <                    subscription.onSubscribe();
238 >                        pred.next = next;
239 >                }
240 >                else if (subscriber.equals(b.subscriber)) {
241 >                    b.onError(new IllegalStateException("Duplicate subscribe"));
242 >                    break;
243                  }
244 +                else
245 +                    pred = b;
246 +                b = next;
247              }
248          }
245        if (present)
246            subscriber.onError(new IllegalStateException("Already subscribed"));
247        else if (complete)
248            subscription.onComplete();
249      }
250  
251      /**
# Line 269 | Line 269 | public class SubmissionPublisher<T> impl
269       * @throws RejectedExecutionException if thrown by Executor
270       */
271      public int submit(T item) {
272        /*
273         * To reduce head-of-line blocking, try offer() on each,
274         * place saturated ones in retries list, and later wait
275         * them out.
276         */
272          if (item == null) throw new NullPointerException();
273          int lag = 0;
274          boolean complete;
275          synchronized (this) {
276              complete = closed;
277 <            BufferedSubscription<T> b = clients, pred = null;
283 <            BufferedSubscription<T> retries = null, rtail = null;
277 >            BufferedSubscription<T> b = clients;
278              if (!complete) {
279 +                BufferedSubscription<T> pred = null, r = null, rtail = null;
280                  while (b != null) {
286                    int stat;
281                      BufferedSubscription<T> next = b.next;
282 <                    if ((stat = b.offer(item)) < 0) {
282 >                    int stat = b.offer(item);
283 >                    if (stat < 0) {           // disabled
284 >                        b.next = null;
285                          if (pred == null)
286                              clients = next;
287                          else
288                              pred.next = next;
289                      }
290                      else {
291 <                        if (stat == 0) {
291 >                        if (stat > lag)
292 >                            lag = stat;
293 >                        else if (stat == 0) { // place on retry list
294 >                            b.nextRetry = null;
295                              if (rtail == null)
296 <                                retries = b;
296 >                                r = b;
297                              else
298                                  rtail.nextRetry = b;
299                              rtail = b;
301                            stat = maxBufferCapacity;
300                          }
303                        if (stat > lag)
304                            lag = stat;
301                          pred = b;
302                      }
303                      b = next;
304                  }
305 <                if (retries != null)
306 <                    retrySubmit(retries, item);
305 >                while (r != null) {
306 >                    BufferedSubscription<T> nextRetry = r.nextRetry;
307 >                    r.nextRetry = null;
308 >                    int stat = r.submit(item);
309 >                    if (stat > lag)
310 >                        lag = stat;
311 >                    else if (stat < 0 && clients == r)
312 >                        clients = r.next; // postpone internal unsubscribes
313 >                    r = nextRetry;
314 >                }
315              }
316          }
317          if (complete)
# Line 317 | Line 321 | public class SubmissionPublisher<T> impl
321      }
322  
323      /**
320     * Calls submit on each subscription on retry list.
321     */
322    private void retrySubmit(BufferedSubscription<T> retries, T item) {
323        for (BufferedSubscription<T> r = retries; r != null;) {
324            BufferedSubscription<T> nextRetry = r.nextRetry;
325            r.nextRetry = null;
326            r.submit(item);
327            r = nextRetry;
328        }
329    }
330
331    /**
324       * Publishes the given item, if possible, to each current
325       * subscriber by asynchronously invoking its onNext method. The
326       * item may be dropped by one or more subscribers if resource
# Line 365 | Line 357 | public class SubmissionPublisher<T> impl
357       */
358      public int offer(T item,
359                       BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
360 <        if (item == null) throw new NullPointerException();
369 <        int lag = 0, drops = 0;
370 <        boolean complete;
371 <        synchronized (this) {
372 <            complete = closed;
373 <            BufferedSubscription<T> b = clients, pred = null, next;
374 <            if (!complete) {
375 <                for (; b != null; b = next) {
376 <                    int stat;
377 <                    next = b.next;
378 <                    if ((stat = b.offer(item)) == 0 &&
379 <                        onDrop != null &&
380 <                        onDrop.test(b.subscriber, item))
381 <                        stat = b.offer(item);
382 <                    if (stat < 0) {
383 <                        if (pred == null)
384 <                            clients = next;
385 <                        else
386 <                            pred.next = next;
387 <                    }
388 <                    else {
389 <                        if (stat == 0)
390 <                            ++drops;
391 <                        else if (stat > lag)
392 <                            lag = stat;
393 <                        pred = b;
394 <                    }
395 <                }
396 <            }
397 <        }
398 <        if (complete)
399 <            throw new IllegalStateException("Closed");
400 <        else
401 <            return (drops > 0) ? -drops : lag;
360 >        return doOffer(0L, item, onDrop);
361      }
362  
363      /**
# Line 445 | Line 404 | public class SubmissionPublisher<T> impl
404       */
405      public int offer(T item, long timeout, TimeUnit unit,
406                       BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
407 <        // Same idea as submit
407 >        return doOffer(unit.toNanos(timeout), item, onDrop);
408 >    }
409 >
410 >    /** Common implementation for both forms of offer */
411 >    final int doOffer(long nanos, T item,
412 >                      BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
413          if (item == null) throw new NullPointerException();
450        long nanos = unit.toNanos(timeout);
451        if (nanos <= 0L)
452            return offer(item, onDrop);
414          int lag = 0, drops = 0;
415          boolean complete;
416          synchronized (this) {
417              complete = closed;
418 <            BufferedSubscription<T> b = clients, pred = null;
458 <            BufferedSubscription<T> retries = null, rtail = null;
418 >            BufferedSubscription<T> b = clients;
419              if (!complete) {
420 +                BufferedSubscription<T> pred = null, r = null, rtail = null;
421                  while (b != null) {
461                    int stat;
422                      BufferedSubscription<T> next = b.next;
423 <                    if ((stat = b.offer(item)) < 0) {
423 >                    int stat = b.offer(item);
424 >                    if (stat < 0) {
425 >                        b.next = null;
426                          if (pred == null)
427                              clients = next;
428                          else
429                              pred.next = next;
430                      }
431                      else {
432 <                        if (stat == 0) {
432 >                        if (stat > lag)
433 >                            lag = stat;
434 >                        else if (stat == 0) {
435 >                            b.nextRetry = null;
436                              if (rtail == null)
437 <                                retries = b;
437 >                                r = b;
438                              else
439                                  rtail.nextRetry = b;
440                              rtail = b;
476                            stat = maxBufferCapacity;
441                          }
442 <                        if (stat > lag)
442 >                        else if (stat > lag)
443                              lag = stat;
444                          pred = b;
445                      }
446                      b = next;
447                  }
448 <                if (retries != null)
449 <                    drops = retryTimedOffer(retries, item, nanos, onDrop);
448 >                while (r != null) {
449 >                    BufferedSubscription<T> nextRetry = r.nextRetry;
450 >                    r.nextRetry = null;
451 >                    int stat = (nanos > 0L) ? r.timedOffer(item, nanos) :
452 >                        r.offer(item);
453 >                    if (stat == 0 && onDrop != null &&
454 >                        onDrop.test(r.subscriber, item))
455 >                        stat = r.offer(item);
456 >                    if (stat == 0)
457 >                        ++drops;
458 >                    else if (stat > lag)
459 >                        lag = stat;
460 >                    else if (stat < 0 && clients == r)
461 >                        clients = r.next;
462 >                    r = nextRetry;
463 >                }
464              }
465          }
466          if (complete)
# Line 492 | Line 470 | public class SubmissionPublisher<T> impl
470      }
471  
472      /**
495     * Calls timedOffer on each subscription on retry list, retrying
496     * offer if onDrop true.
497     * @return number of drops
498     */
499    private int retryTimedOffer(BufferedSubscription<T> retries, T item,
500                                long nanos,
501                                BiPredicate<Flow.Subscriber<? super T>, ? super T> onDrop) {
502        int drops = 0;
503        for (BufferedSubscription<T> r = retries; r != null;) {
504            BufferedSubscription<T> nextRetry = r.nextRetry;
505            r.nextRetry = null;
506            if (r.timedOffer(item, nanos) == 0 &&
507                (onDrop == null || !onDrop.test(r.subscriber, item) ||
508                 r.offer(item) == 0))
509                ++drops;
510            r = nextRetry;
511        }
512        return drops;
513    }
514
515
516    /**
473       * Unless already closed, issues onComplete signals to current
474       * subscribers, and disallows subsequent attempts to publish.
475       */
# Line 581 | Line 537 | public class SubmissionPublisher<T> impl
537                  for (BufferedSubscription<T> b = clients; b != null; b = next) {
538                      next = b.next;
539                      if (b.isDisabled()) {
540 +                        b.next = null;
541                          if (pred == null)
542                              clients = next;
543                          else
# Line 609 | Line 566 | public class SubmissionPublisher<T> impl
566                  for (BufferedSubscription<T> b = clients; b != null; b = next) {
567                      next = b.next;
568                      if (b.isDisabled()) {
569 +                        b.next = null;
570                          if (pred == null)
571                              clients = next;
572                          else
# Line 643 | Line 601 | public class SubmissionPublisher<T> impl
601      }
602  
603      /**
604 <     * Returns a list of current subscribers.
604 >     * Returns a list of current subscribers for monitoring and
605 >     * tracking purposes, not for invoking {@link Flow.Subscriber}
606 >     * methods on the subscribers.
607       *
608       * @return list of current subscribers
609       */
# Line 654 | Line 614 | public class SubmissionPublisher<T> impl
614              for (BufferedSubscription<T> b = clients; b != null; b = next) {
615                  next = b.next;
616                  if (b.isDisabled()) {
617 +                    b.next = null;
618                      if (pred == null)
619                          clients = next;
620                      else
# Line 681 | Line 642 | public class SubmissionPublisher<T> impl
642                  for (BufferedSubscription<T> b = clients; b != null; b = next) {
643                      next = b.next;
644                      if (b.isDisabled()) {
645 +                        b.next = null;
646                          if (pred == null)
647                              clients = next;
648                          else
# Line 712 | Line 674 | public class SubmissionPublisher<T> impl
674                  int n; long d;
675                  next = b.next;
676                  if ((n = b.estimateLag()) < 0) {
677 +                    b.next = null;
678                      if (pred == null)
679                          clients = next;
680                      else
# Line 741 | Line 704 | public class SubmissionPublisher<T> impl
704                  int n;
705                  next = b.next;
706                  if ((n = b.estimateLag()) < 0) {
707 +                    b.next = null;
708                      if (pred == null)
709                          clients = next;
710                      else
# Line 799 | Line 763 | public class SubmissionPublisher<T> impl
763       * to execute tasks. These eventually force DISABLED state, but
764       * sometimes not directly. On the task side, termination (clearing
765       * ACTIVE) that would otherwise race with producers or request()
766 <     * calls uses the KEEPALIVE bit to force a recheck.
766 >     * calls uses the CONSUME keep-alive bit to force a recheck.
767       *
768       * The ctl field also manages run state. When DISABLED, no further
769       * updates are possible. Disabling may be preceded by setting
# Line 816 | Line 780 | public class SubmissionPublisher<T> impl
780       * the field before trying to block, but must then recheck (via
781       * offer) before parking. Signalling then just unparks and clears
782       * waiter field. If the producer and consumer are both in the same
783 <     * ForkJoinPool, the producer attempts to help run consumer tasks
784 <     * that it forked before blocking.
783 >     * ForkJoinPool, or consumers are running in commonPool, the
784 >     * producer attempts to help run consumer tasks that it forked
785 >     * before blocking.  To avoid potential cycles, only one level of
786 >     * helping is currently supported.
787       *
788       * This class uses @Contended and heuristic field declaration
789       * ordering to reduce memory contention on BufferedSubscription
# Line 836 | Line 802 | public class SubmissionPublisher<T> impl
802          volatile long demand;              // # unfilled requests
803          int maxCapacity;                   // reduced on OOME
804          int putStat;                       // offer result for ManagedBlocker
805 +        int helpDepth;                     // nested helping depth (at most 1)
806          volatile int ctl;                  // atomic run state flags
807          volatile int head;                 // next position to take
808          volatile int tail;                 // next position to put
# Line 850 | Line 817 | public class SubmissionPublisher<T> impl
817  
818          // ctl values
819          static final int ACTIVE    = 0x01; // consumer task active
820 <        static final int KEEPALIVE = 0x02; // force termination recheck
820 >        static final int CONSUME   = 0x02; // keep-alive for consumer task
821          static final int DISABLED  = 0x04; // final state
822          static final int ERROR     = 0x08; // signal onError then disable
823          static final int SUBSCRIBE = 0x10; // signal onSubscribe
# Line 897 | Line 864 | public class SubmissionPublisher<T> impl
864                  U.putOrderedInt(this, TAIL, t + 1);
865                  stat = size;
866              }
867 <            return (stat > 0 && (ctl & (ACTIVE|KEEPALIVE)) != (ACTIVE|KEEPALIVE)) ?
867 >            return (stat > 0 &&
868 >                    (ctl & (ACTIVE | CONSUME)) != (ACTIVE | CONSUME)) ?
869                  startOnOffer(stat) : stat;
870          }
871  
# Line 961 | Line 929 | public class SubmissionPublisher<T> impl
929  
930          /**
931           * Spins/helps/blocks while offer returns 0.  Called only if
932 <         * initial offer return 0. Tries helping if either the
965 <         * producer and consumers are in same ForkJoinPool, or
966 <         * consumers are in commonPool.
932 >         * initial offer return 0.
933           */
934          final int submit(T item) {
969            Executor e = executor;
970            Thread thread = Thread.currentThread();
935              int stat = 0;
936 <            if (e instanceof ForkJoinPool) {
936 >            Executor e = executor;
937 >            if (helpDepth == 0 && (e instanceof ForkJoinPool)) {
938 >                Thread thread = Thread.currentThread();
939 >                // split cases to help compiler specialization
940                  if ((thread instanceof ForkJoinWorkerThread) &&
941                      ((ForkJoinWorkerThread)thread).getPool() == e) {
942                      ForkJoinTask<?> t;
# Line 977 | Line 944 | public class SubmissionPublisher<T> impl
944                             (t instanceof ConsumerTask)) {
945                          if ((stat = offer(item)) != 0 || !t.tryUnfork())
946                              break;
947 <                        ((ConsumerTask<?>)t).consumer.consume();
947 >                        ((ConsumerTask<?>)t).consumer.helpConsume();
948                      }
949                  }
950                  else if (e == ForkJoinPool.commonPool()) {
# Line 986 | Line 953 | public class SubmissionPublisher<T> impl
953                             (t instanceof ConsumerTask)) {
954                          if ((stat = offer(item)) != 0 || !t.tryUnfork())
955                              break;
956 <                        ((ConsumerTask<?>)t).consumer.consume();
956 >                        ((ConsumerTask<?>)t).consumer.helpConsume();
957                      }
958                  }
959              }
# Line 1009 | Line 976 | public class SubmissionPublisher<T> impl
976           * Timeout version; similar to submit
977           */
978          final int timedOffer(T item, long nanos) {
1012            Executor e = executor;
1013            Thread thread = Thread.currentThread();
979              int stat = 0;
980 <            if ((e instanceof ForkJoinPool) &&
981 <                (((thread instanceof ForkJoinWorkerThread) &&
982 <                  ((ForkJoinWorkerThread)thread).getPool() == e) ||
983 <                 e == ForkJoinPool.commonPool())) {
984 <                ForkJoinTask<?> t;
985 <                long deadline = System.nanoTime() + nanos;
986 <                while ((t = ForkJoinTask.peekNextLocalTask()) != null &&
987 <                       (t instanceof ConsumerTask)) {
988 <                    if ((stat = offer(item)) != 0 ||
989 <                        (nanos = deadline - System.nanoTime()) <= 0L ||
990 <                        !t.tryUnfork())
991 <                        break;
992 <                    ((ConsumerTask<?>)t).consumer.consume();
980 >            Executor e = executor;
981 >            if (helpDepth == 0 && (e instanceof ForkJoinPool)) {
982 >                Thread thread = Thread.currentThread();
983 >                if (((thread instanceof ForkJoinWorkerThread) &&
984 >                     ((ForkJoinWorkerThread)thread).getPool() == e) ||
985 >                    e == ForkJoinPool.commonPool()) {
986 >                    ForkJoinTask<?> t;
987 >                    long deadline = System.nanoTime() + nanos;
988 >                    while ((t = ForkJoinTask.peekNextLocalTask()) != null &&
989 >                           (t instanceof ConsumerTask)) {
990 >                        if ((stat = offer(item)) != 0 ||
991 >                            (nanos = deadline - System.nanoTime()) <= 0L ||
992 >                            !t.tryUnfork())
993 >                            break;
994 >                        ((ConsumerTask<?>)t).consumer.helpConsume();
995 >                    }
996                  }
997              }
998              if (stat == 0 && (stat = offer(item)) == 0 &&
# Line 1042 | Line 1010 | public class SubmissionPublisher<T> impl
1010              return stat;
1011          }
1012  
1013 +        /** Version of consume called when helping in submit or timedOffer */
1014 +        private void helpConsume() {
1015 +            helpDepth = 1; // only one level allowed
1016 +            consume();
1017 +            helpDepth = 0;
1018 +        }
1019 +
1020          /**
1021           * Tries to start consumer task after offer.
1022           * @return -1 if now disabled, else argument
# Line 1054 | Line 1029 | public class SubmissionPublisher<T> impl
1029                      break;
1030                  }
1031                  else if ((c & ACTIVE) != 0) { // ensure keep-alive
1032 <                    if ((c & KEEPALIVE) != 0 ||
1032 >                    if ((c & CONSUME) != 0 ||
1033                          U.compareAndSwapInt(this, CTL, c,
1034 <                                            c | KEEPALIVE))
1034 >                                            c | CONSUME))
1035                          break;
1036                  }
1037                  else if (demand == 0L || tail == head)
1038                      break;
1039                  else if (U.compareAndSwapInt(this, CTL, c,
1040 <                                             c | (ACTIVE | KEEPALIVE))) {
1040 >                                             c | (ACTIVE | CONSUME))) {
1041                      try {
1042                          e.execute(new ConsumerTask<T>(this));
1043                          break;
# Line 1100 | Line 1075 | public class SubmissionPublisher<T> impl
1075           */
1076          final void onError(Throwable ex) {
1077              for (int c;;) {
1078 <                if ((c = ctl) == DISABLED)
1078 >                if (((c = ctl) & (ERROR | DISABLED)) != 0)
1079                      break;
1080                  else if ((c & ACTIVE) != 0) {
1081                      pendingError = ex;
# Line 1148 | Line 1123 | public class SubmissionPublisher<T> impl
1123                  if ((c = ctl) == DISABLED)
1124                      break;
1125                  if (U.compareAndSwapInt(this, CTL, c,
1126 <                                        c | (ACTIVE | KEEPALIVE | COMPLETE))) {
1126 >                                        c | (ACTIVE | CONSUME | COMPLETE))) {
1127                      if ((c & ACTIVE) == 0)
1128                          startOrDisable();
1129                      break;
# Line 1161 | Line 1136 | public class SubmissionPublisher<T> impl
1136                  if ((c = ctl) == DISABLED)
1137                      break;
1138                  if (U.compareAndSwapInt(this, CTL, c,
1139 <                                        c | (ACTIVE | KEEPALIVE | SUBSCRIBE))) {
1139 >                                        c | (ACTIVE | CONSUME | SUBSCRIBE))) {
1140                      if ((c & ACTIVE) == 0)
1141                          startOrDisable();
1142                      break;
# Line 1179 | Line 1154 | public class SubmissionPublisher<T> impl
1154                  if ((c = ctl) == DISABLED)
1155                      break;
1156                  else if ((c & ACTIVE) != 0) {
1157 <                    if (U.compareAndSwapInt(this, CTL, c, c | ERROR))
1157 >                    if (U.compareAndSwapInt(this, CTL, c,
1158 >                                            c | (CONSUME | ERROR)))
1159                          break;
1160                  }
1161                  else if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
# Line 1204 | Line 1180 | public class SubmissionPublisher<T> impl
1180                              if ((c = ctl) == DISABLED)
1181                                  break;
1182                              else if ((c & ACTIVE) != 0) {
1183 <                                if ((c & KEEPALIVE) != 0 ||
1183 >                                if ((c & CONSUME) != 0 ||
1184                                      U.compareAndSwapInt(this, CTL, c,
1185 <                                                        c | KEEPALIVE))
1185 >                                                        c | CONSUME))
1186                                      break;
1187                              }
1188                              else if ((h = head) != tail) {
1189                                  if (U.compareAndSwapInt(this, CTL, c,
1190 <                                                        c | (ACTIVE | KEEPALIVE))) {
1190 >                                                        c | (ACTIVE|CONSUME))) {
1191                                      startOrDisable();
1192                                      break;
1193                                  }
# Line 1279 | Line 1255 | public class SubmissionPublisher<T> impl
1255              return true;
1256          }
1257  
1258 <        /** Consumer loop called only from ConsumerTask */
1258 >        /**
1259 >         * Consumer loop, called from ConsumerTask, or indirectly
1260 >         * via helpConsume when helping during submit.
1261 >         */
1262          final void consume() {
1263              Flow.Subscriber<? super T> s;
1264              if ((s = subscriber) != null) {           // else disabled
# Line 1303 | Line 1282 | public class SubmissionPublisher<T> impl
1282                                  try {
1283                                      s.onSubscribe(this);
1284                                  } catch (Throwable ex) {
1285 <                                    ctl = DISABLED;   // disable on throw
1285 >                                    onError(ex);
1286                                  }
1287                              }
1288                          }
# Line 1313 | Line 1292 | public class SubmissionPublisher<T> impl
1292                          }
1293                      }
1294                      else if (h == tail) {             // apparently empty
1295 <                        if ((c & KEEPALIVE) != 0)     // recheck
1296 <                            U.compareAndSwapInt(this, CTL, c, c & ~KEEPALIVE);
1295 >                        if ((c & CONSUME) != 0)       // recheck
1296 >                            U.compareAndSwapInt(this, CTL, c, c & ~CONSUME);
1297                          else if ((c & COMPLETE) != 0) {
1298 <                            ctl = DISABLED;
1299 <                            try {
1300 <                                s.onComplete();
1301 <                            } catch (Throwable ignore) {
1298 >                            if (U.compareAndSwapInt(this, CTL, c, DISABLED)) {
1299 >                                try {
1300 >                                    s.onComplete();
1301 >                                } catch (Throwable ignore) {
1302 >                                }
1303                              }
1304                          }
1305                          else if (U.compareAndSwapInt(this, CTL, c,
# Line 1328 | Line 1308 | public class SubmissionPublisher<T> impl
1308                      }
1309                      else if (d == 0L) {               // can't consume
1310                          if (demand == 0L) {           // recheck
1311 <                            if ((c & KEEPALIVE) != 0)
1311 >                            if ((c & CONSUME) != 0)
1312                                  U.compareAndSwapInt(this, CTL, c,
1313 <                                                    c & ~KEEPALIVE);
1313 >                                                    c & ~CONSUME);
1314                              else if (U.compareAndSwapInt(this, CTL, c,
1315                                                           c & ~ACTIVE))
1316                                  break;
# Line 1339 | Line 1319 | public class SubmissionPublisher<T> impl
1319                      else if ((a = array) == null || (n = a.length) == 0 ||
1320                               (x = a[i = h & (n - 1)]) == null)
1321                          ;                             // stale; retry
1322 <                    else if ((c & KEEPALIVE) == 0)
1323 <                        U.compareAndSwapInt(this, CTL, c, c | KEEPALIVE);
1322 >                    else if ((c & CONSUME) == 0)
1323 >                        U.compareAndSwapInt(this, CTL, c, c | CONSUME);
1324                      else if (U.compareAndSwapObject(
1325                                   a, (((long)i) << ASHIFT) + ABASE, x, null)) {
1326                          U.putOrderedInt(this, HEAD, ++h);
# Line 1353 | Line 1333 | public class SubmissionPublisher<T> impl
1333                          try {
1334                              @SuppressWarnings("unchecked") T y = (T) x;
1335                              s.onNext(y);
1336 <                        } catch (Throwable ex) {      // disable on throw
1337 <                            ctl = DISABLED;
1336 >                        } catch (Throwable ex) {
1337 >                            onError(ex);
1338                          }
1339                      }
1340                  }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines