ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SubmissionPublisher.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/SubmissionPublisher.java (file contents):
Revision 1.58 by jsr166, Fri Jan 29 20:06:49 2016 UTC vs.
Revision 1.59 by dl, Sat Apr 2 18:37:56 2016 UTC

# Line 877 | Line 877 | public class SubmissionPublisher<T> impl
877       */
878      @SuppressWarnings("serial")
879      static final class ConsumerTask<T> extends ForkJoinTask<Void>
880 <        implements Runnable {
880 >        implements Runnable, CompletableFuture.AsynchronousCompletionTask {
881          final BufferedSubscription<T> consumer;
882          ConsumerTask(BufferedSubscription<T> consumer) {
883              this.consumer = consumer;
# Line 930 | Line 930 | public class SubmissionPublisher<T> impl
930       * Blocking control relies on the "waiter" field. Producers set
931       * the field before trying to block, but must then recheck (via
932       * offer) before parking. Signalling then just unparks and clears
933 <     * waiter field. If the producer and consumer are both in the same
934 <     * ForkJoinPool, or consumers are running in commonPool, the
935 <     * producer attempts to help run consumer tasks that it forked
936 <     * before blocking.  To avoid potential cycles, only one level of
937 <     * helping is currently supported.
933 >     * waiter field. If the producer and/or consumer are using a
934 >     * ForkJoinPool, the producer attempts to help run consumer tasks
935 >     * via ForkJoinPool.helpAsyncBlocker before blocking.
936       *
937       * This class uses @Contended and heuristic field declaration
938       * ordering to reduce false-sharing-based memory contention among
# Line 954 | Line 952 | public class SubmissionPublisher<T> impl
952          volatile long demand;              // # unfilled requests
953          int maxCapacity;                   // reduced on OOME
954          int putStat;                       // offer result for ManagedBlocker
957        int helpDepth;                     // nested helping depth (at most 1)
955          volatile int ctl;                  // atomic run state flags
956          volatile int head;                 // next position to take
957          int tail;                          // next position to put
# Line 1107 | Line 1104 | public class SubmissionPublisher<T> impl
1104           * initial offer return 0.
1105           */
1106          final int submit(T item) {
1107 <            int stat; Executor e; ForkJoinWorkerThread w;
1108 <            if ((stat = offer(item)) == 0 && helpDepth == 0 &&
1112 <                ((e = executor) instanceof ForkJoinPool)) {
1113 <                helpDepth = 1;
1114 <                Thread thread = Thread.currentThread();
1115 <                if ((thread instanceof ForkJoinWorkerThread) &&
1116 <                    ((w = (ForkJoinWorkerThread)thread)).getPool() == e)
1117 <                    stat = internalHelpConsume(w.workQueue, item);
1118 <                else if (e == ForkJoinPool.commonPool())
1119 <                    stat = externalHelpConsume
1120 <                        (ForkJoinPool.commonSubmitterQueue(), item);
1121 <                helpDepth = 0;
1122 <            }
1123 <            if (stat == 0 && (stat = offer(item)) == 0) {
1107 >            int stat;
1108 >            if ((stat = offer(item)) == 0) {
1109                  putItem = item;
1110                  timeout = 0L;
1111 <                try {
1112 <                    ForkJoinPool.managedBlock(this);
1113 <                } catch (InterruptedException ie) {
1114 <                    timeout = INTERRUPTED;
1111 >                putStat = 0;
1112 >                ForkJoinPool.helpAsyncBlocker(executor, this);
1113 >                if ((stat = putStat) == 0) {
1114 >                    try {
1115 >                        ForkJoinPool.managedBlock(this);
1116 >                    } catch (InterruptedException ie) {
1117 >                        timeout = INTERRUPTED;
1118 >                    }
1119 >                    stat = putStat;
1120                  }
1131                stat = putStat;
1121                  if (timeout < 0L)
1122                      Thread.currentThread().interrupt();
1123              }
# Line 1136 | Line 1125 | public class SubmissionPublisher<T> impl
1125          }
1126  
1127          /**
1139         * Tries helping for FJ submitter.
1140         */
1141        private int internalHelpConsume(ForkJoinPool.WorkQueue w, T item) {
1142            int stat = 0;
1143            if (w != null) {
1144                ForkJoinTask<?> t;
1145                while ((t = w.peek()) != null && (t instanceof ConsumerTask)) {
1146                    if ((stat = offer(item)) != 0 || !w.tryUnpush(t))
1147                        break;
1148                    ((ConsumerTask<?>)t).consumer.consume();
1149                }
1150            }
1151            return stat;
1152        }
1153
1154        /**
1155         * Tries helping for non-FJ submitter.
1156         */
1157        private int externalHelpConsume(ForkJoinPool.WorkQueue w, T item) {
1158            int stat = 0;
1159            if (w != null) {
1160                ForkJoinTask<?> t;
1161                while ((t = w.peek()) != null && (t instanceof ConsumerTask)) {
1162                    if ((stat = offer(item)) != 0 || !w.trySharedUnpush(t))
1163                        break;
1164                    ((ConsumerTask<?>)t).consumer.consume();
1165                }
1166            }
1167            return stat;
1168        }
1169
1170        /**
1128           * Timeout version; similar to submit.
1129           */
1130          final int timedOffer(T item, long nanos) {
1131 <            int stat; Executor e;
1132 <            if ((stat = offer(item)) == 0 && helpDepth == 0 &&
1176 <                ((e = executor) instanceof ForkJoinPool)) {
1177 <                Thread thread = Thread.currentThread();
1178 <                if (((thread instanceof ForkJoinWorkerThread) &&
1179 <                     ((ForkJoinWorkerThread)thread).getPool() == e) ||
1180 <                    e == ForkJoinPool.commonPool()) {
1181 <                    helpDepth = 1;
1182 <                    ForkJoinTask<?> t;
1183 <                    long deadline = System.nanoTime() + nanos;
1184 <                    while ((t = ForkJoinTask.peekNextLocalTask()) != null &&
1185 <                           (t instanceof ConsumerTask)) {
1186 <                        if ((stat = offer(item)) != 0 ||
1187 <                            (nanos = deadline - System.nanoTime()) <= 0L ||
1188 <                            !t.tryUnfork())
1189 <                            break;
1190 <                        ((ConsumerTask<?>)t).consumer.consume();
1191 <                    }
1192 <                    helpDepth = 0;
1193 <                }
1194 <            }
1195 <            if (stat == 0 && (stat = offer(item)) == 0 &&
1196 <                (timeout = nanos) > 0L) {
1131 >            int stat;
1132 >            if ((stat = offer(item)) == 0 && (timeout = nanos) > 0L) {
1133                  putItem = item;
1134 <                try {
1135 <                    ForkJoinPool.managedBlock(this);
1136 <                } catch (InterruptedException ie) {
1137 <                    timeout = INTERRUPTED;
1134 >                putStat = 0;
1135 >                ForkJoinPool.helpAsyncBlocker(executor, this);
1136 >                if ((stat = putStat) == 0) {
1137 >                    try {
1138 >                        ForkJoinPool.managedBlock(this);
1139 >                    } catch (InterruptedException ie) {
1140 >                        timeout = INTERRUPTED;
1141 >                    }
1142 >                    stat = putStat;
1143                  }
1203                stat = putStat;
1144                  if (timeout < 0L)
1145                      Thread.currentThread().interrupt();
1146              }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines