ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/jtreg/util/concurrent/CompletableFuture/Basic.java
Revision: 1.10
Committed: Mon Jan 8 01:47:47 2018 UTC (6 years, 4 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.9: +77 -77 lines
Log Message:
standardize lambda style

File Contents

# Content
1 /*
2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 *
4 * This code is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 only, as
6 * published by the Free Software Foundation.
7 *
8 * This code is distributed in the hope that it will be useful, but WITHOUT
9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
11 * version 2 for more details (a copy is included in the LICENSE file that
12 * accompanied this code).
13 *
14 * You should have received a copy of the GNU General Public License version
15 * 2 along with this work; if not, write to the Free Software Foundation,
16 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
17 *
18 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
19 * or visit www.oracle.com if you need additional information or have any
20 * questions.
21 */
22
23 /*
24 * This file is available under and governed by the GNU General Public
25 * License version 2 only, as published by the Free Software Foundation.
26 * However, the following notice accompanied the original version of this
27 * file:
28 *
29 * Written by Doug Lea with assistance from members of JCP JSR-166
30 * Expert Group and released to the public domain, as explained at
31 * http://creativecommons.org/publicdomain/zero/1.0/
32 */
33
34 /*
35 * @test
36 * @bug 8005696
37 * @summary Basic tests for CompletableFuture
38 * @library /lib/testlibrary/
39 * @run main Basic
40 * @run main/othervm -Djava.util.concurrent.ForkJoinPool.common.parallelism=0 Basic
41 * @author Chris Hegarty
42 */
43
44 import static java.util.concurrent.CompletableFuture.runAsync;
45 import static java.util.concurrent.CompletableFuture.supplyAsync;
46 import static java.util.concurrent.ForkJoinPool.commonPool;
47 import static java.util.concurrent.TimeUnit.MILLISECONDS;
48 import static java.util.concurrent.TimeUnit.SECONDS;
49
50 import java.lang.reflect.Array;
51 import java.util.concurrent.Phaser;
52 import java.util.concurrent.CompletableFuture;
53 import java.util.concurrent.CompletionException;
54 import java.util.concurrent.CancellationException;
55 import java.util.concurrent.ExecutionException;
56 import java.util.concurrent.ExecutorService;
57 import java.util.concurrent.Executors;
58 import java.util.concurrent.atomic.AtomicInteger;
59 import jdk.testlibrary.Utils;
60
61 public class Basic {
62 static final long LONG_DELAY_MS = Utils.adjustTimeout(10_000);
63
64 static void checkCompletedNormally(CompletableFuture<?> cf, Object value) {
65 checkCompletedNormally(cf, value == null ? null : new Object[] { value });
66 }
67
68 static void checkCompletedNormally(CompletableFuture<?> cf, Object[] values) {
69 try { equalAnyOf(cf.join(), values); } catch (Throwable x) { unexpected(x); }
70 try { equalAnyOf(cf.getNow(null), values); } catch (Throwable x) { unexpected(x); }
71 try { equalAnyOf(cf.get(), values); } catch (Throwable x) { unexpected(x); }
72 try { equalAnyOf(cf.get(0L, SECONDS), values); } catch (Throwable x) { unexpected(x); }
73 check(cf.isDone(), "Expected isDone to be true, got:" + cf);
74 check(!cf.isCompletedExceptionally(), "Expected isCompletedExceptionally to return false");
75 check(!cf.isCancelled(), "Expected isCancelled to be false");
76 check(!cf.cancel(true), "Expected cancel to return false");
77 check(cf.toString().matches(".*\\[.*Completed normally.*\\]"));
78 check(cf.complete(null) == false, "Expected complete() to fail");
79 check(cf.completeExceptionally(new Throwable()) == false,
80 "Expected completeExceptionally() to fail");
81 }
82
83 static <T> void checkCompletedExceptionally(CompletableFuture<T> cf)
84 throws Exception
85 {
86 checkCompletedExceptionally(cf, false);
87 }
88
89 @SuppressWarnings("unchecked")
90 static <T> void checkCompletedExceptionally(CompletableFuture<T> cf, boolean cancelled)
91 throws Exception
92 {
93 try { cf.join(); fail("Excepted exception to be thrown"); }
94 catch (CompletionException x) { if (cancelled) fail(); else pass(); }
95 catch (CancellationException x) { if (cancelled) pass(); else fail(); }
96 try { cf.getNow(null); fail("Excepted exception to be thrown"); }
97 catch (CompletionException x) { if (cancelled) fail(); else pass(); }
98 catch (CancellationException x) { if (cancelled) pass(); else fail(); }
99 try { cf.get(); fail("Excepted exception to be thrown");}
100 catch (CancellationException x) { if (cancelled) pass(); else fail(); }
101 catch (ExecutionException x) { if (cancelled) check(x.getCause() instanceof CancellationException); else pass(); }
102 try { cf.get(0L, SECONDS); fail("Excepted exception to be thrown");}
103 catch (CancellationException x) { if (cancelled) pass(); else fail(); }
104 catch (ExecutionException x) { if (cancelled) check(x.getCause() instanceof CancellationException); else pass(); }
105 check(cf.isDone(), "Expected isDone to be true, got:" + cf);
106 check(cf.isCompletedExceptionally(), "Expected isCompletedExceptionally");
107 check(cf.isCancelled() == cancelled, "Expected isCancelled: " + cancelled + ", got:" + cf.isCancelled());
108 check(cf.cancel(true) == cancelled, "Expected cancel: " + cancelled + ", got:" + cf.cancel(true));
109 check(cf.toString().matches(".*\\[.*Completed exceptionally.*\\]")); // ## TODO: 'E'xceptionally
110 check(cf.complete((T)new Object()) == false, "Expected complete() to fail");
111 check(cf.completeExceptionally(new Throwable()) == false,
112 "Expected completeExceptionally() to fail, already completed");
113 }
114
115 private static void realMain(String[] args) throws Throwable {
116 ExecutorService pool = Executors.newFixedThreadPool(2);
117 try {
118 test(pool);
119 } finally {
120 pool.shutdown();
121 if (! pool.awaitTermination(LONG_DELAY_MS, MILLISECONDS))
122 throw new Error();
123 }
124 }
125
126 static AtomicInteger atomicInt = new AtomicInteger(0);
127
128 private static void test(ExecutorService executor) throws Throwable {
129
130 Thread.currentThread().setName("mainThread");
131
132 //----------------------------------------------------------------
133 // supplyAsync tests
134 //----------------------------------------------------------------
135 try {
136 CompletableFuture<String> cf = supplyAsync(() -> "a test string");
137 checkCompletedNormally(cf, cf.join());
138 cf = supplyAsync(() -> "a test string", commonPool());
139 checkCompletedNormally(cf, cf.join());
140 cf = supplyAsync(() -> "a test string", executor);
141 checkCompletedNormally(cf, cf.join());
142 cf = supplyAsync(() -> { throw new RuntimeException(); });
143 checkCompletedExceptionally(cf);
144 cf = supplyAsync(() -> { throw new RuntimeException(); }, commonPool());
145 checkCompletedExceptionally(cf);
146 cf = supplyAsync(() -> { throw new RuntimeException(); }, executor);
147 checkCompletedExceptionally(cf);
148 } catch (Throwable t) { unexpected(t); }
149
150 //----------------------------------------------------------------
151 // runAsync tests
152 //----------------------------------------------------------------
153 try {
154 CompletableFuture<Void> cf = runAsync(() -> {});
155 checkCompletedNormally(cf, cf.join());
156 cf = runAsync(() -> {}, commonPool());
157 checkCompletedNormally(cf, cf.join());
158 cf = runAsync(() -> {}, executor);
159 checkCompletedNormally(cf, cf.join());
160 cf = runAsync(() -> { throw new RuntimeException(); });
161 checkCompletedExceptionally(cf);
162 cf = runAsync(() -> { throw new RuntimeException(); }, commonPool());
163 checkCompletedExceptionally(cf);
164 cf = runAsync(() -> { throw new RuntimeException(); }, executor);
165 checkCompletedExceptionally(cf);
166 } catch (Throwable t) { unexpected(t); }
167
168 //----------------------------------------------------------------
169 // explicit completion
170 //----------------------------------------------------------------
171 try {
172 final Phaser phaser = new Phaser(1);
173 final int phase = phaser.getPhase();
174 CompletableFuture<Integer> cf;
175 cf = supplyAsync(() -> { phaser.awaitAdvance(phase); return 1; });
176 cf.complete(2);
177 phaser.arrive();
178 checkCompletedNormally(cf, 2);
179
180 cf = supplyAsync(() -> { phaser.awaitAdvance(phase+1); return 1; });
181 cf.completeExceptionally(new Throwable());
182 phaser.arrive();
183 checkCompletedExceptionally(cf);
184
185 cf = supplyAsync(() -> { phaser.awaitAdvance(phase+2); return 1; });
186 cf.cancel(true);
187 phaser.arrive();
188 checkCompletedExceptionally(cf, true);
189
190 cf = supplyAsync(() -> { phaser.awaitAdvance(phase+3); return 1; });
191 check(cf.getNow(2) == 2);
192 phaser.arrive();
193 checkCompletedNormally(cf, 1);
194 check(cf.getNow(2) == 1);
195 } catch (Throwable t) { unexpected(t); }
196
197 //----------------------------------------------------------------
198 // thenApplyXXX tests
199 //----------------------------------------------------------------
200 try {
201 CompletableFuture<Integer> cf2;
202 CompletableFuture<String> cf1 = supplyAsync(() -> "a test string");
203 cf2 = cf1.thenApply(x -> x.equals("a test string") ? 1 : 0);
204 checkCompletedNormally(cf1, "a test string");
205 checkCompletedNormally(cf2, 1);
206
207 cf1 = supplyAsync(() -> "a test string");
208 cf2 = cf1.thenApplyAsync(x -> x.equals("a test string") ? 1 : 0);
209 checkCompletedNormally(cf1, "a test string");
210 checkCompletedNormally(cf2, 1);
211
212 cf1 = supplyAsync(() -> "a test string");
213 cf2 = cf1.thenApplyAsync(x -> x.equals("a test string") ? 1 : 0, executor);
214 checkCompletedNormally(cf1, "a test string");
215 checkCompletedNormally(cf2, 1);
216
217 cf1 = supplyAsync(() -> { throw new RuntimeException(); });
218 cf2 = cf1.thenApply(x -> 0);
219 checkCompletedExceptionally(cf1);
220 checkCompletedExceptionally(cf2);
221
222 cf1 = supplyAsync(() -> { throw new RuntimeException(); });
223 cf2 = cf1.thenApplyAsync(x -> 0);
224 checkCompletedExceptionally(cf1);
225 checkCompletedExceptionally(cf2);
226
227 cf1 = supplyAsync(() -> { throw new RuntimeException(); });
228 cf2 = cf1.thenApplyAsync(x -> 0, executor);
229 checkCompletedExceptionally(cf1);
230 checkCompletedExceptionally(cf2);
231 } catch (Throwable t) { unexpected(t); }
232
233 //----------------------------------------------------------------
234 // thenAcceptXXX tests
235 //----------------------------------------------------------------
236 try {
237 CompletableFuture<Void> cf2;
238 int before = atomicInt.get();
239 CompletableFuture<String> cf1 = supplyAsync(() -> "a test string");
240 cf2 = cf1.thenAccept(x -> { if (x.equals("a test string")) { atomicInt.incrementAndGet(); return; } throw new RuntimeException(); });
241 checkCompletedNormally(cf1, "a test string");
242 checkCompletedNormally(cf2, null);
243 check(atomicInt.get() == (before + 1));
244
245 before = atomicInt.get();
246 cf1 = supplyAsync(() -> "a test string");
247 cf2 = cf1.thenAcceptAsync(x -> { if (x.equals("a test string")) { atomicInt.incrementAndGet(); return; } throw new RuntimeException(); });
248 checkCompletedNormally(cf1, "a test string");
249 checkCompletedNormally(cf2, null);
250 check(atomicInt.get() == (before + 1));
251
252 before = atomicInt.get();
253 cf1 = supplyAsync(() -> "a test string");
254 cf2 = cf1.thenAcceptAsync(x -> { if (x.equals("a test string")) { atomicInt.incrementAndGet(); return; } throw new RuntimeException(); }, executor);
255 checkCompletedNormally(cf1, "a test string");
256 checkCompletedNormally(cf2, null);
257 check(atomicInt.get() == (before + 1));
258
259 before = atomicInt.get();
260 cf1 = supplyAsync(() -> { throw new RuntimeException(); });
261 cf2 = cf1.thenAccept(x -> atomicInt.incrementAndGet());
262 checkCompletedExceptionally(cf1);
263 checkCompletedExceptionally(cf2);
264 check(atomicInt.get() == before);
265
266 cf1 = supplyAsync(() -> { throw new RuntimeException(); });
267 cf2 = cf1.thenAcceptAsync(x -> atomicInt.incrementAndGet());
268 checkCompletedExceptionally(cf1);
269 checkCompletedExceptionally(cf2);
270 check(atomicInt.get() == before);
271
272 cf1 = supplyAsync(() -> { throw new RuntimeException(); });
273 cf2 = cf1.thenAcceptAsync(x -> atomicInt.incrementAndGet(), executor );
274 checkCompletedExceptionally(cf1);
275 checkCompletedExceptionally(cf2);
276 check(atomicInt.get() == before);
277 } catch (Throwable t) { unexpected(t); }
278
279 //----------------------------------------------------------------
280 // thenRunXXX tests
281 //----------------------------------------------------------------
282 try {
283 CompletableFuture<Void> cf2;
284 int before = atomicInt.get();
285 CompletableFuture<String> cf1 = supplyAsync(() -> "a test string");
286 cf2 = cf1.thenRun(() -> atomicInt.incrementAndGet());
287 checkCompletedNormally(cf1, "a test string");
288 checkCompletedNormally(cf2, null);
289 check(atomicInt.get() == (before + 1));
290
291 before = atomicInt.get();
292 cf1 = supplyAsync(() -> "a test string");
293 cf2 = cf1.thenRunAsync(() -> atomicInt.incrementAndGet());
294 checkCompletedNormally(cf1, "a test string");
295 checkCompletedNormally(cf2, null);
296 check(atomicInt.get() == (before + 1));
297
298 before = atomicInt.get();
299 cf1 = supplyAsync(() -> "a test string");
300 cf2 = cf1.thenRunAsync(() -> atomicInt.incrementAndGet(), executor);
301 checkCompletedNormally(cf1, "a test string");
302 checkCompletedNormally(cf2, null);
303 check(atomicInt.get() == (before + 1));
304
305 before = atomicInt.get();
306 cf1 = supplyAsync(() -> { throw new RuntimeException(); });
307 cf2 = cf1.thenRun(() -> atomicInt.incrementAndGet());
308 checkCompletedExceptionally(cf1);
309 checkCompletedExceptionally(cf2);
310 check(atomicInt.get() == before);
311
312 cf1 = supplyAsync(() -> { throw new RuntimeException(); });
313 cf2 = cf1.thenRunAsync(() -> atomicInt.incrementAndGet());
314 checkCompletedExceptionally(cf1);
315 checkCompletedExceptionally(cf2);
316 check(atomicInt.get() == before);
317
318 cf1 = supplyAsync(() -> { throw new RuntimeException(); });
319 cf2 = cf1.thenRunAsync(() -> atomicInt.incrementAndGet(), executor);
320 checkCompletedExceptionally(cf1);
321 checkCompletedExceptionally(cf2);
322 check(atomicInt.get() == before);
323 } catch (Throwable t) { unexpected(t); }
324
325 //----------------------------------------------------------------
326 // thenCombineXXX tests
327 //----------------------------------------------------------------
328 try {
329 CompletableFuture<Integer> cf3;
330 CompletableFuture<Integer> cf1 = supplyAsync(() -> 1);
331 CompletableFuture<Integer> cf2 = supplyAsync(() -> 1);
332 cf3 = cf1.thenCombine(cf2, (x, y) -> x + y);
333 checkCompletedNormally(cf1, 1);
334 checkCompletedNormally(cf2, 1);
335 checkCompletedNormally(cf3, 2);
336
337 cf1 = supplyAsync(() -> 1);
338 cf2 = supplyAsync(() -> 1);
339 cf3 = cf1.thenCombineAsync(cf2, (x, y) -> x + y);
340 checkCompletedNormally(cf1, 1);
341 checkCompletedNormally(cf2, 1);
342 checkCompletedNormally(cf3, 2);
343
344 cf1 = supplyAsync(() -> 1);
345 cf2 = supplyAsync(() -> 1);
346 cf3 = cf1.thenCombineAsync(cf2, (x, y) -> x + y, executor);
347 checkCompletedNormally(cf1, 1);
348 checkCompletedNormally(cf2, 1);
349 checkCompletedNormally(cf3, 2);
350
351 cf1 = supplyAsync(() -> { throw new RuntimeException(); });
352 cf2 = supplyAsync(() -> 1);
353 cf3 = cf1.thenCombine(cf2, (x, y) -> 0);
354 checkCompletedExceptionally(cf1);
355 checkCompletedNormally(cf2, 1);
356 checkCompletedExceptionally(cf3);
357
358 cf1 = supplyAsync(() -> 1);
359 cf2 = supplyAsync(() -> { throw new RuntimeException(); });
360 cf3 = cf1.thenCombineAsync(cf2, (x, y) -> 0);
361 checkCompletedNormally(cf1, 1);
362 checkCompletedExceptionally(cf2);
363 checkCompletedExceptionally(cf3);
364
365 cf1 = supplyAsync(() -> { throw new RuntimeException(); });
366 cf2 = supplyAsync(() -> { throw new RuntimeException(); });
367 cf3 = cf1.thenCombineAsync(cf2, (x, y) -> 0, executor);
368 checkCompletedExceptionally(cf1);
369 checkCompletedExceptionally(cf2);
370 checkCompletedExceptionally(cf3);
371 } catch (Throwable t) { unexpected(t); }
372
373 //----------------------------------------------------------------
374 // thenAcceptBothXXX tests
375 //----------------------------------------------------------------
376 try {
377 CompletableFuture<Void> cf3;
378 int before = atomicInt.get();
379 CompletableFuture<Integer> cf1 = supplyAsync(() -> 1);
380 CompletableFuture<Integer> cf2 = supplyAsync(() -> 1);
381 cf3 = cf1.thenAcceptBoth(cf2, (x, y) -> { check(x + y == 2); atomicInt.incrementAndGet(); });
382 checkCompletedNormally(cf1, 1);
383 checkCompletedNormally(cf2, 1);
384 checkCompletedNormally(cf3, null);
385 check(atomicInt.get() == (before + 1));
386
387 before = atomicInt.get();
388 cf1 = supplyAsync(() -> 1);
389 cf2 = supplyAsync(() -> 1);
390 cf3 = cf1.thenAcceptBothAsync(cf2, (x, y) -> { check(x + y == 2); atomicInt.incrementAndGet(); });
391 checkCompletedNormally(cf1, 1);
392 checkCompletedNormally(cf2, 1);
393 checkCompletedNormally(cf3, null);
394 check(atomicInt.get() == (before + 1));
395
396 before = atomicInt.get();
397 cf1 = supplyAsync(() -> 1);
398 cf2 = supplyAsync(() -> 1);
399 cf3 = cf1.thenAcceptBothAsync(cf2, (x, y) -> { check(x + y == 2); atomicInt.incrementAndGet(); }, executor);
400 checkCompletedNormally(cf1, 1);
401 checkCompletedNormally(cf2, 1);
402 checkCompletedNormally(cf3, null);
403 check(atomicInt.get() == (before + 1));
404
405 before = atomicInt.get();
406 cf1 = supplyAsync(() -> { throw new RuntimeException(); });
407 cf2 = supplyAsync(() -> 1);
408 cf3 = cf1.thenAcceptBoth(cf2, (x, y) -> atomicInt.incrementAndGet());
409 checkCompletedExceptionally(cf1);
410 checkCompletedNormally(cf2, 1);
411 checkCompletedExceptionally(cf3);
412 check(atomicInt.get() == before);
413
414 cf1 = supplyAsync(() -> 1);
415 cf2 = supplyAsync(() -> { throw new RuntimeException(); });
416 cf3 = cf1.thenAcceptBothAsync(cf2, (x, y) -> atomicInt.incrementAndGet());
417 checkCompletedNormally(cf1, 1);
418 checkCompletedExceptionally(cf2);
419 checkCompletedExceptionally(cf3);
420 check(atomicInt.get() == before);
421
422 cf1 = supplyAsync(() -> { throw new RuntimeException(); });
423 cf2 = supplyAsync(() -> { throw new RuntimeException(); });
424 cf3 = cf1.thenAcceptBothAsync(cf2, (x, y) -> atomicInt.incrementAndGet(), executor);
425 checkCompletedExceptionally(cf1);
426 checkCompletedExceptionally(cf2);
427 checkCompletedExceptionally(cf3);
428 check(atomicInt.get() == before);
429 } catch (Throwable t) { unexpected(t); }
430
431 //----------------------------------------------------------------
432 // runAfterBothXXX tests
433 //----------------------------------------------------------------
434 try {
435 CompletableFuture<Void> cf3;
436 int before = atomicInt.get();
437 CompletableFuture<Integer> cf1 = supplyAsync(() -> 1);
438 CompletableFuture<Integer> cf2 = supplyAsync(() -> 1);
439 cf3 = cf1.runAfterBoth(cf2, () -> { check(cf1.isDone()); check(cf2.isDone()); atomicInt.incrementAndGet(); });
440 checkCompletedNormally(cf1, 1);
441 checkCompletedNormally(cf2, 1);
442 checkCompletedNormally(cf3, null);
443 check(atomicInt.get() == (before + 1));
444
445 before = atomicInt.get();
446 CompletableFuture<Integer> cfa = supplyAsync(() -> 1);
447 CompletableFuture<Integer> cfb = supplyAsync(() -> 1);
448 cf3 = cfa.runAfterBothAsync(cfb, () -> { check(cfa.isDone()); check(cfb.isDone()); atomicInt.incrementAndGet(); });
449 checkCompletedNormally(cfa, 1);
450 checkCompletedNormally(cfb, 1);
451 checkCompletedNormally(cf3, null);
452 check(atomicInt.get() == (before + 1));
453
454 before = atomicInt.get();
455 CompletableFuture<Integer> cfx = supplyAsync(() -> 1);
456 CompletableFuture<Integer> cfy = supplyAsync(() -> 1);
457 cf3 = cfy.runAfterBothAsync(cfx, () -> { check(cfx.isDone()); check(cfy.isDone()); atomicInt.incrementAndGet(); }, executor);
458 checkCompletedNormally(cfx, 1);
459 checkCompletedNormally(cfy, 1);
460 checkCompletedNormally(cf3, null);
461 check(atomicInt.get() == (before + 1));
462
463 before = atomicInt.get();
464 CompletableFuture<Integer> cf4 = supplyAsync(() -> { throw new RuntimeException(); });
465 CompletableFuture<Integer> cf5 = supplyAsync(() -> 1);
466 cf3 = cf5.runAfterBothAsync(cf4, () -> atomicInt.incrementAndGet(), executor);
467 checkCompletedExceptionally(cf4);
468 checkCompletedNormally(cf5, 1);
469 checkCompletedExceptionally(cf3);
470 check(atomicInt.get() == before);
471
472 before = atomicInt.get();
473 cf4 = supplyAsync(() -> 1);
474 cf5 = supplyAsync(() -> { throw new RuntimeException(); });
475 cf3 = cf5.runAfterBothAsync(cf4, () -> atomicInt.incrementAndGet());
476 checkCompletedNormally(cf4, 1);
477 checkCompletedExceptionally(cf5);
478 checkCompletedExceptionally(cf3);
479 check(atomicInt.get() == before);
480
481 before = atomicInt.get();
482 cf4 = supplyAsync(() -> { throw new RuntimeException(); });
483 cf5 = supplyAsync(() -> { throw new RuntimeException(); });
484 cf3 = cf5.runAfterBoth(cf4, () -> atomicInt.incrementAndGet());
485 checkCompletedExceptionally(cf4);
486 checkCompletedExceptionally(cf5);
487 checkCompletedExceptionally(cf3);
488 check(atomicInt.get() == before);
489 } catch (Throwable t) { unexpected(t); }
490
491 //----------------------------------------------------------------
492 // applyToEitherXXX tests
493 //----------------------------------------------------------------
494 try {
495 CompletableFuture<Integer> cf3;
496 CompletableFuture<Integer> cf1 = supplyAsync(() -> 1);
497 CompletableFuture<Integer> cf2 = supplyAsync(() -> 2);
498 cf3 = cf1.applyToEither(cf2, x -> { check(x == 1 || x == 2); return x; });
499 checkCompletedNormally(cf3, new Object[] {1, 2});
500 check(cf1.isDone() || cf2.isDone());
501
502 cf1 = supplyAsync(() -> 1);
503 cf2 = supplyAsync(() -> 2);
504 cf3 = cf1.applyToEitherAsync(cf2, x -> { check(x == 1 || x == 2); return x; });
505 checkCompletedNormally(cf3, new Object[] {1, 2});
506 check(cf1.isDone() || cf2.isDone());
507
508 cf1 = supplyAsync(() -> 1);
509 cf2 = supplyAsync(() -> 2);
510 cf3 = cf1.applyToEitherAsync(cf2, x -> { check(x == 1 || x == 2); return x; }, executor);
511 checkCompletedNormally(cf3, new Object[] {1, 2});
512 check(cf1.isDone() || cf2.isDone());
513
514 cf1 = supplyAsync(() -> { throw new RuntimeException(); });
515 cf2 = supplyAsync(() -> 2);
516 cf3 = cf1.applyToEither(cf2, x -> { check(x == 2); return x; });
517 try { check(cf3.join() == 2); } catch (CompletionException x) { pass(); }
518 check(cf3.isDone());
519 check(cf1.isDone() || cf2.isDone());
520
521 cf1 = supplyAsync(() -> 1);
522 cf2 = supplyAsync(() -> { throw new RuntimeException(); });
523 cf3 = cf1.applyToEitherAsync(cf2, x -> { check(x == 1); return x; });
524 try { check(cf3.join() == 1); } catch (CompletionException x) { pass(); }
525 check(cf3.isDone());
526 check(cf1.isDone() || cf2.isDone());
527
528 cf1 = supplyAsync(() -> { throw new RuntimeException(); });
529 cf2 = supplyAsync(() -> { throw new RuntimeException(); });
530 cf3 = cf1.applyToEitherAsync(cf2, x -> { fail(); return x; });
531 checkCompletedExceptionally(cf3);
532 check(cf1.isDone() || cf2.isDone());
533
534 final Phaser cf3Done = new Phaser(2);
535 cf1 = supplyAsync(() -> { cf3Done.arriveAndAwaitAdvance(); return 1; });
536 cf2 = supplyAsync(() -> 2);
537 cf3 = cf1.applyToEither(cf2, x -> { check(x == 2); return x; });
538 checkCompletedNormally(cf3, 2);
539 checkCompletedNormally(cf2, 2);
540 check(!cf1.isDone());
541 cf3Done.arrive();
542 checkCompletedNormally(cf1, 1);
543 checkCompletedNormally(cf3, 2);
544
545 cf1 = supplyAsync(() -> 1);
546 cf2 = supplyAsync(() -> { cf3Done.arriveAndAwaitAdvance(); return 2; });
547 cf3 = cf1.applyToEitherAsync(cf2, x -> { check(x == 1); return x; });
548 checkCompletedNormally(cf3, 1);
549 checkCompletedNormally(cf1, 1);
550 check(!cf2.isDone());
551 cf3Done.arrive();
552 checkCompletedNormally(cf2, 2);
553 checkCompletedNormally(cf3, 1);
554 } catch (Throwable t) { unexpected(t); }
555
556 //----------------------------------------------------------------
557 // acceptEitherXXX tests
558 //----------------------------------------------------------------
559 try {
560 CompletableFuture<Void> cf3;
561 int before = atomicInt.get();
562 CompletableFuture<Integer> cf1 = supplyAsync(() -> 1);
563 CompletableFuture<Integer> cf2 = supplyAsync(() -> 2);
564 cf3 = cf1.acceptEither(cf2, x -> { check(x == 1 || x == 2); atomicInt.incrementAndGet(); });
565 checkCompletedNormally(cf3, null);
566 check(cf1.isDone() || cf2.isDone());
567 check(atomicInt.get() == (before + 1));
568
569 before = atomicInt.get();
570 cf1 = supplyAsync(() -> 1);
571 cf2 = supplyAsync(() -> 2);
572 cf3 = cf1.acceptEitherAsync(cf2, x -> { check(x == 1 || x == 2); atomicInt.incrementAndGet(); });
573 checkCompletedNormally(cf3, null);
574 check(cf1.isDone() || cf2.isDone());
575 check(atomicInt.get() == (before + 1));
576
577 before = atomicInt.get();
578 cf1 = supplyAsync(() -> 1);
579 cf2 = supplyAsync(() -> 2);
580 cf3 = cf2.acceptEitherAsync(cf1, x -> { check(x == 1 || x == 2); atomicInt.incrementAndGet(); }, executor);
581 checkCompletedNormally(cf3, null);
582 check(cf1.isDone() || cf2.isDone());
583 check(atomicInt.get() == (before + 1));
584
585 cf1 = supplyAsync(() -> { throw new RuntimeException(); });
586 cf2 = supplyAsync(() -> 2);
587 cf3 = cf2.acceptEitherAsync(cf1, x -> { check(x == 2); }, executor);
588 try { check(cf3.join() == null); } catch (CompletionException x) { pass(); }
589 check(cf3.isDone());
590 check(cf1.isDone() || cf2.isDone());
591
592 cf1 = supplyAsync(() -> 1);
593 cf2 = supplyAsync(() -> { throw new RuntimeException(); });
594 cf3 = cf2.acceptEitherAsync(cf1, x -> { check(x == 1); });
595 try { check(cf3.join() == null); } catch (CompletionException x) { pass(); }
596 check(cf3.isDone());
597 check(cf1.isDone() || cf2.isDone());
598
599 cf1 = supplyAsync(() -> { throw new RuntimeException(); });
600 cf2 = supplyAsync(() -> { throw new RuntimeException(); });
601 cf3 = cf2.acceptEitherAsync(cf1, x -> { fail(); });
602 checkCompletedExceptionally(cf3);
603 check(cf1.isDone() || cf2.isDone());
604
605 final Phaser cf3Done = new Phaser(2);
606 cf1 = supplyAsync(() -> { cf3Done.arriveAndAwaitAdvance(); return 1; });
607 cf2 = supplyAsync(() -> 2);
608 cf3 = cf1.acceptEither(cf2, x -> { check(x == 2); });
609 checkCompletedNormally(cf3, null);
610 checkCompletedNormally(cf2, 2);
611 check(!cf1.isDone());
612 cf3Done.arrive();
613 checkCompletedNormally(cf1, 1);
614 checkCompletedNormally(cf3, null);
615
616 cf1 = supplyAsync(() -> 1);
617 cf2 = supplyAsync(() -> { cf3Done.arriveAndAwaitAdvance(); return 2; });
618 cf3 = cf1.acceptEitherAsync(cf2, x -> { check(x == 1); });
619 checkCompletedNormally(cf3, null);
620 checkCompletedNormally(cf1, 1);
621 check(!cf2.isDone());
622 cf3Done.arrive();
623 checkCompletedNormally(cf2, 2);
624 checkCompletedNormally(cf3, null);
625 } catch (Throwable t) { unexpected(t); }
626
627 //----------------------------------------------------------------
628 // runAfterEitherXXX tests
629 //----------------------------------------------------------------
630 try {
631 CompletableFuture<Void> cf3;
632 int before = atomicInt.get();
633 CompletableFuture<Void> cf1 = runAsync(() -> {});
634 CompletableFuture<Void> cf2 = runAsync(() -> {});
635 cf3 = cf1.runAfterEither(cf2, () -> atomicInt.incrementAndGet());
636 checkCompletedNormally(cf3, null);
637 check(cf1.isDone() || cf2.isDone());
638 check(atomicInt.get() == (before + 1));
639
640 before = atomicInt.get();
641 cf1 = runAsync(() -> {});
642 cf2 = runAsync(() -> {});
643 cf3 = cf1.runAfterEitherAsync(cf2, () -> atomicInt.incrementAndGet());
644 checkCompletedNormally(cf3, null);
645 check(cf1.isDone() || cf2.isDone());
646 check(atomicInt.get() == (before + 1));
647
648 before = atomicInt.get();
649 cf1 = runAsync(() -> {});
650 cf2 = runAsync(() -> {});
651 cf3 = cf2.runAfterEitherAsync(cf1, () -> atomicInt.incrementAndGet(), executor);
652 checkCompletedNormally(cf3, null);
653 check(cf1.isDone() || cf2.isDone());
654 check(atomicInt.get() == (before + 1));
655
656 before = atomicInt.get();
657 cf1 = runAsync(() -> { throw new RuntimeException(); });
658 cf2 = runAsync(() -> {});
659 cf3 = cf2.runAfterEither(cf1, () -> atomicInt.incrementAndGet());
660 try {
661 check(cf3.join() == null);
662 check(atomicInt.get() == (before + 1));
663 } catch (CompletionException x) { pass(); }
664 check(cf3.isDone());
665 check(cf1.isDone() || cf2.isDone());
666
667 before = atomicInt.get();
668 cf1 = runAsync(() -> {});
669 cf2 = runAsync(() -> { throw new RuntimeException(); });
670 cf3 = cf1.runAfterEitherAsync(cf2, () -> atomicInt.incrementAndGet());
671 try {
672 check(cf3.join() == null);
673 check(atomicInt.get() == (before + 1));
674 } catch (CompletionException x) { pass(); }
675 check(cf3.isDone());
676 check(cf1.isDone() || cf2.isDone());
677
678 before = atomicInt.get();
679 cf1 = runAsync(() -> { throw new RuntimeException(); });
680 cf2 = runAsync(() -> { throw new RuntimeException(); });
681 cf3 = cf2.runAfterEitherAsync(cf1, () -> atomicInt.incrementAndGet(), executor);
682 checkCompletedExceptionally(cf3);
683 check(cf1.isDone() || cf2.isDone());
684 check(atomicInt.get() == before);
685
686 final Phaser cf3Done = new Phaser(2);
687 before = atomicInt.get();
688 cf1 = runAsync(() -> cf3Done.arriveAndAwaitAdvance());
689 cf2 = runAsync(() -> {});
690 cf3 = cf1.runAfterEither(cf2, () -> atomicInt.incrementAndGet());
691 checkCompletedNormally(cf3, null);
692 checkCompletedNormally(cf2, null);
693 check(!cf1.isDone());
694 check(atomicInt.get() == (before + 1));
695 cf3Done.arrive();
696 checkCompletedNormally(cf1, null);
697 checkCompletedNormally(cf3, null);
698
699 before = atomicInt.get();
700 cf1 = runAsync(() -> {});
701 cf2 = runAsync(() -> cf3Done.arriveAndAwaitAdvance());
702 cf3 = cf1.runAfterEitherAsync(cf2, () -> atomicInt.incrementAndGet());
703 checkCompletedNormally(cf3, null);
704 checkCompletedNormally(cf1, null);
705 check(!cf2.isDone());
706 check(atomicInt.get() == (before + 1));
707 cf3Done.arrive();
708 checkCompletedNormally(cf2, null);
709 checkCompletedNormally(cf3, null);
710 } catch (Throwable t) { unexpected(t); }
711
712 //----------------------------------------------------------------
713 // thenComposeXXX tests
714 //----------------------------------------------------------------
715 try {
716 CompletableFuture<Integer> cf2;
717 CompletableFuture<Integer> cf1 = supplyAsync(() -> 1);
718 cf2 = cf1.thenCompose(x -> { check(x == 1); return CompletableFuture.completedFuture(2); });
719 checkCompletedNormally(cf1, 1);
720 checkCompletedNormally(cf2, 2);
721
722 cf1 = supplyAsync(() -> 1);
723 cf2 = cf1.thenComposeAsync(x -> { check(x == 1); return CompletableFuture.completedFuture(2); });
724 checkCompletedNormally(cf1, 1);
725 checkCompletedNormally(cf2, 2);
726
727 cf1 = supplyAsync(() -> 1);
728 cf2 = cf1.thenComposeAsync(x -> { check(x == 1); return CompletableFuture.completedFuture(2); }, executor);
729 checkCompletedNormally(cf1, 1);
730 checkCompletedNormally(cf2, 2);
731
732 int before = atomicInt.get();
733 cf1 = supplyAsync(() -> { throw new RuntimeException(); });
734 cf2 = cf1.thenCompose(x -> { atomicInt.incrementAndGet(); return CompletableFuture.completedFuture(2); });
735 checkCompletedExceptionally(cf1);
736 checkCompletedExceptionally(cf2);
737 check(atomicInt.get() == before);
738
739 cf1 = supplyAsync(() -> { throw new RuntimeException(); });
740 cf2 = cf1.thenComposeAsync(x -> { atomicInt.incrementAndGet(); return CompletableFuture.completedFuture(2); });
741 checkCompletedExceptionally(cf1);
742 checkCompletedExceptionally(cf2);
743 check(atomicInt.get() == before);
744
745 cf1 = supplyAsync(() -> 1);
746 cf2 = cf1.thenComposeAsync(x -> { throw new RuntimeException(); }, executor);
747 checkCompletedNormally(cf1, 1);
748 checkCompletedExceptionally(cf2);
749 } catch (Throwable t) { unexpected(t); }
750
751 //----------------------------------------------------------------
752 // anyOf tests
753 //----------------------------------------------------------------
754 try {
755 CompletableFuture<Object> cf3;
756 for (int k=0; k < 10; k++){
757 CompletableFuture<Integer> cf1 = supplyAsync(() -> 1);
758 CompletableFuture<Integer> cf2 = supplyAsync(() -> 2);
759 cf3 = CompletableFuture.anyOf(cf1, cf2);
760 checkCompletedNormally(cf3, new Object[] {1, 2});
761 check(cf1.isDone() || cf2.isDone());
762 }
763 } catch (Throwable t) { unexpected(t); }
764
765 //----------------------------------------------------------------
766 // allOf tests
767 //----------------------------------------------------------------
768 try {
769 CompletableFuture<?> cf3;
770 for (int k=0; k < 10; k++){
771 CompletableFuture<Integer>[] cfs = (CompletableFuture<Integer>[])
772 Array.newInstance(CompletableFuture.class, 10);
773 for (int j=0; j < 10; j++) {
774 final int v = j;
775 cfs[j] = supplyAsync(() -> v);
776 }
777 cf3 = CompletableFuture.allOf(cfs);
778 for (int j=0; j < 10; j++)
779 checkCompletedNormally(cfs[j], j);
780 checkCompletedNormally(cf3, null);
781 }
782 } catch (Throwable t) { unexpected(t); }
783
784 //----------------------------------------------------------------
785 // exceptionally tests
786 //----------------------------------------------------------------
787 try {
788 CompletableFuture<Integer> cf2;
789 CompletableFuture<Integer> cf1 = supplyAsync(() -> 1);
790 cf2 = cf1.exceptionally(t -> { fail("function should never be called"); return 2;});
791 checkCompletedNormally(cf1, 1);
792 checkCompletedNormally(cf2, 1);
793
794 final RuntimeException t = new RuntimeException();
795 cf1 = supplyAsync(() -> { throw t; });
796 cf2 = cf1.exceptionally(x -> { check(x.getCause() == t); return 2;});
797 checkCompletedExceptionally(cf1);
798 checkCompletedNormally(cf2, 2);
799 } catch (Throwable t) { unexpected(t); }
800
801 //----------------------------------------------------------------
802 // handle tests
803 //----------------------------------------------------------------
804 try {
805 CompletableFuture<Integer> cf2;
806 CompletableFuture<Integer> cf1 = supplyAsync(() -> 1);
807 cf2 = cf1.handle((x,t) -> x+1);
808 checkCompletedNormally(cf1, 1);
809 checkCompletedNormally(cf2, 2);
810
811 final RuntimeException ex = new RuntimeException();
812 cf1 = supplyAsync(() -> { throw ex; });
813 cf2 = cf1.handle((x,t) -> { check(t.getCause() == ex); return 2;});
814 checkCompletedExceptionally(cf1);
815 checkCompletedNormally(cf2, 2);
816
817 cf1 = supplyAsync(() -> 1);
818 cf2 = cf1.handleAsync((x,t) -> x+1);
819 checkCompletedNormally(cf1, 1);
820 checkCompletedNormally(cf2, 2);
821
822 cf1 = supplyAsync(() -> { throw ex; });
823 cf2 = cf1.handleAsync((x,t) -> { check(t.getCause() == ex); return 2;});
824 checkCompletedExceptionally(cf1);
825 checkCompletedNormally(cf2, 2);
826 } catch (Throwable t) { unexpected(t); }
827
828 //----------------------------------------------------------------
829 // whenComplete tests
830 //----------------------------------------------------------------
831 try {
832 AtomicInteger count = new AtomicInteger();
833 CompletableFuture<Integer> cf2;
834 CompletableFuture<Integer> cf1 = supplyAsync(() -> 1);
835 cf2 = cf1.whenComplete((x,t) -> count.getAndIncrement());
836 checkCompletedNormally(cf1, 1);
837 checkCompletedNormally(cf2, 1);
838 check(count.get() == 1, "action count should be incremented");
839
840 final RuntimeException ex = new RuntimeException();
841 cf1 = supplyAsync(() -> { throw ex; });
842 cf2 = cf1.whenComplete((x,t) -> count.getAndIncrement());
843 checkCompletedExceptionally(cf1);
844 checkCompletedExceptionally(cf2);
845 check(count.get() == 2, "action count should be incremented");
846
847 cf1 = supplyAsync(() -> 1);
848 cf2 = cf1.whenCompleteAsync((x,t) -> count.getAndIncrement());
849 checkCompletedNormally(cf1, 1);
850 checkCompletedNormally(cf2, 1);
851 check(count.get() == 3, "action count should be incremented");
852
853 cf1 = supplyAsync(() -> { throw ex; });
854 cf2 = cf1.whenCompleteAsync((x,t) -> count.getAndIncrement());
855 checkCompletedExceptionally(cf1);
856 checkCompletedExceptionally(cf2);
857 check(count.get() == 4, "action count should be incremented");
858
859 } catch (Throwable t) { unexpected(t); }
860
861 }
862
863 //--------------------- Infrastructure ---------------------------
864 static volatile int passed = 0, failed = 0;
865 static void pass() {passed++;}
866 static void fail() {failed++; Thread.dumpStack();}
867 static void fail(String msg) {System.out.println(msg); fail();}
868 static void unexpected(Throwable t) {failed++; t.printStackTrace();}
869 static void check(boolean cond) {if (cond) pass(); else fail();}
870 static void check(boolean cond, String msg) {if (cond) pass(); else fail(msg);}
871 static void equal(Object x, Object y) {
872 if (x == null ? y == null : x.equals(y)) pass();
873 else fail(x + " not equal to " + y);}
874 static void equalAnyOf(Object x, Object[] y) {
875 if (x == null && y == null) { pass(); return; }
876 for (Object z : y) { if (x.equals(z)) { pass(); return; } }
877 StringBuilder sb = new StringBuilder();
878 for (Object o : y)
879 sb.append(o).append(" ");
880 fail(x + " not equal to one of [" + sb + "]");}
881 public static void main(String[] args) throws Throwable {
882 try {realMain(args);} catch (Throwable t) {unexpected(t);}
883 System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed);
884 if (failed > 0) throw new AssertionError("Some tests failed");}
885 }