ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/CyclicBarrierTest.java
Revision: 1.32
Committed: Tue Aug 13 00:54:51 2019 UTC (4 years, 9 months ago) by jsr166
Branch: MAIN
Changes since 1.31: +1 -1 lines
Log Message:
use randomBoolean()

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include Andrew Wright, Jeffrey Hayes,
6 * Pat Fisher, Mike Judd.
7 */
8
9 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10
11 import java.util.concurrent.BrokenBarrierException;
12 import java.util.concurrent.CountDownLatch;
13 import java.util.concurrent.CyclicBarrier;
14 import java.util.concurrent.ExecutorService;
15 import java.util.concurrent.Executors;
16 import java.util.concurrent.ThreadLocalRandom;
17 import java.util.concurrent.TimeoutException;
18 import java.util.concurrent.atomic.AtomicBoolean;
19 import java.util.concurrent.atomic.AtomicInteger;
20
21 import junit.framework.Test;
22 import junit.framework.TestSuite;
23
24 public class CyclicBarrierTest extends JSR166TestCase {
25 public static void main(String[] args) {
26 main(suite(), args);
27 }
28 public static Test suite() {
29 return new TestSuite(CyclicBarrierTest.class);
30 }
31
32 /**
33 * Spin-waits till the number of waiters == numberOfWaiters.
34 */
35 void awaitNumberWaiting(CyclicBarrier barrier, int numberOfWaiters) {
36 long startTime = System.nanoTime();
37 while (barrier.getNumberWaiting() != numberOfWaiters) {
38 if (millisElapsedSince(startTime) > LONG_DELAY_MS)
39 fail("timed out");
40 Thread.yield();
41 }
42 }
43
44 /**
45 * Creating with negative parties throws IllegalArgumentException
46 */
47 public void testConstructor1() {
48 try {
49 new CyclicBarrier(-1, (Runnable)null);
50 shouldThrow();
51 } catch (IllegalArgumentException success) {}
52 }
53
54 /**
55 * Creating with negative parties and no action throws
56 * IllegalArgumentException
57 */
58 public void testConstructor2() {
59 try {
60 new CyclicBarrier(-1);
61 shouldThrow();
62 } catch (IllegalArgumentException success) {}
63 }
64
65 /**
66 * getParties returns the number of parties given in constructor
67 */
68 public void testGetParties() {
69 CyclicBarrier b = new CyclicBarrier(2);
70 assertEquals(2, b.getParties());
71 assertEquals(0, b.getNumberWaiting());
72 }
73
74 /**
75 * A 1-party barrier triggers after single await
76 */
77 public void testSingleParty() throws Exception {
78 CyclicBarrier b = new CyclicBarrier(1);
79 assertEquals(1, b.getParties());
80 assertEquals(0, b.getNumberWaiting());
81 b.await();
82 b.await();
83 assertEquals(0, b.getNumberWaiting());
84 }
85
86 /**
87 * The supplied barrier action is run at barrier
88 */
89 public void testBarrierAction() throws Exception {
90 final AtomicInteger count = new AtomicInteger(0);
91 final Runnable incCount = new Runnable() { public void run() {
92 count.getAndIncrement(); }};
93 CyclicBarrier b = new CyclicBarrier(1, incCount);
94 assertEquals(1, b.getParties());
95 assertEquals(0, b.getNumberWaiting());
96 b.await();
97 b.await();
98 assertEquals(0, b.getNumberWaiting());
99 assertEquals(2, count.get());
100 }
101
102 /**
103 * A 2-party/thread barrier triggers after both threads invoke await
104 */
105 public void testTwoParties() throws Exception {
106 final CyclicBarrier b = new CyclicBarrier(2);
107 Thread t = newStartedThread(new CheckedRunnable() {
108 public void realRun() throws Exception {
109 b.await();
110 b.await();
111 b.await();
112 b.await();
113 }});
114
115 b.await();
116 b.await();
117 b.await();
118 b.await();
119 awaitTermination(t);
120 }
121
122 /**
123 * An interruption in one party causes others waiting in await to
124 * throw BrokenBarrierException
125 */
126 public void testAwait1_Interrupted_BrokenBarrier() {
127 final CyclicBarrier c = new CyclicBarrier(3);
128 final CountDownLatch pleaseInterrupt = new CountDownLatch(2);
129 Thread t1 = new ThreadShouldThrow(InterruptedException.class) {
130 public void realRun() throws Exception {
131 pleaseInterrupt.countDown();
132 c.await();
133 }};
134 Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) {
135 public void realRun() throws Exception {
136 pleaseInterrupt.countDown();
137 c.await();
138 }};
139
140 t1.start();
141 t2.start();
142 await(pleaseInterrupt);
143 t1.interrupt();
144 awaitTermination(t1);
145 awaitTermination(t2);
146 }
147
148 /**
149 * An interruption in one party causes others waiting in timed await to
150 * throw BrokenBarrierException
151 */
152 public void testAwait2_Interrupted_BrokenBarrier() throws Exception {
153 final CyclicBarrier c = new CyclicBarrier(3);
154 final CountDownLatch pleaseInterrupt = new CountDownLatch(2);
155 Thread t1 = new ThreadShouldThrow(InterruptedException.class) {
156 public void realRun() throws Exception {
157 pleaseInterrupt.countDown();
158 c.await(LONG_DELAY_MS, MILLISECONDS);
159 }};
160 Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) {
161 public void realRun() throws Exception {
162 pleaseInterrupt.countDown();
163 c.await(LONG_DELAY_MS, MILLISECONDS);
164 }};
165
166 t1.start();
167 t2.start();
168 await(pleaseInterrupt);
169 t1.interrupt();
170 awaitTermination(t1);
171 awaitTermination(t2);
172 }
173
174 /**
175 * A timeout in timed await throws TimeoutException
176 */
177 public void testAwait3_TimeoutException() throws InterruptedException {
178 final CyclicBarrier c = new CyclicBarrier(2);
179 Thread t = newStartedThread(new CheckedRunnable() {
180 public void realRun() throws Exception {
181 long startTime = System.nanoTime();
182 try {
183 c.await(timeoutMillis(), MILLISECONDS);
184 shouldThrow();
185 } catch (TimeoutException success) {}
186 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
187 }});
188
189 awaitTermination(t);
190 }
191
192 /**
193 * A timeout in one party causes others waiting in timed await to
194 * throw BrokenBarrierException
195 */
196 public void testAwait4_Timeout_BrokenBarrier() throws InterruptedException {
197 final CyclicBarrier c = new CyclicBarrier(3);
198 Thread t1 = newStartedThread(new CheckedRunnable() {
199 public void realRun() throws Exception {
200 try {
201 c.await(LONG_DELAY_MS, MILLISECONDS);
202 shouldThrow();
203 } catch (BrokenBarrierException success) {}
204 }});
205 Thread t2 = newStartedThread(new CheckedRunnable() {
206 public void realRun() throws Exception {
207 awaitNumberWaiting(c, 1);
208 long startTime = System.nanoTime();
209 try {
210 c.await(timeoutMillis(), MILLISECONDS);
211 shouldThrow();
212 } catch (TimeoutException success) {}
213 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
214 }});
215
216 awaitTermination(t1);
217 awaitTermination(t2);
218 }
219
220 /**
221 * A timeout in one party causes others waiting in await to
222 * throw BrokenBarrierException
223 */
224 public void testAwait5_Timeout_BrokenBarrier() throws InterruptedException {
225 final CyclicBarrier c = new CyclicBarrier(3);
226 Thread t1 = newStartedThread(new CheckedRunnable() {
227 public void realRun() throws Exception {
228 try {
229 c.await();
230 shouldThrow();
231 } catch (BrokenBarrierException success) {}
232 }});
233 Thread t2 = newStartedThread(new CheckedRunnable() {
234 public void realRun() throws Exception {
235 awaitNumberWaiting(c, 1);
236 long startTime = System.nanoTime();
237 try {
238 c.await(timeoutMillis(), MILLISECONDS);
239 shouldThrow();
240 } catch (TimeoutException success) {}
241 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
242 }});
243
244 awaitTermination(t1);
245 awaitTermination(t2);
246 }
247
248 /**
249 * A reset of an active barrier causes waiting threads to throw
250 * BrokenBarrierException
251 */
252 public void testReset_BrokenBarrier() throws InterruptedException {
253 final CyclicBarrier c = new CyclicBarrier(3);
254 final CountDownLatch pleaseReset = new CountDownLatch(2);
255 Thread t1 = new ThreadShouldThrow(BrokenBarrierException.class) {
256 public void realRun() throws Exception {
257 pleaseReset.countDown();
258 c.await();
259 }};
260 Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) {
261 public void realRun() throws Exception {
262 pleaseReset.countDown();
263 c.await();
264 }};
265
266 t1.start();
267 t2.start();
268 await(pleaseReset);
269
270 awaitNumberWaiting(c, 2);
271 c.reset();
272 awaitTermination(t1);
273 awaitTermination(t2);
274 }
275
276 /**
277 * A reset before threads enter barrier does not throw
278 * BrokenBarrierException
279 */
280 public void testReset_NoBrokenBarrier() throws Exception {
281 final CyclicBarrier c = new CyclicBarrier(3);
282 c.reset();
283
284 Thread t1 = newStartedThread(new CheckedRunnable() {
285 public void realRun() throws Exception {
286 c.await();
287 }});
288 Thread t2 = newStartedThread(new CheckedRunnable() {
289 public void realRun() throws Exception {
290 c.await();
291 }});
292
293 c.await();
294 awaitTermination(t1);
295 awaitTermination(t2);
296 }
297
298 /**
299 * All threads block while a barrier is broken.
300 */
301 public void testReset_Leakage() throws InterruptedException {
302 final CyclicBarrier c = new CyclicBarrier(2);
303 final AtomicBoolean done = new AtomicBoolean();
304 Thread t = newStartedThread(new CheckedRunnable() {
305 public void realRun() {
306 while (!done.get()) {
307 try {
308 while (c.isBroken())
309 c.reset();
310
311 c.await();
312 shouldThrow();
313 }
314 catch (BrokenBarrierException | InterruptedException ok) {}
315 }}});
316
317 for (int i = 0; i < 4; i++) {
318 delay(timeoutMillis());
319 t.interrupt();
320 }
321 done.set(true);
322 t.interrupt();
323 awaitTermination(t);
324 }
325
326 /**
327 * Reset of a non-broken barrier does not break barrier
328 */
329 public void testResetWithoutBreakage() throws Exception {
330 final CyclicBarrier barrier = new CyclicBarrier(3);
331 for (int i = 0; i < 3; i++) {
332 final CyclicBarrier start = new CyclicBarrier(3);
333 Thread t1 = newStartedThread(new CheckedRunnable() {
334 public void realRun() throws Exception {
335 start.await();
336 barrier.await();
337 }});
338
339 Thread t2 = newStartedThread(new CheckedRunnable() {
340 public void realRun() throws Exception {
341 start.await();
342 barrier.await();
343 }});
344
345 start.await();
346 barrier.await();
347 awaitTermination(t1);
348 awaitTermination(t2);
349 assertFalse(barrier.isBroken());
350 assertEquals(0, barrier.getNumberWaiting());
351 if (i == 1) barrier.reset();
352 assertFalse(barrier.isBroken());
353 assertEquals(0, barrier.getNumberWaiting());
354 }
355 }
356
357 /**
358 * Reset of a barrier after interruption reinitializes it.
359 */
360 public void testResetAfterInterrupt() throws Exception {
361 final CyclicBarrier barrier = new CyclicBarrier(3);
362 for (int i = 0; i < 2; i++) {
363 final CyclicBarrier start = new CyclicBarrier(3);
364 Thread t1 = new ThreadShouldThrow(InterruptedException.class) {
365 public void realRun() throws Exception {
366 start.await();
367 barrier.await();
368 }};
369
370 Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) {
371 public void realRun() throws Exception {
372 start.await();
373 barrier.await();
374 }};
375
376 t1.start();
377 t2.start();
378 start.await();
379 t1.interrupt();
380 awaitTermination(t1);
381 awaitTermination(t2);
382 assertTrue(barrier.isBroken());
383 assertEquals(0, barrier.getNumberWaiting());
384 barrier.reset();
385 assertFalse(barrier.isBroken());
386 assertEquals(0, barrier.getNumberWaiting());
387 }
388 }
389
390 /**
391 * Reset of a barrier after timeout reinitializes it.
392 */
393 public void testResetAfterTimeout() throws Exception {
394 final CyclicBarrier barrier = new CyclicBarrier(3);
395 for (int i = 0; i < 2; i++) {
396 assertEquals(0, barrier.getNumberWaiting());
397 Thread t1 = newStartedThread(new CheckedRunnable() {
398 public void realRun() throws Exception {
399 try {
400 barrier.await();
401 shouldThrow();
402 } catch (BrokenBarrierException success) {}
403 }});
404 Thread t2 = newStartedThread(new CheckedRunnable() {
405 public void realRun() throws Exception {
406 awaitNumberWaiting(barrier, 1);
407 long startTime = System.nanoTime();
408 try {
409 barrier.await(timeoutMillis(), MILLISECONDS);
410 shouldThrow();
411 } catch (TimeoutException success) {}
412 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
413 }});
414
415 awaitTermination(t1);
416 awaitTermination(t2);
417 assertEquals(0, barrier.getNumberWaiting());
418 assertTrue(barrier.isBroken());
419 assertEquals(0, barrier.getNumberWaiting());
420 barrier.reset();
421 assertFalse(barrier.isBroken());
422 assertEquals(0, barrier.getNumberWaiting());
423 }
424 }
425
426 /**
427 * Reset of a barrier after a failed command reinitializes it.
428 */
429 public void testResetAfterCommandException() throws Exception {
430 final CyclicBarrier barrier =
431 new CyclicBarrier(3, new Runnable() {
432 public void run() {
433 throw new NullPointerException(); }});
434 for (int i = 0; i < 2; i++) {
435 final CyclicBarrier start = new CyclicBarrier(3);
436 Thread t1 = new ThreadShouldThrow(BrokenBarrierException.class) {
437 public void realRun() throws Exception {
438 start.await();
439 barrier.await();
440 }};
441
442 Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) {
443 public void realRun() throws Exception {
444 start.await();
445 barrier.await();
446 }};
447
448 t1.start();
449 t2.start();
450 start.await();
451 awaitNumberWaiting(barrier, 2);
452 try {
453 barrier.await();
454 shouldThrow();
455 } catch (NullPointerException success) {}
456 awaitTermination(t1);
457 awaitTermination(t2);
458 assertTrue(barrier.isBroken());
459 assertEquals(0, barrier.getNumberWaiting());
460 barrier.reset();
461 assertFalse(barrier.isBroken());
462 assertEquals(0, barrier.getNumberWaiting());
463 }
464 }
465
466 /**
467 * There can be more threads calling await() than parties, as long as each
468 * task only calls await once and the task count is a multiple of parties.
469 */
470 public void testMoreTasksThanParties() throws Exception {
471 final ThreadLocalRandom rnd = ThreadLocalRandom.current();
472 final int parties = rnd.nextInt(1, 5);
473 final int nTasks = rnd.nextInt(1, 5) * parties;
474 final AtomicInteger tripCount = new AtomicInteger(0);
475 final AtomicInteger awaitCount = new AtomicInteger(0);
476 final CyclicBarrier barrier =
477 new CyclicBarrier(parties, () -> tripCount.getAndIncrement());
478 final ExecutorService e = Executors.newFixedThreadPool(nTasks);
479 final Runnable awaiter = () -> {
480 try {
481 if (randomBoolean())
482 barrier.await();
483 else
484 barrier.await(LONG_DELAY_MS, MILLISECONDS);
485 awaitCount.getAndIncrement();
486 } catch (Throwable fail) { threadUnexpectedException(fail); }};
487 try (PoolCleaner cleaner = cleaner(e)) {
488 for (int i = nTasks; i--> 0; )
489 e.execute(awaiter);
490 }
491 assertEquals(nTasks / parties, tripCount.get());
492 assertEquals(nTasks, awaitCount.get());
493 assertEquals(0, barrier.getNumberWaiting());
494 }
495 }