11 |
|
import java.util.concurrent.BrokenBarrierException; |
12 |
|
import java.util.concurrent.CountDownLatch; |
13 |
|
import java.util.concurrent.CyclicBarrier; |
14 |
+ |
import java.util.concurrent.ExecutorService; |
15 |
+ |
import java.util.concurrent.Executors; |
16 |
+ |
import java.util.concurrent.ThreadLocalRandom; |
17 |
|
import java.util.concurrent.TimeoutException; |
18 |
|
import java.util.concurrent.atomic.AtomicBoolean; |
19 |
|
import java.util.concurrent.atomic.AtomicInteger; |
296 |
|
} |
297 |
|
|
298 |
|
/** |
296 |
– |
* All threads block while a barrier is broken. |
297 |
– |
*/ |
298 |
– |
public void testReset_Leakage() throws InterruptedException { |
299 |
– |
final CyclicBarrier c = new CyclicBarrier(2); |
300 |
– |
final AtomicBoolean done = new AtomicBoolean(); |
301 |
– |
Thread t = newStartedThread(new CheckedRunnable() { |
302 |
– |
public void realRun() { |
303 |
– |
while (!done.get()) { |
304 |
– |
try { |
305 |
– |
while (c.isBroken()) |
306 |
– |
c.reset(); |
307 |
– |
|
308 |
– |
c.await(); |
309 |
– |
shouldThrow(); |
310 |
– |
} |
311 |
– |
catch (BrokenBarrierException ok) {} |
312 |
– |
catch (InterruptedException ok) {} |
313 |
– |
}}}); |
314 |
– |
|
315 |
– |
for (int i = 0; i < 4; i++) { |
316 |
– |
delay(timeoutMillis()); |
317 |
– |
t.interrupt(); |
318 |
– |
} |
319 |
– |
done.set(true); |
320 |
– |
t.interrupt(); |
321 |
– |
awaitTermination(t); |
322 |
– |
} |
323 |
– |
|
324 |
– |
/** |
299 |
|
* Reset of a non-broken barrier does not break barrier |
300 |
|
*/ |
301 |
|
public void testResetWithoutBreakage() throws Exception { |
434 |
|
assertEquals(0, barrier.getNumberWaiting()); |
435 |
|
} |
436 |
|
} |
437 |
+ |
|
438 |
+ |
/** |
439 |
+ |
* There can be more threads calling await() than parties, as long as each |
440 |
+ |
* task only calls await once and the task count is a multiple of parties. |
441 |
+ |
*/ |
442 |
+ |
public void testMoreTasksThanParties() throws Exception { |
443 |
+ |
final ThreadLocalRandom rnd = ThreadLocalRandom.current(); |
444 |
+ |
final int parties = rnd.nextInt(1, 5); |
445 |
+ |
final int nTasks = rnd.nextInt(1, 5) * parties; |
446 |
+ |
final AtomicInteger tripCount = new AtomicInteger(0); |
447 |
+ |
final AtomicInteger awaitCount = new AtomicInteger(0); |
448 |
+ |
final CyclicBarrier barrier = |
449 |
+ |
new CyclicBarrier(parties, () -> tripCount.getAndIncrement()); |
450 |
+ |
final ExecutorService e = Executors.newFixedThreadPool(nTasks); |
451 |
+ |
final Runnable awaiter = () -> { |
452 |
+ |
try { |
453 |
+ |
if (randomBoolean()) |
454 |
+ |
barrier.await(); |
455 |
+ |
else |
456 |
+ |
barrier.await(LONG_DELAY_MS, MILLISECONDS); |
457 |
+ |
awaitCount.getAndIncrement(); |
458 |
+ |
} catch (Throwable fail) { threadUnexpectedException(fail); }}; |
459 |
+ |
try (PoolCleaner cleaner = cleaner(e)) { |
460 |
+ |
for (int i = nTasks; i--> 0; ) |
461 |
+ |
e.execute(awaiter); |
462 |
+ |
} |
463 |
+ |
assertEquals(nTasks / parties, tripCount.get()); |
464 |
+ |
assertEquals(nTasks, awaitCount.get()); |
465 |
+ |
assertEquals(0, barrier.getNumberWaiting()); |
466 |
+ |
} |
467 |
|
} |