ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SynchronousQueueTest.java
Revision: 1.69
Committed: Wed Jan 27 01:57:24 2021 UTC (3 years, 2 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.68: +37 -37 lines
Log Message:
use diamond <> pervasively

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include Andrew Wright, Jeffrey Hayes,
6 * Pat Fisher, Mike Judd.
7 */
8
9 import static java.util.concurrent.TimeUnit.MILLISECONDS;
10
11 import java.util.ArrayList;
12 import java.util.Arrays;
13 import java.util.Collection;
14 import java.util.Iterator;
15 import java.util.NoSuchElementException;
16 import java.util.concurrent.BlockingQueue;
17 import java.util.concurrent.CountDownLatch;
18 import java.util.concurrent.Executors;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.SynchronousQueue;
21
22 import junit.framework.Test;
23
24 public class SynchronousQueueTest extends JSR166TestCase {
25
26 public static class Fair extends BlockingQueueTest {
27 protected BlockingQueue emptyCollection() {
28 return new SynchronousQueue(true);
29 }
30 }
31
32 public static class NonFair extends BlockingQueueTest {
33 protected BlockingQueue emptyCollection() {
34 return new SynchronousQueue(false);
35 }
36 }
37
38 public static void main(String[] args) {
39 main(suite(), args);
40 }
41
42 public static Test suite() {
43 return newTestSuite(SynchronousQueueTest.class,
44 new Fair().testSuite(),
45 new NonFair().testSuite());
46 }
47
48 /**
49 * Any SynchronousQueue is both empty and full
50 */
51 public void testEmptyFull() { testEmptyFull(false); }
52 public void testEmptyFull_fair() { testEmptyFull(true); }
53 public void testEmptyFull(boolean fair) {
54 final SynchronousQueue<Item> q = new SynchronousQueue<>(fair);
55 assertTrue(q.isEmpty());
56 mustEqual(0, q.size());
57 mustEqual(0, q.remainingCapacity());
58 assertFalse(q.offer(zero));
59 }
60
61 /**
62 * offer fails if no active taker
63 */
64 public void testOffer() { testOffer(false); }
65 public void testOffer_fair() { testOffer(true); }
66 public void testOffer(boolean fair) {
67 SynchronousQueue<Item> q = new SynchronousQueue<>(fair);
68 assertFalse(q.offer(one));
69 }
70
71 /**
72 * add throws IllegalStateException if no active taker
73 */
74 public void testAdd() { testAdd(false); }
75 public void testAdd_fair() { testAdd(true); }
76 public void testAdd(boolean fair) {
77 SynchronousQueue<Item> q = new SynchronousQueue<>(fair);
78 mustEqual(0, q.remainingCapacity());
79 try {
80 q.add(one);
81 shouldThrow();
82 } catch (IllegalStateException success) {}
83 }
84
85 /**
86 * addAll(this) throws IllegalArgumentException
87 */
88 public void testAddAll_self() { testAddAll_self(false); }
89 public void testAddAll_self_fair() { testAddAll_self(true); }
90 public void testAddAll_self(boolean fair) {
91 SynchronousQueue<Item> q = new SynchronousQueue<>(fair);
92 try {
93 q.addAll(q);
94 shouldThrow();
95 } catch (IllegalArgumentException success) {}
96 }
97
98 /**S
99 * addAll throws IllegalStateException if no active taker
100 */
101 public void testAddAll_ISE() { testAddAll_ISE(false); }
102 public void testAddAll_ISE_fair() { testAddAll_ISE(true); }
103 public void testAddAll_ISE(boolean fair) {
104 SynchronousQueue<Item> q = new SynchronousQueue<>(fair);
105 Item[] items = seqItems(1);
106 Collection<Item> coll = Arrays.asList(items);
107 try {
108 q.addAll(coll);
109 shouldThrow();
110 } catch (IllegalStateException success) {}
111 }
112
113 /**
114 * put blocks interruptibly if no active taker
115 */
116 public void testBlockingPut() { testBlockingPut(false); }
117 public void testBlockingPut_fair() { testBlockingPut(true); }
118 public void testBlockingPut(boolean fair) {
119 final SynchronousQueue<Item> q = new SynchronousQueue<>(fair);
120 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
121 Thread t = newStartedThread(new CheckedRunnable() {
122 public void realRun() throws InterruptedException {
123 Thread.currentThread().interrupt();
124 try {
125 q.put(ninetynine);
126 shouldThrow();
127 } catch (InterruptedException success) {}
128 assertFalse(Thread.interrupted());
129
130 pleaseInterrupt.countDown();
131 try {
132 q.put(ninetynine);
133 shouldThrow();
134 } catch (InterruptedException success) {}
135 assertFalse(Thread.interrupted());
136 }});
137
138 await(pleaseInterrupt);
139 if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING);
140 t.interrupt();
141 awaitTermination(t);
142 mustEqual(0, q.remainingCapacity());
143 }
144
145 /**
146 * put blocks interruptibly waiting for take
147 */
148 public void testPutWithTake() { testPutWithTake(false); }
149 public void testPutWithTake_fair() { testPutWithTake(true); }
150 public void testPutWithTake(boolean fair) {
151 final SynchronousQueue<Item> q = new SynchronousQueue<>(fair);
152 final CountDownLatch pleaseTake = new CountDownLatch(1);
153 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
154 Thread t = newStartedThread(new CheckedRunnable() {
155 public void realRun() throws InterruptedException {
156 pleaseTake.countDown();
157 q.put(one);
158
159 Thread.currentThread().interrupt();
160 try {
161 q.put(ninetynine);
162 shouldThrow();
163 } catch (InterruptedException success) {}
164 assertFalse(Thread.interrupted());
165
166 pleaseInterrupt.countDown();
167 try {
168 q.put(ninetynine);
169 shouldThrow();
170 } catch (InterruptedException success) {}
171 assertFalse(Thread.interrupted());
172 }});
173
174 await(pleaseTake);
175 mustEqual(0, q.remainingCapacity());
176 try { assertSame(one, q.take()); }
177 catch (InterruptedException e) { threadUnexpectedException(e); }
178
179 await(pleaseInterrupt);
180 if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING);
181 t.interrupt();
182 awaitTermination(t);
183 mustEqual(0, q.remainingCapacity());
184 }
185
186 /**
187 * timed offer times out if elements not taken
188 */
189 public void testTimedOffer() {
190 final boolean fair = randomBoolean();
191 final SynchronousQueue<Item> q = new SynchronousQueue<>(fair);
192 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
193 Thread t = newStartedThread(new CheckedRunnable() {
194 public void realRun() throws InterruptedException {
195 long startTime = System.nanoTime();
196
197 assertFalse(q.offer(zero, timeoutMillis(), MILLISECONDS));
198 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
199
200 Thread.currentThread().interrupt();
201 try {
202 q.offer(one, randomTimeout(), randomTimeUnit());
203 shouldThrow();
204 } catch (InterruptedException success) {}
205 assertFalse(Thread.interrupted());
206
207 pleaseInterrupt.countDown();
208 try {
209 q.offer(two, LONGER_DELAY_MS, MILLISECONDS);
210 shouldThrow();
211 } catch (InterruptedException success) {}
212 assertFalse(Thread.interrupted());
213 }});
214
215 await(pleaseInterrupt);
216 if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING);
217 t.interrupt();
218 awaitTermination(t);
219 }
220
221 /**
222 * poll return null if no active putter
223 */
224 public void testPoll() { testPoll(false); }
225 public void testPoll_fair() { testPoll(true); }
226 public void testPoll(boolean fair) {
227 final SynchronousQueue<Item> q = new SynchronousQueue<>(fair);
228 assertNull(q.poll());
229 }
230
231 /**
232 * timed poll with zero timeout times out if no active putter
233 */
234 public void testTimedPoll0() { testTimedPoll0(false); }
235 public void testTimedPoll0_fair() { testTimedPoll0(true); }
236 public void testTimedPoll0(boolean fair) {
237 final SynchronousQueue<Item> q = new SynchronousQueue<>(fair);
238 try { assertNull(q.poll(0, MILLISECONDS)); }
239 catch (InterruptedException e) { threadUnexpectedException(e); }
240 }
241
242 /**
243 * timed poll with nonzero timeout times out if no active putter
244 */
245 public void testTimedPoll() {
246 final boolean fair = randomBoolean();
247 final SynchronousQueue<Item> q = new SynchronousQueue<>(fair);
248 final long startTime = System.nanoTime();
249 try { assertNull(q.poll(timeoutMillis(), MILLISECONDS)); }
250 catch (InterruptedException e) { threadUnexpectedException(e); }
251 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
252 }
253
254 /**
255 * timed poll before a delayed offer times out, returning null;
256 * after offer succeeds; on interruption throws
257 */
258 public void testTimedPollWithOffer() {
259 final boolean fair = randomBoolean();
260 final SynchronousQueue<Item> q = new SynchronousQueue<>(fair);
261 final CountDownLatch pleaseOffer = new CountDownLatch(1);
262 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
263 Thread t = newStartedThread(new CheckedRunnable() {
264 public void realRun() throws InterruptedException {
265 long startTime = System.nanoTime();
266 assertNull(q.poll(timeoutMillis(), MILLISECONDS));
267 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
268
269 pleaseOffer.countDown();
270 startTime = System.nanoTime();
271 assertSame(zero, q.poll(LONG_DELAY_MS, MILLISECONDS));
272
273 Thread.currentThread().interrupt();
274 try {
275 q.poll(randomTimeout(), randomTimeUnit());
276 shouldThrow();
277 } catch (InterruptedException success) {}
278 assertFalse(Thread.interrupted());
279
280 pleaseInterrupt.countDown();
281 try {
282 q.poll(LONG_DELAY_MS, MILLISECONDS);
283 shouldThrow();
284 } catch (InterruptedException success) {}
285 assertFalse(Thread.interrupted());
286
287 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
288 }});
289
290 await(pleaseOffer);
291 long startTime = System.nanoTime();
292 try { assertTrue(q.offer(zero, LONG_DELAY_MS, MILLISECONDS)); }
293 catch (InterruptedException e) { threadUnexpectedException(e); }
294 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
295
296 await(pleaseInterrupt);
297 if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING);
298 t.interrupt();
299 awaitTermination(t);
300 }
301
302 /**
303 * peek() returns null if no active putter
304 */
305 public void testPeek() { testPeek(false); }
306 public void testPeek_fair() { testPeek(true); }
307 public void testPeek(boolean fair) {
308 final SynchronousQueue<Item> q = new SynchronousQueue<>(fair);
309 assertNull(q.peek());
310 }
311
312 /**
313 * element() throws NoSuchElementException if no active putter
314 */
315 public void testElement() { testElement(false); }
316 public void testElement_fair() { testElement(true); }
317 public void testElement(boolean fair) {
318 final SynchronousQueue<Item> q = new SynchronousQueue<>(fair);
319 try {
320 q.element();
321 shouldThrow();
322 } catch (NoSuchElementException success) {}
323 }
324
325 /**
326 * remove() throws NoSuchElementException if no active putter
327 */
328 public void testRemove() { testRemove(false); }
329 public void testRemove_fair() { testRemove(true); }
330 public void testRemove(boolean fair) {
331 final SynchronousQueue<Item> q = new SynchronousQueue<>(fair);
332 try {
333 q.remove();
334 shouldThrow();
335 } catch (NoSuchElementException success) {}
336 }
337
338 /**
339 * contains returns false
340 */
341 public void testContains() { testContains(false); }
342 public void testContains_fair() { testContains(true); }
343 public void testContains(boolean fair) {
344 final SynchronousQueue<Item> q = new SynchronousQueue<>(fair);
345 assertFalse(q.contains(zero));
346 }
347
348 /**
349 * clear ensures isEmpty
350 */
351 public void testClear() { testClear(false); }
352 public void testClear_fair() { testClear(true); }
353 public void testClear(boolean fair) {
354 final SynchronousQueue<Item> q = new SynchronousQueue<>(fair);
355 q.clear();
356 assertTrue(q.isEmpty());
357 }
358
359 /**
360 * containsAll returns false unless empty
361 */
362 public void testContainsAll() { testContainsAll(false); }
363 public void testContainsAll_fair() { testContainsAll(true); }
364 public void testContainsAll(boolean fair) {
365 final SynchronousQueue<Item> q = new SynchronousQueue<>(fair);
366 Item[] empty = new Item[0];
367 assertTrue(q.containsAll(Arrays.asList(empty)));
368 Item[] items = new Item[1]; items[0] = zero;
369 assertFalse(q.containsAll(Arrays.asList(items)));
370 }
371
372 /**
373 * retainAll returns false
374 */
375 public void testRetainAll() { testRetainAll(false); }
376 public void testRetainAll_fair() { testRetainAll(true); }
377 public void testRetainAll(boolean fair) {
378 final SynchronousQueue<Item> q = new SynchronousQueue<>(fair);
379 Item[] empty = new Item[0];
380 assertFalse(q.retainAll(Arrays.asList(empty)));
381 Item[] items = new Item[1]; items[0] = zero;
382 assertFalse(q.retainAll(Arrays.asList(items)));
383 }
384
385 /**
386 * removeAll returns false
387 */
388 public void testRemoveAll() { testRemoveAll(false); }
389 public void testRemoveAll_fair() { testRemoveAll(true); }
390 public void testRemoveAll(boolean fair) {
391 final SynchronousQueue<Item> q = new SynchronousQueue<>(fair);
392 Item[] empty = new Item[0];
393 assertFalse(q.removeAll(Arrays.asList(empty)));
394 Item[] items = new Item[1]; items[0] = zero;
395 assertFalse(q.containsAll(Arrays.asList(items)));
396 }
397
398 /**
399 * toArray is empty
400 */
401 public void testToArray() { testToArray(false); }
402 public void testToArray_fair() { testToArray(true); }
403 public void testToArray(boolean fair) {
404 final SynchronousQueue<Item> q = new SynchronousQueue<>(fair);
405 Object[] o = q.toArray();
406 mustEqual(0, o.length);
407 }
408
409 /**
410 * toArray(Item array) returns its argument with the first
411 * element (if present) nulled out
412 */
413 public void testToArray2() { testToArray2(false); }
414 public void testToArray2_fair() { testToArray2(true); }
415 public void testToArray2(boolean fair) {
416 final SynchronousQueue<Item> q = new SynchronousQueue<>(fair);
417 Item[] a;
418
419 a = new Item[0];
420 assertSame(a, q.toArray(a));
421
422 a = new Item[3];
423 Arrays.fill(a, fortytwo);
424 assertSame(a, q.toArray(a));
425 assertNull(a[0]);
426 for (int i = 1; i < a.length; i++)
427 mustEqual(42, a[i]);
428 }
429
430 /**
431 * toArray(null) throws NPE
432 */
433 public void testToArray_null() { testToArray_null(false); }
434 public void testToArray_null_fair() { testToArray_null(true); }
435 public void testToArray_null(boolean fair) {
436 final SynchronousQueue<Item> q = new SynchronousQueue<>(fair);
437 try {
438 Object[] unused = q.toArray((Object[])null);
439 shouldThrow();
440 } catch (NullPointerException success) {}
441 }
442
443 /**
444 * iterator does not traverse any elements
445 */
446 public void testIterator() { testIterator(false); }
447 public void testIterator_fair() { testIterator(true); }
448 public void testIterator(boolean fair) {
449 assertIteratorExhausted(new SynchronousQueue<Item>(fair).iterator());
450 }
451
452 /**
453 * iterator remove throws IllegalStateException
454 */
455 public void testIteratorRemove() { testIteratorRemove(false); }
456 public void testIteratorRemove_fair() { testIteratorRemove(true); }
457 public void testIteratorRemove(boolean fair) {
458 final SynchronousQueue<Item> q = new SynchronousQueue<>(fair);
459 Iterator<? extends Item> it = q.iterator();
460 try {
461 it.remove();
462 shouldThrow();
463 } catch (IllegalStateException success) {}
464 }
465
466 /**
467 * toString returns a non-null string
468 */
469 public void testToString() { testToString(false); }
470 public void testToString_fair() { testToString(true); }
471 public void testToString(boolean fair) {
472 final SynchronousQueue<Item> q = new SynchronousQueue<>(fair);
473 String s = q.toString();
474 assertNotNull(s);
475 }
476
477 /**
478 * offer transfers elements across Executor tasks
479 */
480 public void testOfferInExecutor() { testOfferInExecutor(false); }
481 public void testOfferInExecutor_fair() { testOfferInExecutor(true); }
482 public void testOfferInExecutor(boolean fair) {
483 final SynchronousQueue<Item> q = new SynchronousQueue<>(fair);
484 final CheckedBarrier threadsStarted = new CheckedBarrier(2);
485 final ExecutorService executor = Executors.newFixedThreadPool(2);
486 try (PoolCleaner cleaner = cleaner(executor)) {
487
488 executor.execute(new CheckedRunnable() {
489 public void realRun() throws InterruptedException {
490 assertFalse(q.offer(one));
491 threadsStarted.await();
492 assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS));
493 mustEqual(0, q.remainingCapacity());
494 }});
495
496 executor.execute(new CheckedRunnable() {
497 public void realRun() throws InterruptedException {
498 threadsStarted.await();
499 assertSame(one, q.take());
500 }});
501 }
502 }
503
504 /**
505 * timed poll retrieves elements across Executor threads
506 */
507 public void testPollInExecutor() { testPollInExecutor(false); }
508 public void testPollInExecutor_fair() { testPollInExecutor(true); }
509 public void testPollInExecutor(boolean fair) {
510 final SynchronousQueue<Item> q = new SynchronousQueue<>(fair);
511 final CheckedBarrier threadsStarted = new CheckedBarrier(2);
512 final ExecutorService executor = Executors.newFixedThreadPool(2);
513 try (PoolCleaner cleaner = cleaner(executor)) {
514 executor.execute(new CheckedRunnable() {
515 public void realRun() throws InterruptedException {
516 assertNull(q.poll());
517 threadsStarted.await();
518 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
519 assertTrue(q.isEmpty());
520 }});
521
522 executor.execute(new CheckedRunnable() {
523 public void realRun() throws InterruptedException {
524 threadsStarted.await();
525 q.put(one);
526 }});
527 }
528 }
529
530 /**
531 * a deserialized/reserialized queue is usable
532 */
533 public void testSerialization() {
534 final SynchronousQueue<Item> x = new SynchronousQueue<>();
535 final SynchronousQueue<Item> y = new SynchronousQueue<>(false);
536 final SynchronousQueue<Item> z = new SynchronousQueue<>(true);
537 assertSerialEquals(x, y);
538 assertNotSerialEquals(x, z);
539 SynchronousQueue[] rqs = { x, y, z };
540 @SuppressWarnings("unchecked")
541 SynchronousQueue<Item>[] qs = (SynchronousQueue<Item>[])rqs;
542 for (SynchronousQueue<Item> q : qs) {
543 SynchronousQueue<Item> clone = serialClone(q);
544 assertNotSame(q, clone);
545 assertSerialEquals(q, clone);
546 assertTrue(clone.isEmpty());
547 mustEqual(0, clone.size());
548 mustEqual(0, clone.remainingCapacity());
549 assertFalse(clone.offer(zero));
550 }
551 }
552
553 /**
554 * drainTo(c) of empty queue doesn't transfer elements
555 */
556 public void testDrainTo() { testDrainTo(false); }
557 public void testDrainTo_fair() { testDrainTo(true); }
558 public void testDrainTo(boolean fair) {
559 final SynchronousQueue<Item> q = new SynchronousQueue<>(fair);
560 ArrayList<Item> l = new ArrayList<>();
561 q.drainTo(l);
562 mustEqual(0, q.size());
563 mustEqual(0, l.size());
564 }
565
566 /**
567 * drainTo empties queue, unblocking a waiting put.
568 */
569 public void testDrainToWithActivePut() { testDrainToWithActivePut(false); }
570 public void testDrainToWithActivePut_fair() { testDrainToWithActivePut(true); }
571 public void testDrainToWithActivePut(boolean fair) {
572 final SynchronousQueue<Item> q = new SynchronousQueue<>(fair);
573 Thread t = newStartedThread(new CheckedRunnable() {
574 public void realRun() throws InterruptedException {
575 q.put(one);
576 }});
577
578 ArrayList<Item> l = new ArrayList<>();
579 long startTime = System.nanoTime();
580 while (l.isEmpty()) {
581 q.drainTo(l);
582 if (millisElapsedSince(startTime) > LONG_DELAY_MS)
583 fail("timed out");
584 Thread.yield();
585 }
586 mustEqual(1, l.size());
587 assertSame(one, l.get(0));
588 awaitTermination(t);
589 }
590
591 /**
592 * drainTo(c, n) empties up to n elements of queue into c
593 */
594 public void testDrainToN() throws InterruptedException {
595 final SynchronousQueue<Item> q = new SynchronousQueue<>();
596 Thread t1 = newStartedThread(new CheckedRunnable() {
597 public void realRun() throws InterruptedException {
598 q.put(one);
599 }});
600
601 Thread t2 = newStartedThread(new CheckedRunnable() {
602 public void realRun() throws InterruptedException {
603 q.put(two);
604 }});
605
606 ArrayList<Item> l = new ArrayList<>();
607 int drained;
608 while ((drained = q.drainTo(l, 1)) == 0) Thread.yield();
609 mustEqual(1, drained);
610 mustEqual(1, l.size());
611 while ((drained = q.drainTo(l, 1)) == 0) Thread.yield();
612 mustEqual(1, drained);
613 mustEqual(2, l.size());
614 assertTrue(l.contains(one));
615 assertTrue(l.contains(two));
616 awaitTermination(t1);
617 awaitTermination(t2);
618 }
619
620 /**
621 * remove(null), contains(null) always return false
622 */
623 public void testNeverContainsNull() {
624 Collection<?> q = new SynchronousQueue<>();
625 assertFalse(q.contains(null));
626 assertFalse(q.remove(null));
627 }
628
629 }