ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/CyclicBarrierTest.java
(Generate patch)

Comparing jsr166/src/test/tck/CyclicBarrierTest.java (file contents):
Revision 1.15 by jsr166, Wed Aug 25 00:07:03 2010 UTC vs.
Revision 1.34 by jsr166, Mon Sep 9 00:46:44 2019 UTC

# Line 1 | Line 1
1   /*
2   * Written by Doug Lea with assistance from members of JCP JSR-166
3   * Expert Group and released to the public domain, as explained at
4 < * http://creativecommons.org/licenses/publicdomain
4 > * http://creativecommons.org/publicdomain/zero/1.0/
5   * Other contributors include Andrew Wright, Jeffrey Hayes,
6   * Pat Fisher, Mike Judd.
7   */
8  
9 import junit.framework.*;
10 import java.util.*;
11 import java.util.concurrent.*;
12 import java.util.concurrent.locks.*;
13 import java.util.concurrent.atomic.*;
9   import static java.util.concurrent.TimeUnit.MILLISECONDS;
10  
11 + import java.util.concurrent.BrokenBarrierException;
12 + import java.util.concurrent.CountDownLatch;
13 + import java.util.concurrent.CyclicBarrier;
14 + import java.util.concurrent.ExecutorService;
15 + import java.util.concurrent.Executors;
16 + import java.util.concurrent.ThreadLocalRandom;
17 + import java.util.concurrent.TimeoutException;
18 + import java.util.concurrent.atomic.AtomicInteger;
19 +
20 + import junit.framework.Test;
21 + import junit.framework.TestSuite;
22 +
23   public class CyclicBarrierTest extends JSR166TestCase {
24      public static void main(String[] args) {
25 <        junit.textui.TestRunner.run(suite());
25 >        main(suite(), args);
26      }
27      public static Test suite() {
28          return new TestSuite(CyclicBarrierTest.class);
29      }
30  
31 <    private volatile int countAction;
32 <    private class MyAction implements Runnable {
33 <        public void run() { ++countAction; }
31 >    /**
32 >     * Spin-waits till the number of waiters == numberOfWaiters.
33 >     */
34 >    void awaitNumberWaiting(CyclicBarrier barrier, int numberOfWaiters) {
35 >        long startTime = System.nanoTime();
36 >        while (barrier.getNumberWaiting() != numberOfWaiters) {
37 >            if (millisElapsedSince(startTime) > LONG_DELAY_MS)
38 >                fail("timed out");
39 >            Thread.yield();
40 >        }
41      }
42  
43      /**
44 <     * Creating with negative parties throws IAE
44 >     * Creating with negative parties throws IllegalArgumentException
45       */
46      public void testConstructor1() {
47          try {
# Line 37 | Line 51 | public class CyclicBarrierTest extends J
51      }
52  
53      /**
54 <     * Creating with negative parties and no action throws IAE
54 >     * Creating with negative parties and no action throws
55 >     * IllegalArgumentException
56       */
57      public void testConstructor2() {
58          try {
# Line 71 | Line 86 | public class CyclicBarrierTest extends J
86       * The supplied barrier action is run at barrier
87       */
88      public void testBarrierAction() throws Exception {
89 <        countAction = 0;
90 <        CyclicBarrier b = new CyclicBarrier(1, new MyAction());
89 >        final AtomicInteger count = new AtomicInteger(0);
90 >        final Runnable incCount = new Runnable() { public void run() {
91 >            count.getAndIncrement(); }};
92 >        CyclicBarrier b = new CyclicBarrier(1, incCount);
93          assertEquals(1, b.getParties());
94          assertEquals(0, b.getNumberWaiting());
95          b.await();
96          b.await();
97          assertEquals(0, b.getNumberWaiting());
98 <        assertEquals(countAction, 2);
98 >        assertEquals(2, count.get());
99      }
100  
101      /**
# Line 86 | Line 103 | public class CyclicBarrierTest extends J
103       */
104      public void testTwoParties() throws Exception {
105          final CyclicBarrier b = new CyclicBarrier(2);
106 <        Thread t = new Thread(new CheckedRunnable() {
106 >        Thread t = newStartedThread(new CheckedRunnable() {
107              public void realRun() throws Exception {
108                  b.await();
109                  b.await();
# Line 94 | Line 111 | public class CyclicBarrierTest extends J
111                  b.await();
112              }});
113  
97        t.start();
114          b.await();
115          b.await();
116          b.await();
117          b.await();
118 <        t.join();
118 >        awaitTermination(t);
119      }
120  
105
121      /**
122       * An interruption in one party causes others waiting in await to
123       * throw BrokenBarrierException
124       */
125 <    public void testAwait1_Interrupted_BrokenBarrier() throws Exception {
125 >    public void testAwait1_Interrupted_BrokenBarrier() {
126          final CyclicBarrier c = new CyclicBarrier(3);
127 +        final CountDownLatch pleaseInterrupt = new CountDownLatch(2);
128          Thread t1 = new ThreadShouldThrow(InterruptedException.class) {
129              public void realRun() throws Exception {
130 +                pleaseInterrupt.countDown();
131                  c.await();
132              }};
133          Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) {
134              public void realRun() throws Exception {
135 +                pleaseInterrupt.countDown();
136                  c.await();
137              }};
138  
139          t1.start();
140          t2.start();
141 <        Thread.sleep(SHORT_DELAY_MS);
141 >        await(pleaseInterrupt);
142          t1.interrupt();
143 <        t1.join();
144 <        t2.join();
143 >        awaitTermination(t1);
144 >        awaitTermination(t2);
145      }
146  
147      /**
# Line 132 | Line 150 | public class CyclicBarrierTest extends J
150       */
151      public void testAwait2_Interrupted_BrokenBarrier() throws Exception {
152          final CyclicBarrier c = new CyclicBarrier(3);
153 +        final CountDownLatch pleaseInterrupt = new CountDownLatch(2);
154          Thread t1 = new ThreadShouldThrow(InterruptedException.class) {
155              public void realRun() throws Exception {
156 +                pleaseInterrupt.countDown();
157                  c.await(LONG_DELAY_MS, MILLISECONDS);
158              }};
159          Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) {
160              public void realRun() throws Exception {
161 +                pleaseInterrupt.countDown();
162                  c.await(LONG_DELAY_MS, MILLISECONDS);
163              }};
164  
165          t1.start();
166          t2.start();
167 <        Thread.sleep(SHORT_DELAY_MS);
167 >        await(pleaseInterrupt);
168          t1.interrupt();
169 <        t1.join();
170 <        t2.join();
169 >        awaitTermination(t1);
170 >        awaitTermination(t2);
171      }
172  
173      /**
174       * A timeout in timed await throws TimeoutException
175       */
176 <    public void testAwait3_TimeOutException() throws InterruptedException {
176 >    public void testAwait3_TimeoutException() throws InterruptedException {
177          final CyclicBarrier c = new CyclicBarrier(2);
178 <        Thread t = new ThreadShouldThrow(TimeoutException.class) {
178 >        Thread t = newStartedThread(new CheckedRunnable() {
179              public void realRun() throws Exception {
180 <                c.await(SHORT_DELAY_MS, MILLISECONDS);
181 <            }};
180 >                long startTime = System.nanoTime();
181 >                try {
182 >                    c.await(timeoutMillis(), MILLISECONDS);
183 >                    shouldThrow();
184 >                } catch (TimeoutException success) {}
185 >                assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
186 >            }});
187  
188 <        t.start();
163 <        t.join();
188 >        awaitTermination(t);
189      }
190  
191      /**
# Line 169 | Line 194 | public class CyclicBarrierTest extends J
194       */
195      public void testAwait4_Timeout_BrokenBarrier() throws InterruptedException {
196          final CyclicBarrier c = new CyclicBarrier(3);
197 <        Thread t1 = new ThreadShouldThrow(TimeoutException.class) {
197 >        Thread t1 = newStartedThread(new CheckedRunnable() {
198              public void realRun() throws Exception {
199 <                c.await(SHORT_DELAY_MS, MILLISECONDS);
200 <            }};
201 <        Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) {
199 >                try {
200 >                    c.await(LONG_DELAY_MS, MILLISECONDS);
201 >                    shouldThrow();
202 >                } catch (BrokenBarrierException success) {}
203 >            }});
204 >        Thread t2 = newStartedThread(new CheckedRunnable() {
205              public void realRun() throws Exception {
206 <                c.await(MEDIUM_DELAY_MS, MILLISECONDS);
207 <            }};
206 >                awaitNumberWaiting(c, 1);
207 >                long startTime = System.nanoTime();
208 >                try {
209 >                    c.await(timeoutMillis(), MILLISECONDS);
210 >                    shouldThrow();
211 >                } catch (TimeoutException success) {}
212 >                assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
213 >            }});
214  
215 <        t1.start();
216 <        t2.start();
183 <        t1.join();
184 <        t2.join();
215 >        awaitTermination(t1);
216 >        awaitTermination(t2);
217      }
218  
219      /**
# Line 190 | Line 222 | public class CyclicBarrierTest extends J
222       */
223      public void testAwait5_Timeout_BrokenBarrier() throws InterruptedException {
224          final CyclicBarrier c = new CyclicBarrier(3);
225 <        Thread t1 = new ThreadShouldThrow(TimeoutException.class) {
225 >        Thread t1 = newStartedThread(new CheckedRunnable() {
226              public void realRun() throws Exception {
227 <                c.await(SHORT_DELAY_MS, MILLISECONDS);
228 <            }};
229 <        Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) {
227 >                try {
228 >                    c.await();
229 >                    shouldThrow();
230 >                } catch (BrokenBarrierException success) {}
231 >            }});
232 >        Thread t2 = newStartedThread(new CheckedRunnable() {
233              public void realRun() throws Exception {
234 <                c.await();
235 <            }};
234 >                awaitNumberWaiting(c, 1);
235 >                long startTime = System.nanoTime();
236 >                try {
237 >                    c.await(timeoutMillis(), MILLISECONDS);
238 >                    shouldThrow();
239 >                } catch (TimeoutException success) {}
240 >                assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
241 >            }});
242  
243 <        t1.start();
244 <        t2.start();
204 <        t1.join();
205 <        t2.join();
243 >        awaitTermination(t1);
244 >        awaitTermination(t2);
245      }
246  
247      /**
# Line 211 | Line 250 | public class CyclicBarrierTest extends J
250       */
251      public void testReset_BrokenBarrier() throws InterruptedException {
252          final CyclicBarrier c = new CyclicBarrier(3);
253 +        final CountDownLatch pleaseReset = new CountDownLatch(2);
254          Thread t1 = new ThreadShouldThrow(BrokenBarrierException.class) {
255              public void realRun() throws Exception {
256 +                pleaseReset.countDown();
257                  c.await();
258              }};
259          Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) {
260              public void realRun() throws Exception {
261 +                pleaseReset.countDown();
262                  c.await();
263              }};
264  
265          t1.start();
266          t2.start();
267 <        Thread.sleep(SHORT_DELAY_MS);
267 >        await(pleaseReset);
268 >
269 >        awaitNumberWaiting(c, 2);
270          c.reset();
271 <        t1.join();
272 <        t2.join();
271 >        awaitTermination(t1);
272 >        awaitTermination(t2);
273      }
274  
275      /**
# Line 234 | Line 278 | public class CyclicBarrierTest extends J
278       */
279      public void testReset_NoBrokenBarrier() throws Exception {
280          final CyclicBarrier c = new CyclicBarrier(3);
281 <        Thread t1 = new Thread(new CheckedRunnable() {
281 >        c.reset();
282 >
283 >        Thread t1 = newStartedThread(new CheckedRunnable() {
284              public void realRun() throws Exception {
285                  c.await();
286              }});
287 <        Thread t2 = new Thread(new CheckedRunnable() {
287 >        Thread t2 = newStartedThread(new CheckedRunnable() {
288              public void realRun() throws Exception {
289                  c.await();
290              }});
291  
246        c.reset();
247        t1.start();
248        t2.start();
292          c.await();
293 <        t1.join();
294 <        t2.join();
252 <    }
253 <
254 <    /**
255 <     * All threads block while a barrier is broken.
256 <     */
257 <    public void testReset_Leakage() throws InterruptedException {
258 <        final CyclicBarrier c = new CyclicBarrier(2);
259 <        final AtomicBoolean done = new AtomicBoolean();
260 <        Thread t = new Thread() {
261 <                public void run() {
262 <                    while (!done.get()) {
263 <                        try {
264 <                            while (c.isBroken())
265 <                                c.reset();
266 <
267 <                            c.await();
268 <                            threadFail("await should not return");
269 <                        }
270 <                        catch (BrokenBarrierException e) {
271 <                        }
272 <                        catch (InterruptedException ie) {
273 <                        }
274 <                    }
275 <                }
276 <            };
277 <
278 <        t.start();
279 <        for (int i = 0; i < 4; i++) {
280 <            Thread.sleep(SHORT_DELAY_MS);
281 <            t.interrupt();
282 <        }
283 <        done.set(true);
284 <        t.interrupt();
285 <        t.join();
293 >        awaitTermination(t1);
294 >        awaitTermination(t2);
295      }
296  
297      /**
298       * Reset of a non-broken barrier does not break barrier
299       */
300      public void testResetWithoutBreakage() throws Exception {
292        final CyclicBarrier start = new CyclicBarrier(3);
301          final CyclicBarrier barrier = new CyclicBarrier(3);
302          for (int i = 0; i < 3; i++) {
303 <            Thread t1 = new Thread(new CheckedRunnable() {
303 >            final CyclicBarrier start = new CyclicBarrier(3);
304 >            Thread t1 = newStartedThread(new CheckedRunnable() {
305                  public void realRun() throws Exception {
306                      start.await();
307                      barrier.await();
308                  }});
309  
310 <            Thread t2 = new Thread(new CheckedRunnable() {
310 >            Thread t2 = newStartedThread(new CheckedRunnable() {
311                  public void realRun() throws Exception {
312                      start.await();
313                      barrier.await();
314                  }});
315  
307            t1.start();
308            t2.start();
316              start.await();
317              barrier.await();
318 <            t1.join();
319 <            t2.join();
318 >            awaitTermination(t1);
319 >            awaitTermination(t2);
320              assertFalse(barrier.isBroken());
321              assertEquals(0, barrier.getNumberWaiting());
322              if (i == 1) barrier.reset();
# Line 322 | Line 329 | public class CyclicBarrierTest extends J
329       * Reset of a barrier after interruption reinitializes it.
330       */
331      public void testResetAfterInterrupt() throws Exception {
325        final CyclicBarrier start = new CyclicBarrier(3);
332          final CyclicBarrier barrier = new CyclicBarrier(3);
333          for (int i = 0; i < 2; i++) {
334 +            final CyclicBarrier start = new CyclicBarrier(3);
335              Thread t1 = new ThreadShouldThrow(InterruptedException.class) {
336                  public void realRun() throws Exception {
337                      start.await();
# Line 341 | Line 348 | public class CyclicBarrierTest extends J
348              t2.start();
349              start.await();
350              t1.interrupt();
351 <            t1.join();
352 <            t2.join();
351 >            awaitTermination(t1);
352 >            awaitTermination(t2);
353              assertTrue(barrier.isBroken());
354              assertEquals(0, barrier.getNumberWaiting());
355              barrier.reset();
# Line 355 | Line 362 | public class CyclicBarrierTest extends J
362       * Reset of a barrier after timeout reinitializes it.
363       */
364      public void testResetAfterTimeout() throws Exception {
358        final CyclicBarrier start = new CyclicBarrier(3);
365          final CyclicBarrier barrier = new CyclicBarrier(3);
366          for (int i = 0; i < 2; i++) {
367 <            Thread t1 = new ThreadShouldThrow(TimeoutException.class) {
368 <                    public void realRun() throws Exception {
363 <                        start.await();
364 <                        barrier.await(MEDIUM_DELAY_MS, MILLISECONDS);
365 <                    }};
366 <
367 <            Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) {
367 >            assertEquals(0, barrier.getNumberWaiting());
368 >            Thread t1 = newStartedThread(new CheckedRunnable() {
369                  public void realRun() throws Exception {
370 <                    start.await();
371 <                    barrier.await();
372 <                }};
370 >                    try {
371 >                        barrier.await();
372 >                        shouldThrow();
373 >                    } catch (BrokenBarrierException success) {}
374 >                }});
375 >            Thread t2 = newStartedThread(new CheckedRunnable() {
376 >                public void realRun() throws Exception {
377 >                    awaitNumberWaiting(barrier, 1);
378 >                    long startTime = System.nanoTime();
379 >                    try {
380 >                        barrier.await(timeoutMillis(), MILLISECONDS);
381 >                        shouldThrow();
382 >                    } catch (TimeoutException success) {}
383 >                    assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
384 >                }});
385  
386 <            t1.start();
387 <            t2.start();
388 <            start.await();
376 <            t1.join();
377 <            t2.join();
386 >            awaitTermination(t1);
387 >            awaitTermination(t2);
388 >            assertEquals(0, barrier.getNumberWaiting());
389              assertTrue(barrier.isBroken());
390              assertEquals(0, barrier.getNumberWaiting());
391              barrier.reset();
# Line 383 | Line 394 | public class CyclicBarrierTest extends J
394          }
395      }
396  
386
397      /**
398       * Reset of a barrier after a failed command reinitializes it.
399       */
400      public void testResetAfterCommandException() throws Exception {
391        final CyclicBarrier start = new CyclicBarrier(3);
401          final CyclicBarrier barrier =
402              new CyclicBarrier(3, new Runnable() {
403                      public void run() {
404                          throw new NullPointerException(); }});
405          for (int i = 0; i < 2; i++) {
406 +            final CyclicBarrier start = new CyclicBarrier(3);
407              Thread t1 = new ThreadShouldThrow(BrokenBarrierException.class) {
408                  public void realRun() throws Exception {
409                      start.await();
# Line 409 | Line 419 | public class CyclicBarrierTest extends J
419              t1.start();
420              t2.start();
421              start.await();
422 <            while (barrier.getNumberWaiting() < 2) { Thread.yield(); }
422 >            awaitNumberWaiting(barrier, 2);
423              try {
424                  barrier.await();
425                  shouldThrow();
426              } catch (NullPointerException success) {}
427 <            t1.join();
428 <            t2.join();
427 >            awaitTermination(t1);
428 >            awaitTermination(t2);
429              assertTrue(barrier.isBroken());
430              assertEquals(0, barrier.getNumberWaiting());
431              barrier.reset();
# Line 423 | Line 433 | public class CyclicBarrierTest extends J
433              assertEquals(0, barrier.getNumberWaiting());
434          }
435      }
436 +
437 +    /**
438 +     * There can be more threads calling await() than parties, as long as each
439 +     * task only calls await once and the task count is a multiple of parties.
440 +     */
441 +    public void testMoreTasksThanParties() throws Exception {
442 +        final ThreadLocalRandom rnd = ThreadLocalRandom.current();
443 +        final int parties = rnd.nextInt(1, 5);
444 +        final int nTasks = rnd.nextInt(1, 5) * parties;
445 +        final AtomicInteger tripCount = new AtomicInteger(0);
446 +        final AtomicInteger awaitCount = new AtomicInteger(0);
447 +        final CyclicBarrier barrier =
448 +            new CyclicBarrier(parties, () -> tripCount.getAndIncrement());
449 +        final ExecutorService e = Executors.newFixedThreadPool(nTasks);
450 +        final Runnable awaiter = () -> {
451 +            try {
452 +                if (randomBoolean())
453 +                    barrier.await();
454 +                else
455 +                    barrier.await(LONG_DELAY_MS, MILLISECONDS);
456 +                awaitCount.getAndIncrement();
457 +            } catch (Throwable fail) { threadUnexpectedException(fail); }};
458 +        try (PoolCleaner cleaner = cleaner(e)) {
459 +            for (int i = nTasks; i--> 0; )
460 +                e.execute(awaiter);
461 +        }
462 +        assertEquals(nTasks / parties, tripCount.get());
463 +        assertEquals(nTasks, awaitCount.get());
464 +        assertEquals(0, barrier.getNumberWaiting());
465 +    }
466   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines