ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/CompletableFuture.java (file contents):
Revision 1.98 by dl, Tue Nov 26 18:36:16 2013 UTC vs.
Revision 1.99 by dl, Sun Apr 6 20:58:46 2014 UTC

# Line 109 | Line 109 | public class CompletableFuture<T> implem
109       * others -- see method postComplete).  Because post-processing
110       * may race with direct calls, class Completion opportunistically
111       * extends AtomicInteger so callers can claim the action via
112 <     * compareAndSet(0, 1).  The Completion.run methods are all
112 >     * compareAndSet(0, 1).  The Completion.trigger methods are all
113       * written a boringly similar uniform way (that sometimes includes
114       * unnecessary-looking checks, kept to maintain uniformity).
115       * There are enough dimensions upon which they differ that
# Line 145 | Line 145 | public class CompletableFuture<T> implem
145      // Basic utilities for triggering and processing completions
146  
147      /**
148 <     * Removes and signals all waiting threads and runs all completions.
148 >     * Triggers completion with the encoding of the given arguments:
149 >     * if the exception is non-null, encodes it as a wrapped
150 >     * CompletionException unless it is one already.  Otherwise uses
151 >     * the given result, boxed as NIL if null.
152       */
153 <    final void postComplete() {
153 >    final void setInternalResult(T v, Throwable ex) {
154 >        if (result == null)
155 >            UNSAFE.compareAndSwapObject
156 >                (this, RESULT, null,
157 >                 (ex == null) ? (v == null) ? NIL : v :
158 >                 new AltResult((ex instanceof CompletionException) ? ex :
159 >                               new CompletionException(ex)));
160 >    }
161 >
162 >    /**
163 >     * Removes and signals all waiting threads
164 >     */
165 >    final void removeAndSignalWaiters() {
166          WaitNode q; Thread t;
167          while ((q = waiters) != null) {
168              if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
# Line 156 | Line 171 | public class CompletableFuture<T> implem
171                  LockSupport.unpark(t);
172              }
173          }
174 +    }
175  
176 <        CompletionNode h; Completion c;
177 <        while ((h = completions) != null) {
178 <            if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
179 <                (c = h.completion) != null)
180 <                c.run();
176 >    /**
177 >     * Triggers all enabled completions reachable from b.  Loopifies
178 >     * the final recursive call for each stage to avoid potential
179 >     * StackOverflowErrors in cases of long linear chains.
180 >     *
181 >     * @param b if non-null, a completed CompletableFuture
182 >     */
183 >    static final void removeAndTriggerCompletions(CompletableFuture<?> b) {
184 >        CompletionNode h; Completion c; CompletableFuture<?> f;
185 >        while (b != null && (h = b.completions) != null) {
186 >            if (UNSAFE.compareAndSwapObject(b, COMPLETIONS, h, h.next) &&
187 >                (c = h.completion) != null &&
188 >                (f = c.trigger()) != null &&
189 >                f.result != null) {
190 >                f.removeAndSignalWaiters();
191 >                if (f.completions != null) {
192 >                    if (b.completions == null)
193 >                        b = f; // tail-recurse
194 >                    else
195 >                        removeAndTriggerCompletions(f);
196 >                }
197 >            }
198          }
199      }
200  
201      /**
202 <     * Triggers completion with the encoding of the given arguments:
170 <     * if the exception is non-null, encodes it as a wrapped
171 <     * CompletionException unless it is one already.  Otherwise uses
172 <     * the given result, boxed as NIL if null.
202 >     * Sets result, signals waiters, and triggers dependents
203       */
204      final void internalComplete(T v, Throwable ex) {
205 <        if (result == null)
206 <            UNSAFE.compareAndSwapObject
207 <                (this, RESULT, null,
178 <                 (ex == null) ? (v == null) ? NIL : v :
179 <                 new AltResult((ex instanceof CompletionException) ? ex :
180 <                               new CompletionException(ex)));
181 <        postComplete(); // help out even if not triggered
205 >        setInternalResult(v, ex);
206 >        removeAndSignalWaiters();
207 >        removeAndTriggerCompletions(this);
208      }
209  
210 +
211      /**
212 <     * If triggered, helps release and/or process completions.
212 >     * Signals waiters and triggers dependents. Call only if known to
213 >     * be completed.
214 >     */
215 >    final void postComplete() {
216 >        removeAndSignalWaiters();
217 >        removeAndTriggerCompletions(this);
218 >    }
219 >
220 >    /**
221 >     * If completed, helps signal waiters and trigger dependents
222       */
223      final void helpPostComplete() {
224 <        if (result != null)
225 <            postComplete();
224 >        if (result != null) {
225 >            removeAndSignalWaiters();
226 >            removeAndTriggerCompletions(this);
227 >        }
228      }
229  
230      /* ------------- waiting for completions -------------- */
# Line 644 | Line 682 | public class CompletableFuture<T> implem
682  
683      // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
684      @SuppressWarnings("serial")
685 <    abstract static class Completion extends AtomicInteger implements Runnable {
685 >    abstract static class Completion extends AtomicInteger {
686 >        /**
687 >         * Complete a dependent Completablefuture if enabled
688 >         * @return the dependent Completablefuture
689 >         */
690 >        public abstract CompletableFuture<?> trigger();
691      }
692  
693      static final class ThenApply<T,U> extends Completion {
# Line 659 | Line 702 | public class CompletableFuture<T> implem
702              this.src = src; this.fn = fn; this.dst = dst;
703              this.executor = executor;
704          }
705 <        public final void run() {
705 >        public final CompletableFuture<?> trigger() {
706              final CompletableFuture<? extends T> a;
707              final Function<? super T,? extends U> fn;
708              final CompletableFuture<U> dst;
# Line 691 | Line 734 | public class CompletableFuture<T> implem
734                      }
735                  }
736                  if (e == null || ex != null)
737 <                    dst.internalComplete(u, ex);
737 >                    dst.setInternalResult(u, ex);
738              }
739 +            return dst;
740          }
741          private static final long serialVersionUID = 5232453952276885070L;
742      }
# Line 709 | Line 753 | public class CompletableFuture<T> implem
753              this.src = src; this.fn = fn; this.dst = dst;
754              this.executor = executor;
755          }
756 <        public final void run() {
756 >        public final CompletableFuture<?> trigger() {
757              final CompletableFuture<? extends T> a;
758              final Consumer<? super T> fn;
759              final CompletableFuture<?> dst;
# Line 740 | Line 784 | public class CompletableFuture<T> implem
784                      }
785                  }
786                  if (e == null || ex != null)
787 <                    dst.internalComplete(null, ex);
787 >                    dst.setInternalResult(null, ex);
788              }
789 +            return dst;
790          }
791          private static final long serialVersionUID = 5232453952276885070L;
792      }
# Line 758 | Line 803 | public class CompletableFuture<T> implem
803              this.src = src; this.fn = fn; this.dst = dst;
804              this.executor = executor;
805          }
806 <        public final void run() {
806 >        public final CompletableFuture<?> trigger() {
807              final CompletableFuture<?> a;
808              final Runnable fn;
809              final CompletableFuture<Void> dst;
# Line 784 | Line 829 | public class CompletableFuture<T> implem
829                      }
830                  }
831                  if (e == null || ex != null)
832 <                    dst.internalComplete(null, ex);
832 >                    dst.setInternalResult(null, ex);
833              }
834 +            return dst;
835          }
836          private static final long serialVersionUID = 5232453952276885070L;
837      }
# Line 805 | Line 851 | public class CompletableFuture<T> implem
851              this.fn = fn; this.dst = dst;
852              this.executor = executor;
853          }
854 <        public final void run() {
854 >        public final CompletableFuture<?> trigger() {
855              final CompletableFuture<? extends T> a;
856              final CompletableFuture<? extends U> b;
857              final BiFunction<? super T,? super U,? extends V> fn;
# Line 850 | Line 896 | public class CompletableFuture<T> implem
896                      }
897                  }
898                  if (e == null || ex != null)
899 <                    dst.internalComplete(v, ex);
899 >                    dst.setInternalResult(v, ex);
900              }
901 +            return dst;
902          }
903          private static final long serialVersionUID = 5232453952276885070L;
904      }
# Line 871 | Line 918 | public class CompletableFuture<T> implem
918              this.fn = fn; this.dst = dst;
919              this.executor = executor;
920          }
921 <        public final void run() {
921 >        public final CompletableFuture<?> trigger() {
922              final CompletableFuture<? extends T> a;
923              final CompletableFuture<? extends U> b;
924              final BiConsumer<? super T,? super U> fn;
# Line 915 | Line 962 | public class CompletableFuture<T> implem
962                      }
963                  }
964                  if (e == null || ex != null)
965 <                    dst.internalComplete(null, ex);
965 >                    dst.setInternalResult(null, ex);
966              }
967 +            return dst;
968          }
969          private static final long serialVersionUID = 5232453952276885070L;
970      }
# Line 936 | Line 984 | public class CompletableFuture<T> implem
984              this.fn = fn; this.dst = dst;
985              this.executor = executor;
986          }
987 <        public final void run() {
987 >        public final CompletableFuture<?> trigger() {
988              final CompletableFuture<?> a;
989              final CompletableFuture<?> b;
990              final Runnable fn;
# Line 967 | Line 1015 | public class CompletableFuture<T> implem
1015                      }
1016                  }
1017                  if (e == null || ex != null)
1018 <                    dst.internalComplete(null, ex);
1018 >                    dst.setInternalResult(null, ex);
1019              }
1020 +            return dst;
1021          }
1022          private static final long serialVersionUID = 5232453952276885070L;
1023      }
# Line 982 | Line 1031 | public class CompletableFuture<T> implem
1031                        CompletableFuture<Void> dst) {
1032              this.src = src; this.snd = snd; this.dst = dst;
1033          }
1034 <        public final void run() {
1034 >        public final CompletableFuture<?> trigger() {
1035              final CompletableFuture<?> a;
1036              final CompletableFuture<?> b;
1037              final CompletableFuture<Void> dst;
# Line 999 | Line 1048 | public class CompletableFuture<T> implem
1048                      ex = null;
1049                  if (ex == null && (s instanceof AltResult))
1050                      ex = ((AltResult)s).ex;
1051 <                dst.internalComplete(null, ex);
1051 >                dst.setInternalResult(null, ex);
1052              }
1053 +            return dst;
1054          }
1055          private static final long serialVersionUID = 5232453952276885070L;
1056      }
# Line 1020 | Line 1070 | public class CompletableFuture<T> implem
1070              this.fn = fn; this.dst = dst;
1071              this.executor = executor;
1072          }
1073 <        public final void run() {
1073 >        public final CompletableFuture<?> trigger() {
1074              final CompletableFuture<? extends T> a;
1075              final CompletableFuture<? extends T> b;
1076              final Function<? super T,? extends U> fn;
# Line 1053 | Line 1103 | public class CompletableFuture<T> implem
1103                      }
1104                  }
1105                  if (e == null || ex != null)
1106 <                    dst.internalComplete(u, ex);
1106 >                    dst.setInternalResult(u, ex);
1107              }
1108 +            return dst;
1109          }
1110          private static final long serialVersionUID = 5232453952276885070L;
1111      }
# Line 1074 | Line 1125 | public class CompletableFuture<T> implem
1125              this.fn = fn; this.dst = dst;
1126              this.executor = executor;
1127          }
1128 <        public final void run() {
1128 >        public final CompletableFuture<?> trigger() {
1129              final CompletableFuture<? extends T> a;
1130              final CompletableFuture<? extends T> b;
1131              final Consumer<? super T> fn;
# Line 1106 | Line 1157 | public class CompletableFuture<T> implem
1157                      }
1158                  }
1159                  if (e == null || ex != null)
1160 <                    dst.internalComplete(null, ex);
1160 >                    dst.setInternalResult(null, ex);
1161              }
1162 +            return dst;
1163          }
1164          private static final long serialVersionUID = 5232453952276885070L;
1165      }
# Line 1127 | Line 1179 | public class CompletableFuture<T> implem
1179              this.fn = fn; this.dst = dst;
1180              this.executor = executor;
1181          }
1182 <        public final void run() {
1182 >        public final CompletableFuture<?> trigger() {
1183              final CompletableFuture<?> a;
1184              final CompletableFuture<?> b;
1185              final Runnable fn;
# Line 1154 | Line 1206 | public class CompletableFuture<T> implem
1206                      }
1207                  }
1208                  if (e == null || ex != null)
1209 <                    dst.internalComplete(null, ex);
1209 >                    dst.setInternalResult(null, ex);
1210              }
1211 +            return dst;
1212          }
1213          private static final long serialVersionUID = 5232453952276885070L;
1214      }
# Line 1169 | Line 1222 | public class CompletableFuture<T> implem
1222                       CompletableFuture<Object> dst) {
1223              this.src = src; this.snd = snd; this.dst = dst;
1224          }
1225 <        public final void run() {
1225 >        public final CompletableFuture<?> trigger() {
1226              final CompletableFuture<?> a;
1227              final CompletableFuture<?> b;
1228              final CompletableFuture<Object> dst;
# Line 1186 | Line 1239 | public class CompletableFuture<T> implem
1239                      ex = null;
1240                      t = r;
1241                  }
1242 <                dst.internalComplete(t, ex);
1242 >                dst.setInternalResult(t, ex);
1243              }
1244 +            return dst;
1245          }
1246          private static final long serialVersionUID = 5232453952276885070L;
1247      }
# Line 1201 | Line 1255 | public class CompletableFuture<T> implem
1255                              CompletableFuture<T> dst) {
1256              this.src = src; this.fn = fn; this.dst = dst;
1257          }
1258 <        public final void run() {
1258 >        public final CompletableFuture<?> trigger() {
1259              final CompletableFuture<? extends T> a;
1260              final Function<? super Throwable, ? extends T> fn;
1261              final CompletableFuture<T> dst;
# Line 1223 | Line 1277 | public class CompletableFuture<T> implem
1277                      @SuppressWarnings("unchecked") T tr = (T) r;
1278                      t = tr;
1279                  }
1280 <                dst.internalComplete(t, dx);
1280 >                dst.setInternalResult(t, dx);
1281              }
1282 +            return dst;
1283          }
1284          private static final long serialVersionUID = 5232453952276885070L;
1285      }
# Line 1241 | Line 1296 | public class CompletableFuture<T> implem
1296              this.src = src; this.fn = fn; this.dst = dst;
1297              this.executor = executor;
1298          }
1299 <        public final void run() {
1299 >        public final CompletableFuture<?> trigger() {
1300              final CompletableFuture<? extends T> a;
1301              final BiConsumer<? super T, ? super Throwable> fn;
1302              final CompletableFuture<T> dst;
# Line 1271 | Line 1326 | public class CompletableFuture<T> implem
1326                      dx = rex;
1327                  }
1328                  if (e == null || dx != null)
1329 <                    dst.internalComplete(t, ex != null ? ex : dx);
1329 >                    dst.setInternalResult(t, ex != null ? ex : dx);
1330              }
1331 +            return dst;
1332          }
1333          private static final long serialVersionUID = 5232453952276885070L;
1334      }
# Line 1284 | Line 1340 | public class CompletableFuture<T> implem
1340                   CompletableFuture<T> dst) {
1341              this.src = src; this.dst = dst;
1342          }
1343 <        public final void run() {
1343 >        public final CompletableFuture<?> trigger() {
1344              final CompletableFuture<?> a;
1345              final CompletableFuture<T> dst;
1346              Object r; T t; Throwable ex;
# Line 1301 | Line 1357 | public class CompletableFuture<T> implem
1357                      @SuppressWarnings("unchecked") T tr = (T) r;
1358                      t = tr;
1359                  }
1360 <                dst.internalComplete(t, ex);
1360 >                dst.setInternalResult(t, ex);
1361              }
1362 +            return dst;
1363          }
1364          private static final long serialVersionUID = 5232453952276885070L;
1365      }
# Line 1315 | Line 1372 | public class CompletableFuture<T> implem
1372                        CompletableFuture<Void> dst) {
1373              this.src = src; this.dst = dst;
1374          }
1375 <        public final void run() {
1375 >        public final CompletableFuture<?> trigger() {
1376              final CompletableFuture<?> a;
1377              final CompletableFuture<Void> dst;
1378              Object r; Throwable ex;
# Line 1327 | Line 1384 | public class CompletableFuture<T> implem
1384                      ex = ((AltResult)r).ex;
1385                  else
1386                      ex = null;
1387 <                dst.internalComplete(null, ex);
1387 >                dst.setInternalResult(null, ex);
1388              }
1389 +            return dst;
1390          }
1391          private static final long serialVersionUID = 5232453952276885070L;
1392      }
# Line 1345 | Line 1403 | public class CompletableFuture<T> implem
1403              this.src = src; this.fn = fn; this.dst = dst;
1404              this.executor = executor;
1405          }
1406 <        public final void run() {
1406 >        public final CompletableFuture<?> trigger() {
1407              final CompletableFuture<? extends T> a;
1408              final BiFunction<? super T, Throwable, ? extends U> fn;
1409              final CompletableFuture<U> dst;
# Line 1376 | Line 1434 | public class CompletableFuture<T> implem
1434                      dx = rex;
1435                  }
1436                  if (e == null || dx != null)
1437 <                    dst.internalComplete(u, dx);
1437 >                    dst.setInternalResult(u, dx);
1438              }
1439 +            return dst;
1440          }
1441          private static final long serialVersionUID = 5232453952276885070L;
1442      }
# Line 1394 | Line 1453 | public class CompletableFuture<T> implem
1453              this.src = src; this.fn = fn; this.dst = dst;
1454              this.executor = executor;
1455          }
1456 <        public final void run() {
1456 >        public final CompletableFuture<?> trigger() {
1457              final CompletableFuture<? extends T> a;
1458              final Function<? super T, ? extends CompletionStage<U>> fn;
1459              final CompletableFuture<U> dst;
# Line 1455 | Line 1514 | public class CompletableFuture<T> implem
1514                      }
1515                  }
1516                  if (complete || ex != null)
1517 <                    dst.internalComplete(u, ex);
1517 >                    dst.setInternalResult(u, ex);
1518                  if (c != null)
1519                      c.helpPostComplete();
1520              }
1521 +            return dst;
1522          }
1523          private static final long serialVersionUID = 5232453952276885070L;
1524      }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines