ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SynchronousQueueTest.java
Revision: 1.37
Committed: Mon May 30 22:43:20 2011 UTC (12 years, 11 months ago) by jsr166
Branch: MAIN
Changes since 1.36: +10 -118 lines
Log Message:
refactor more generic BlockingQueue tests into BlockingQueueTest.java

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include Andrew Wright, Jeffrey Hayes,
6 * Pat Fisher, Mike Judd.
7 */
8
9 import junit.framework.*;
10 import java.util.Arrays;
11 import java.util.ArrayList;
12 import java.util.Collection;
13 import java.util.Iterator;
14 import java.util.NoSuchElementException;
15 import java.util.concurrent.BlockingQueue;
16 import java.util.concurrent.CountDownLatch;
17 import java.util.concurrent.Executors;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.SynchronousQueue;
20 import static java.util.concurrent.TimeUnit.MILLISECONDS;
21 import java.io.*;
22
23 public class SynchronousQueueTest extends JSR166TestCase {
24
25 public static class Fair extends BlockingQueueTest {
26 protected BlockingQueue emptyCollection() {
27 return new SynchronousQueue(true);
28 }
29 }
30
31 public static class NonFair extends BlockingQueueTest {
32 protected BlockingQueue emptyCollection() {
33 return new SynchronousQueue(false);
34 }
35 }
36
37 public static void main(String[] args) {
38 junit.textui.TestRunner.run(suite());
39 }
40
41 public static Test suite() {
42 return newTestSuite(SynchronousQueueTest.class,
43 new Fair().testSuite(),
44 new NonFair().testSuite());
45 }
46
47 /**
48 * Any SynchronousQueue is both empty and full
49 */
50 public void testEmptyFull() { testEmptyFull(false); }
51 public void testEmptyFull_fair() { testEmptyFull(true); }
52 public void testEmptyFull(boolean fair) {
53 final SynchronousQueue q = new SynchronousQueue(fair);
54 assertTrue(q.isEmpty());
55 assertEquals(0, q.size());
56 assertEquals(0, q.remainingCapacity());
57 assertFalse(q.offer(zero));
58 }
59
60 /**
61 * offer fails if no active taker
62 */
63 public void testOffer() { testOffer(false); }
64 public void testOffer_fair() { testOffer(true); }
65 public void testOffer(boolean fair) {
66 SynchronousQueue q = new SynchronousQueue(fair);
67 assertFalse(q.offer(one));
68 }
69
70 /**
71 * add throws IllegalStateException if no active taker
72 */
73 public void testAdd() { testAdd(false); }
74 public void testAdd_fair() { testAdd(true); }
75 public void testAdd(boolean fair) {
76 SynchronousQueue q = new SynchronousQueue(fair);
77 assertEquals(0, q.remainingCapacity());
78 try {
79 q.add(one);
80 shouldThrow();
81 } catch (IllegalStateException success) {}
82 }
83
84 /**
85 * addAll(this) throws IllegalArgumentException
86 */
87 public void testAddAll_self() { testAddAll_self(false); }
88 public void testAddAll_self_fair() { testAddAll_self(true); }
89 public void testAddAll_self(boolean fair) {
90 SynchronousQueue q = new SynchronousQueue(fair);
91 try {
92 q.addAll(q);
93 shouldThrow();
94 } catch (IllegalArgumentException success) {}
95 }
96
97 /**
98 * addAll throws ISE if no active taker
99 */
100 public void testAddAll_ISE() { testAddAll_ISE(false); }
101 public void testAddAll_ISE_fair() { testAddAll_ISE(true); }
102 public void testAddAll_ISE(boolean fair) {
103 SynchronousQueue q = new SynchronousQueue(fair);
104 Integer[] ints = new Integer[1];
105 for (int i = 0; i < ints.length; i++)
106 ints[i] = i;
107 Collection<Integer> coll = Arrays.asList(ints);
108 try {
109 q.addAll(coll);
110 shouldThrow();
111 } catch (IllegalStateException success) {}
112 }
113
114 /**
115 * put blocks interruptibly if no active taker
116 */
117 public void testBlockingPut() { testBlockingPut(false); }
118 public void testBlockingPut_fair() { testBlockingPut(true); }
119 public void testBlockingPut(boolean fair) {
120 final SynchronousQueue q = new SynchronousQueue(fair);
121 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
122 Thread t = newStartedThread(new CheckedRunnable() {
123 public void realRun() throws InterruptedException {
124 Thread.currentThread().interrupt();
125 try {
126 q.put(99);
127 shouldThrow();
128 } catch (InterruptedException success) {}
129 assertFalse(Thread.interrupted());
130
131 pleaseInterrupt.countDown();
132 try {
133 q.put(99);
134 shouldThrow();
135 } catch (InterruptedException success) {}
136 assertFalse(Thread.interrupted());
137 }});
138
139 await(pleaseInterrupt);
140 assertThreadStaysAlive(t);
141 t.interrupt();
142 awaitTermination(t);
143 assertEquals(0, q.remainingCapacity());
144 }
145
146 /**
147 * put blocks interruptibly waiting for take
148 */
149 public void testPutWithTake() { testPutWithTake(false); }
150 public void testPutWithTake_fair() { testPutWithTake(true); }
151 public void testPutWithTake(boolean fair) {
152 final SynchronousQueue q = new SynchronousQueue(fair);
153 final CountDownLatch pleaseTake = new CountDownLatch(1);
154 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
155 Thread t = newStartedThread(new CheckedRunnable() {
156 public void realRun() throws InterruptedException {
157 pleaseTake.countDown();
158 q.put(one);
159
160 pleaseInterrupt.countDown();
161 try {
162 q.put(99);
163 shouldThrow();
164 } catch (InterruptedException success) {}
165 assertFalse(Thread.interrupted());
166 }});
167
168 await(pleaseTake);
169 assertEquals(q.remainingCapacity(), 0);
170 try { assertSame(one, q.take()); }
171 catch (InterruptedException e) { threadUnexpectedException(e); }
172
173 await(pleaseInterrupt);
174 assertThreadStaysAlive(t);
175 t.interrupt();
176 awaitTermination(t);
177 assertEquals(q.remainingCapacity(), 0);
178 }
179
180 /**
181 * timed offer times out if elements not taken
182 */
183 public void testTimedOffer() { testTimedOffer(false); }
184 public void testTimedOffer_fair() { testTimedOffer(true); }
185 public void testTimedOffer(boolean fair) {
186 final SynchronousQueue q = new SynchronousQueue(fair);
187 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
188 Thread t = newStartedThread(new CheckedRunnable() {
189 public void realRun() throws InterruptedException {
190 long startTime = System.nanoTime();
191 assertFalse(q.offer(new Object(), timeoutMillis(), MILLISECONDS));
192 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
193 pleaseInterrupt.countDown();
194 try {
195 q.offer(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS);
196 shouldThrow();
197 } catch (InterruptedException success) {}
198 }});
199
200 await(pleaseInterrupt);
201 assertThreadStaysAlive(t);
202 t.interrupt();
203 awaitTermination(t);
204 }
205
206 /**
207 * poll return null if no active putter
208 */
209 public void testPoll() { testPoll(false); }
210 public void testPoll_fair() { testPoll(true); }
211 public void testPoll(boolean fair) {
212 final SynchronousQueue q = new SynchronousQueue(fair);
213 assertNull(q.poll());
214 }
215
216 /**
217 * timed poll with zero timeout times out if no active putter
218 */
219 public void testTimedPoll0() { testTimedPoll0(false); }
220 public void testTimedPoll0_fair() { testTimedPoll0(true); }
221 public void testTimedPoll0(boolean fair) {
222 final SynchronousQueue q = new SynchronousQueue(fair);
223 try { assertNull(q.poll(0, MILLISECONDS)); }
224 catch (InterruptedException e) { threadUnexpectedException(e); }
225 }
226
227 /**
228 * timed poll with nonzero timeout times out if no active putter
229 */
230 public void testTimedPoll() { testTimedPoll(false); }
231 public void testTimedPoll_fair() { testTimedPoll(true); }
232 public void testTimedPoll(boolean fair) {
233 final SynchronousQueue q = new SynchronousQueue(fair);
234 long startTime = System.nanoTime();
235 try { assertNull(q.poll(timeoutMillis(), MILLISECONDS)); }
236 catch (InterruptedException e) { threadUnexpectedException(e); }
237 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
238 }
239
240 /**
241 * timed poll before a delayed offer times out, returning null;
242 * after offer succeeds; on interruption throws
243 */
244 public void testTimedPollWithOffer() { testTimedPollWithOffer(false); }
245 public void testTimedPollWithOffer_fair() { testTimedPollWithOffer(true); }
246 public void testTimedPollWithOffer(boolean fair) {
247 final SynchronousQueue q = new SynchronousQueue(fair);
248 final CountDownLatch pleaseOffer = new CountDownLatch(1);
249 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
250 Thread t = newStartedThread(new CheckedRunnable() {
251 public void realRun() throws InterruptedException {
252 long startTime = System.nanoTime();
253 assertNull(q.poll(timeoutMillis(), MILLISECONDS));
254 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
255
256 pleaseOffer.countDown();
257 startTime = System.nanoTime();
258 assertSame(zero, q.poll(LONG_DELAY_MS, MILLISECONDS));
259 assertTrue(millisElapsedSince(startTime) < MEDIUM_DELAY_MS);
260
261 Thread.currentThread().interrupt();
262 try {
263 q.poll(LONG_DELAY_MS, MILLISECONDS);
264 shouldThrow();
265 } catch (InterruptedException success) {}
266 assertFalse(Thread.interrupted());
267
268 pleaseInterrupt.countDown();
269 try {
270 q.poll(LONG_DELAY_MS, MILLISECONDS);
271 shouldThrow();
272 } catch (InterruptedException success) {}
273 assertFalse(Thread.interrupted());
274 }});
275
276 await(pleaseOffer);
277 long startTime = System.nanoTime();
278 try { assertTrue(q.offer(zero, LONG_DELAY_MS, MILLISECONDS)); }
279 catch (InterruptedException e) { threadUnexpectedException(e); }
280 assertTrue(millisElapsedSince(startTime) < MEDIUM_DELAY_MS);
281
282 await(pleaseInterrupt);
283 assertThreadStaysAlive(t);
284 t.interrupt();
285 awaitTermination(t);
286 }
287
288 /**
289 * peek() returns null if no active putter
290 */
291 public void testPeek() { testPeek(false); }
292 public void testPeek_fair() { testPeek(true); }
293 public void testPeek(boolean fair) {
294 final SynchronousQueue q = new SynchronousQueue(fair);
295 assertNull(q.peek());
296 }
297
298 /**
299 * element() throws NoSuchElementException if no active putter
300 */
301 public void testElement() { testElement(false); }
302 public void testElement_fair() { testElement(true); }
303 public void testElement(boolean fair) {
304 final SynchronousQueue q = new SynchronousQueue(fair);
305 try {
306 q.element();
307 shouldThrow();
308 } catch (NoSuchElementException success) {}
309 }
310
311 /**
312 * remove() throws NoSuchElementException if no active putter
313 */
314 public void testRemove() { testRemove(false); }
315 public void testRemove_fair() { testRemove(true); }
316 public void testRemove(boolean fair) {
317 final SynchronousQueue q = new SynchronousQueue(fair);
318 try {
319 q.remove();
320 shouldThrow();
321 } catch (NoSuchElementException success) {}
322 }
323
324 /**
325 * remove(x) returns false
326 */
327 public void testRemoveElement() { testRemoveElement(false); }
328 public void testRemoveElement_fair() { testRemoveElement(true); }
329 public void testRemoveElement(boolean fair) {
330 final SynchronousQueue q = new SynchronousQueue(fair);
331 assertFalse(q.remove(zero));
332 assertTrue(q.isEmpty());
333 }
334
335 /**
336 * contains returns false
337 */
338 public void testContains() { testContains(false); }
339 public void testContains_fair() { testContains(true); }
340 public void testContains(boolean fair) {
341 final SynchronousQueue q = new SynchronousQueue(fair);
342 assertFalse(q.contains(zero));
343 }
344
345 /**
346 * clear ensures isEmpty
347 */
348 public void testClear() { testClear(false); }
349 public void testClear_fair() { testClear(true); }
350 public void testClear(boolean fair) {
351 final SynchronousQueue q = new SynchronousQueue(fair);
352 q.clear();
353 assertTrue(q.isEmpty());
354 }
355
356 /**
357 * containsAll returns false unless empty
358 */
359 public void testContainsAll() { testContainsAll(false); }
360 public void testContainsAll_fair() { testContainsAll(true); }
361 public void testContainsAll(boolean fair) {
362 final SynchronousQueue q = new SynchronousQueue(fair);
363 Integer[] empty = new Integer[0];
364 assertTrue(q.containsAll(Arrays.asList(empty)));
365 Integer[] ints = new Integer[1]; ints[0] = zero;
366 assertFalse(q.containsAll(Arrays.asList(ints)));
367 }
368
369 /**
370 * retainAll returns false
371 */
372 public void testRetainAll() { testRetainAll(false); }
373 public void testRetainAll_fair() { testRetainAll(true); }
374 public void testRetainAll(boolean fair) {
375 final SynchronousQueue q = new SynchronousQueue(fair);
376 Integer[] empty = new Integer[0];
377 assertFalse(q.retainAll(Arrays.asList(empty)));
378 Integer[] ints = new Integer[1]; ints[0] = zero;
379 assertFalse(q.retainAll(Arrays.asList(ints)));
380 }
381
382 /**
383 * removeAll returns false
384 */
385 public void testRemoveAll() { testRemoveAll(false); }
386 public void testRemoveAll_fair() { testRemoveAll(true); }
387 public void testRemoveAll(boolean fair) {
388 final SynchronousQueue q = new SynchronousQueue(fair);
389 Integer[] empty = new Integer[0];
390 assertFalse(q.removeAll(Arrays.asList(empty)));
391 Integer[] ints = new Integer[1]; ints[0] = zero;
392 assertFalse(q.containsAll(Arrays.asList(ints)));
393 }
394
395 /**
396 * toArray is empty
397 */
398 public void testToArray() { testToArray(false); }
399 public void testToArray_fair() { testToArray(true); }
400 public void testToArray(boolean fair) {
401 final SynchronousQueue q = new SynchronousQueue(fair);
402 Object[] o = q.toArray();
403 assertEquals(o.length, 0);
404 }
405
406 /**
407 * toArray(a) is nulled at position 0
408 */
409 public void testToArray2() { testToArray2(false); }
410 public void testToArray2_fair() { testToArray2(true); }
411 public void testToArray2(boolean fair) {
412 final SynchronousQueue q = new SynchronousQueue(fair);
413 Integer[] ints = new Integer[1];
414 assertNull(ints[0]);
415 }
416
417 /**
418 * toArray(null) throws NPE
419 */
420 public void testToArray_null() { testToArray_null(false); }
421 public void testToArray_null_fair() { testToArray_null(true); }
422 public void testToArray_null(boolean fair) {
423 final SynchronousQueue q = new SynchronousQueue(fair);
424 try {
425 Object o[] = q.toArray(null);
426 shouldThrow();
427 } catch (NullPointerException success) {}
428 }
429
430 /**
431 * iterator does not traverse any elements
432 */
433 public void testIterator() { testIterator(false); }
434 public void testIterator_fair() { testIterator(true); }
435 public void testIterator(boolean fair) {
436 final SynchronousQueue q = new SynchronousQueue(fair);
437 Iterator it = q.iterator();
438 assertFalse(it.hasNext());
439 try {
440 Object x = it.next();
441 shouldThrow();
442 } catch (NoSuchElementException success) {}
443 }
444
445 /**
446 * iterator remove throws ISE
447 */
448 public void testIteratorRemove() { testIteratorRemove(false); }
449 public void testIteratorRemove_fair() { testIteratorRemove(true); }
450 public void testIteratorRemove(boolean fair) {
451 final SynchronousQueue q = new SynchronousQueue(fair);
452 Iterator it = q.iterator();
453 try {
454 it.remove();
455 shouldThrow();
456 } catch (IllegalStateException success) {}
457 }
458
459 /**
460 * toString returns a non-null string
461 */
462 public void testToString() { testToString(false); }
463 public void testToString_fair() { testToString(true); }
464 public void testToString(boolean fair) {
465 final SynchronousQueue q = new SynchronousQueue(fair);
466 String s = q.toString();
467 assertNotNull(s);
468 }
469
470 /**
471 * offer transfers elements across Executor tasks
472 */
473 public void testOfferInExecutor() { testOfferInExecutor(false); }
474 public void testOfferInExecutor_fair() { testOfferInExecutor(true); }
475 public void testOfferInExecutor(boolean fair) {
476 final SynchronousQueue q = new SynchronousQueue(fair);
477 ExecutorService executor = Executors.newFixedThreadPool(2);
478 final CheckedBarrier threadsStarted = new CheckedBarrier(2);
479
480 executor.execute(new CheckedRunnable() {
481 public void realRun() throws InterruptedException {
482 assertFalse(q.offer(one));
483 threadsStarted.await();
484 assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS));
485 assertEquals(0, q.remainingCapacity());
486 }});
487
488 executor.execute(new CheckedRunnable() {
489 public void realRun() throws InterruptedException {
490 threadsStarted.await();
491 assertSame(one, q.take());
492 }});
493
494 joinPool(executor);
495 }
496
497 /**
498 * timed poll retrieves elements across Executor threads
499 */
500 public void testPollInExecutor() { testPollInExecutor(false); }
501 public void testPollInExecutor_fair() { testPollInExecutor(true); }
502 public void testPollInExecutor(boolean fair) {
503 final SynchronousQueue q = new SynchronousQueue(fair);
504 final CheckedBarrier threadsStarted = new CheckedBarrier(2);
505 ExecutorService executor = Executors.newFixedThreadPool(2);
506 executor.execute(new CheckedRunnable() {
507 public void realRun() throws InterruptedException {
508 assertNull(q.poll());
509 threadsStarted.await();
510 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
511 assertTrue(q.isEmpty());
512 }});
513
514 executor.execute(new CheckedRunnable() {
515 public void realRun() throws InterruptedException {
516 threadsStarted.await();
517 q.put(one);
518 }});
519
520 joinPool(executor);
521 }
522
523 /**
524 * a deserialized serialized queue is usable
525 */
526 public void testSerialization() { testSerialization(false); }
527 public void testSerialization_fair() { testSerialization(true); }
528 public void testSerialization(boolean fair) {
529 final SynchronousQueue q = new SynchronousQueue(fair);
530 final SynchronousQueue r = serialClone(q);
531 assertTrue(q != r);
532 assertEquals(q.size(), r.size());
533 while (!q.isEmpty())
534 assertEquals(q.remove(), r.remove());
535 }
536
537 /**
538 * drainTo(c) of empty queue doesn't transfer elements
539 */
540 public void testDrainTo() { testDrainTo(false); }
541 public void testDrainTo_fair() { testDrainTo(true); }
542 public void testDrainTo(boolean fair) {
543 final SynchronousQueue q = new SynchronousQueue(fair);
544 ArrayList l = new ArrayList();
545 q.drainTo(l);
546 assertEquals(q.size(), 0);
547 assertEquals(l.size(), 0);
548 }
549
550 /**
551 * drainTo empties queue, unblocking a waiting put.
552 */
553 public void testDrainToWithActivePut() { testDrainToWithActivePut(false); }
554 public void testDrainToWithActivePut_fair() { testDrainToWithActivePut(true); }
555 public void testDrainToWithActivePut(boolean fair) {
556 final SynchronousQueue q = new SynchronousQueue(fair);
557 Thread t = newStartedThread(new CheckedRunnable() {
558 public void realRun() throws InterruptedException {
559 q.put(one);
560 }});
561
562 ArrayList l = new ArrayList();
563 long startTime = System.nanoTime();
564 while (l.isEmpty()) {
565 q.drainTo(l);
566 if (millisElapsedSince(startTime) > LONG_DELAY_MS)
567 fail("timed out");
568 Thread.yield();
569 }
570 assertTrue(l.size() == 1);
571 assertSame(one, l.get(0));
572 awaitTermination(t);
573 }
574
575 /**
576 * drainTo(c, n) empties up to n elements of queue into c
577 */
578 public void testDrainToN() throws InterruptedException {
579 final SynchronousQueue q = new SynchronousQueue();
580 Thread t1 = newStartedThread(new CheckedRunnable() {
581 public void realRun() throws InterruptedException {
582 q.put(one);
583 }});
584
585 Thread t2 = newStartedThread(new CheckedRunnable() {
586 public void realRun() throws InterruptedException {
587 q.put(two);
588 }});
589
590 ArrayList l = new ArrayList();
591 delay(SHORT_DELAY_MS);
592 q.drainTo(l, 1);
593 assertEquals(1, l.size());
594 q.drainTo(l, 1);
595 assertEquals(2, l.size());
596 assertTrue(l.contains(one));
597 assertTrue(l.contains(two));
598 awaitTermination(t1);
599 awaitTermination(t2);
600 }
601
602 }