ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/SynchronousQueueTest.java
Revision: 1.39
Committed: Fri Jul 15 18:49:31 2011 UTC (12 years, 10 months ago) by jsr166
Branch: MAIN
Changes since 1.38: +0 -11 lines
Log Message:
Robust weak consistency for ArrayBlockingQueue iterators

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include Andrew Wright, Jeffrey Hayes,
6 * Pat Fisher, Mike Judd.
7 */
8
9 import junit.framework.*;
10 import java.util.Arrays;
11 import java.util.ArrayList;
12 import java.util.Collection;
13 import java.util.Iterator;
14 import java.util.NoSuchElementException;
15 import java.util.Queue;
16 import java.util.concurrent.BlockingQueue;
17 import java.util.concurrent.CountDownLatch;
18 import java.util.concurrent.Executors;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.SynchronousQueue;
21 import static java.util.concurrent.TimeUnit.MILLISECONDS;
22
23 public class SynchronousQueueTest extends JSR166TestCase {
24
25 public static class Fair extends BlockingQueueTest {
26 protected BlockingQueue emptyCollection() {
27 return new SynchronousQueue(true);
28 }
29 }
30
31 public static class NonFair extends BlockingQueueTest {
32 protected BlockingQueue emptyCollection() {
33 return new SynchronousQueue(false);
34 }
35 }
36
37 public static void main(String[] args) {
38 junit.textui.TestRunner.run(suite());
39 }
40
41 public static Test suite() {
42 return newTestSuite(SynchronousQueueTest.class,
43 new Fair().testSuite(),
44 new NonFair().testSuite());
45 }
46
47 /**
48 * Any SynchronousQueue is both empty and full
49 */
50 public void testEmptyFull() { testEmptyFull(false); }
51 public void testEmptyFull_fair() { testEmptyFull(true); }
52 public void testEmptyFull(boolean fair) {
53 final SynchronousQueue q = new SynchronousQueue(fair);
54 assertTrue(q.isEmpty());
55 assertEquals(0, q.size());
56 assertEquals(0, q.remainingCapacity());
57 assertFalse(q.offer(zero));
58 }
59
60 /**
61 * offer fails if no active taker
62 */
63 public void testOffer() { testOffer(false); }
64 public void testOffer_fair() { testOffer(true); }
65 public void testOffer(boolean fair) {
66 SynchronousQueue q = new SynchronousQueue(fair);
67 assertFalse(q.offer(one));
68 }
69
70 /**
71 * add throws IllegalStateException if no active taker
72 */
73 public void testAdd() { testAdd(false); }
74 public void testAdd_fair() { testAdd(true); }
75 public void testAdd(boolean fair) {
76 SynchronousQueue q = new SynchronousQueue(fair);
77 assertEquals(0, q.remainingCapacity());
78 try {
79 q.add(one);
80 shouldThrow();
81 } catch (IllegalStateException success) {}
82 }
83
84 /**
85 * addAll(this) throws IllegalArgumentException
86 */
87 public void testAddAll_self() { testAddAll_self(false); }
88 public void testAddAll_self_fair() { testAddAll_self(true); }
89 public void testAddAll_self(boolean fair) {
90 SynchronousQueue q = new SynchronousQueue(fair);
91 try {
92 q.addAll(q);
93 shouldThrow();
94 } catch (IllegalArgumentException success) {}
95 }
96
97 /**
98 * addAll throws ISE if no active taker
99 */
100 public void testAddAll_ISE() { testAddAll_ISE(false); }
101 public void testAddAll_ISE_fair() { testAddAll_ISE(true); }
102 public void testAddAll_ISE(boolean fair) {
103 SynchronousQueue q = new SynchronousQueue(fair);
104 Integer[] ints = new Integer[1];
105 for (int i = 0; i < ints.length; i++)
106 ints[i] = i;
107 Collection<Integer> coll = Arrays.asList(ints);
108 try {
109 q.addAll(coll);
110 shouldThrow();
111 } catch (IllegalStateException success) {}
112 }
113
114 /**
115 * put blocks interruptibly if no active taker
116 */
117 public void testBlockingPut() { testBlockingPut(false); }
118 public void testBlockingPut_fair() { testBlockingPut(true); }
119 public void testBlockingPut(boolean fair) {
120 final SynchronousQueue q = new SynchronousQueue(fair);
121 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
122 Thread t = newStartedThread(new CheckedRunnable() {
123 public void realRun() throws InterruptedException {
124 Thread.currentThread().interrupt();
125 try {
126 q.put(99);
127 shouldThrow();
128 } catch (InterruptedException success) {}
129 assertFalse(Thread.interrupted());
130
131 pleaseInterrupt.countDown();
132 try {
133 q.put(99);
134 shouldThrow();
135 } catch (InterruptedException success) {}
136 assertFalse(Thread.interrupted());
137 }});
138
139 await(pleaseInterrupt);
140 assertThreadStaysAlive(t);
141 t.interrupt();
142 awaitTermination(t);
143 assertEquals(0, q.remainingCapacity());
144 }
145
146 /**
147 * put blocks interruptibly waiting for take
148 */
149 public void testPutWithTake() { testPutWithTake(false); }
150 public void testPutWithTake_fair() { testPutWithTake(true); }
151 public void testPutWithTake(boolean fair) {
152 final SynchronousQueue q = new SynchronousQueue(fair);
153 final CountDownLatch pleaseTake = new CountDownLatch(1);
154 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
155 Thread t = newStartedThread(new CheckedRunnable() {
156 public void realRun() throws InterruptedException {
157 pleaseTake.countDown();
158 q.put(one);
159
160 pleaseInterrupt.countDown();
161 try {
162 q.put(99);
163 shouldThrow();
164 } catch (InterruptedException success) {}
165 assertFalse(Thread.interrupted());
166 }});
167
168 await(pleaseTake);
169 assertEquals(q.remainingCapacity(), 0);
170 try { assertSame(one, q.take()); }
171 catch (InterruptedException e) { threadUnexpectedException(e); }
172
173 await(pleaseInterrupt);
174 assertThreadStaysAlive(t);
175 t.interrupt();
176 awaitTermination(t);
177 assertEquals(q.remainingCapacity(), 0);
178 }
179
180 /**
181 * timed offer times out if elements not taken
182 */
183 public void testTimedOffer() { testTimedOffer(false); }
184 public void testTimedOffer_fair() { testTimedOffer(true); }
185 public void testTimedOffer(boolean fair) {
186 final SynchronousQueue q = new SynchronousQueue(fair);
187 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
188 Thread t = newStartedThread(new CheckedRunnable() {
189 public void realRun() throws InterruptedException {
190 long startTime = System.nanoTime();
191 assertFalse(q.offer(new Object(), timeoutMillis(), MILLISECONDS));
192 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
193 pleaseInterrupt.countDown();
194 try {
195 q.offer(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS);
196 shouldThrow();
197 } catch (InterruptedException success) {}
198 }});
199
200 await(pleaseInterrupt);
201 assertThreadStaysAlive(t);
202 t.interrupt();
203 awaitTermination(t);
204 }
205
206 /**
207 * poll return null if no active putter
208 */
209 public void testPoll() { testPoll(false); }
210 public void testPoll_fair() { testPoll(true); }
211 public void testPoll(boolean fair) {
212 final SynchronousQueue q = new SynchronousQueue(fair);
213 assertNull(q.poll());
214 }
215
216 /**
217 * timed poll with zero timeout times out if no active putter
218 */
219 public void testTimedPoll0() { testTimedPoll0(false); }
220 public void testTimedPoll0_fair() { testTimedPoll0(true); }
221 public void testTimedPoll0(boolean fair) {
222 final SynchronousQueue q = new SynchronousQueue(fair);
223 try { assertNull(q.poll(0, MILLISECONDS)); }
224 catch (InterruptedException e) { threadUnexpectedException(e); }
225 }
226
227 /**
228 * timed poll with nonzero timeout times out if no active putter
229 */
230 public void testTimedPoll() { testTimedPoll(false); }
231 public void testTimedPoll_fair() { testTimedPoll(true); }
232 public void testTimedPoll(boolean fair) {
233 final SynchronousQueue q = new SynchronousQueue(fair);
234 long startTime = System.nanoTime();
235 try { assertNull(q.poll(timeoutMillis(), MILLISECONDS)); }
236 catch (InterruptedException e) { threadUnexpectedException(e); }
237 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
238 }
239
240 /**
241 * timed poll before a delayed offer times out, returning null;
242 * after offer succeeds; on interruption throws
243 */
244 public void testTimedPollWithOffer() { testTimedPollWithOffer(false); }
245 public void testTimedPollWithOffer_fair() { testTimedPollWithOffer(true); }
246 public void testTimedPollWithOffer(boolean fair) {
247 final SynchronousQueue q = new SynchronousQueue(fair);
248 final CountDownLatch pleaseOffer = new CountDownLatch(1);
249 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
250 Thread t = newStartedThread(new CheckedRunnable() {
251 public void realRun() throws InterruptedException {
252 long startTime = System.nanoTime();
253 assertNull(q.poll(timeoutMillis(), MILLISECONDS));
254 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
255
256 pleaseOffer.countDown();
257 startTime = System.nanoTime();
258 assertSame(zero, q.poll(LONG_DELAY_MS, MILLISECONDS));
259 assertTrue(millisElapsedSince(startTime) < MEDIUM_DELAY_MS);
260
261 Thread.currentThread().interrupt();
262 try {
263 q.poll(LONG_DELAY_MS, MILLISECONDS);
264 shouldThrow();
265 } catch (InterruptedException success) {}
266 assertFalse(Thread.interrupted());
267
268 pleaseInterrupt.countDown();
269 try {
270 q.poll(LONG_DELAY_MS, MILLISECONDS);
271 shouldThrow();
272 } catch (InterruptedException success) {}
273 assertFalse(Thread.interrupted());
274 }});
275
276 await(pleaseOffer);
277 long startTime = System.nanoTime();
278 try { assertTrue(q.offer(zero, LONG_DELAY_MS, MILLISECONDS)); }
279 catch (InterruptedException e) { threadUnexpectedException(e); }
280 assertTrue(millisElapsedSince(startTime) < MEDIUM_DELAY_MS);
281
282 await(pleaseInterrupt);
283 assertThreadStaysAlive(t);
284 t.interrupt();
285 awaitTermination(t);
286 }
287
288 /**
289 * peek() returns null if no active putter
290 */
291 public void testPeek() { testPeek(false); }
292 public void testPeek_fair() { testPeek(true); }
293 public void testPeek(boolean fair) {
294 final SynchronousQueue q = new SynchronousQueue(fair);
295 assertNull(q.peek());
296 }
297
298 /**
299 * element() throws NoSuchElementException if no active putter
300 */
301 public void testElement() { testElement(false); }
302 public void testElement_fair() { testElement(true); }
303 public void testElement(boolean fair) {
304 final SynchronousQueue q = new SynchronousQueue(fair);
305 try {
306 q.element();
307 shouldThrow();
308 } catch (NoSuchElementException success) {}
309 }
310
311 /**
312 * remove() throws NoSuchElementException if no active putter
313 */
314 public void testRemove() { testRemove(false); }
315 public void testRemove_fair() { testRemove(true); }
316 public void testRemove(boolean fair) {
317 final SynchronousQueue q = new SynchronousQueue(fair);
318 try {
319 q.remove();
320 shouldThrow();
321 } catch (NoSuchElementException success) {}
322 }
323
324 /**
325 * contains returns false
326 */
327 public void testContains() { testContains(false); }
328 public void testContains_fair() { testContains(true); }
329 public void testContains(boolean fair) {
330 final SynchronousQueue q = new SynchronousQueue(fair);
331 assertFalse(q.contains(zero));
332 }
333
334 /**
335 * clear ensures isEmpty
336 */
337 public void testClear() { testClear(false); }
338 public void testClear_fair() { testClear(true); }
339 public void testClear(boolean fair) {
340 final SynchronousQueue q = new SynchronousQueue(fair);
341 q.clear();
342 assertTrue(q.isEmpty());
343 }
344
345 /**
346 * containsAll returns false unless empty
347 */
348 public void testContainsAll() { testContainsAll(false); }
349 public void testContainsAll_fair() { testContainsAll(true); }
350 public void testContainsAll(boolean fair) {
351 final SynchronousQueue q = new SynchronousQueue(fair);
352 Integer[] empty = new Integer[0];
353 assertTrue(q.containsAll(Arrays.asList(empty)));
354 Integer[] ints = new Integer[1]; ints[0] = zero;
355 assertFalse(q.containsAll(Arrays.asList(ints)));
356 }
357
358 /**
359 * retainAll returns false
360 */
361 public void testRetainAll() { testRetainAll(false); }
362 public void testRetainAll_fair() { testRetainAll(true); }
363 public void testRetainAll(boolean fair) {
364 final SynchronousQueue q = new SynchronousQueue(fair);
365 Integer[] empty = new Integer[0];
366 assertFalse(q.retainAll(Arrays.asList(empty)));
367 Integer[] ints = new Integer[1]; ints[0] = zero;
368 assertFalse(q.retainAll(Arrays.asList(ints)));
369 }
370
371 /**
372 * removeAll returns false
373 */
374 public void testRemoveAll() { testRemoveAll(false); }
375 public void testRemoveAll_fair() { testRemoveAll(true); }
376 public void testRemoveAll(boolean fair) {
377 final SynchronousQueue q = new SynchronousQueue(fair);
378 Integer[] empty = new Integer[0];
379 assertFalse(q.removeAll(Arrays.asList(empty)));
380 Integer[] ints = new Integer[1]; ints[0] = zero;
381 assertFalse(q.containsAll(Arrays.asList(ints)));
382 }
383
384 /**
385 * toArray is empty
386 */
387 public void testToArray() { testToArray(false); }
388 public void testToArray_fair() { testToArray(true); }
389 public void testToArray(boolean fair) {
390 final SynchronousQueue q = new SynchronousQueue(fair);
391 Object[] o = q.toArray();
392 assertEquals(o.length, 0);
393 }
394
395 /**
396 * toArray(a) is nulled at position 0
397 */
398 public void testToArray2() { testToArray2(false); }
399 public void testToArray2_fair() { testToArray2(true); }
400 public void testToArray2(boolean fair) {
401 final SynchronousQueue q = new SynchronousQueue(fair);
402 Integer[] ints = new Integer[1];
403 assertNull(ints[0]);
404 }
405
406 /**
407 * toArray(null) throws NPE
408 */
409 public void testToArray_null() { testToArray_null(false); }
410 public void testToArray_null_fair() { testToArray_null(true); }
411 public void testToArray_null(boolean fair) {
412 final SynchronousQueue q = new SynchronousQueue(fair);
413 try {
414 Object o[] = q.toArray(null);
415 shouldThrow();
416 } catch (NullPointerException success) {}
417 }
418
419 /**
420 * iterator does not traverse any elements
421 */
422 public void testIterator() { testIterator(false); }
423 public void testIterator_fair() { testIterator(true); }
424 public void testIterator(boolean fair) {
425 final SynchronousQueue q = new SynchronousQueue(fair);
426 Iterator it = q.iterator();
427 assertFalse(it.hasNext());
428 try {
429 Object x = it.next();
430 shouldThrow();
431 } catch (NoSuchElementException success) {}
432 }
433
434 /**
435 * iterator remove throws ISE
436 */
437 public void testIteratorRemove() { testIteratorRemove(false); }
438 public void testIteratorRemove_fair() { testIteratorRemove(true); }
439 public void testIteratorRemove(boolean fair) {
440 final SynchronousQueue q = new SynchronousQueue(fair);
441 Iterator it = q.iterator();
442 try {
443 it.remove();
444 shouldThrow();
445 } catch (IllegalStateException success) {}
446 }
447
448 /**
449 * toString returns a non-null string
450 */
451 public void testToString() { testToString(false); }
452 public void testToString_fair() { testToString(true); }
453 public void testToString(boolean fair) {
454 final SynchronousQueue q = new SynchronousQueue(fair);
455 String s = q.toString();
456 assertNotNull(s);
457 }
458
459 /**
460 * offer transfers elements across Executor tasks
461 */
462 public void testOfferInExecutor() { testOfferInExecutor(false); }
463 public void testOfferInExecutor_fair() { testOfferInExecutor(true); }
464 public void testOfferInExecutor(boolean fair) {
465 final SynchronousQueue q = new SynchronousQueue(fair);
466 ExecutorService executor = Executors.newFixedThreadPool(2);
467 final CheckedBarrier threadsStarted = new CheckedBarrier(2);
468
469 executor.execute(new CheckedRunnable() {
470 public void realRun() throws InterruptedException {
471 assertFalse(q.offer(one));
472 threadsStarted.await();
473 assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS));
474 assertEquals(0, q.remainingCapacity());
475 }});
476
477 executor.execute(new CheckedRunnable() {
478 public void realRun() throws InterruptedException {
479 threadsStarted.await();
480 assertSame(one, q.take());
481 }});
482
483 joinPool(executor);
484 }
485
486 /**
487 * timed poll retrieves elements across Executor threads
488 */
489 public void testPollInExecutor() { testPollInExecutor(false); }
490 public void testPollInExecutor_fair() { testPollInExecutor(true); }
491 public void testPollInExecutor(boolean fair) {
492 final SynchronousQueue q = new SynchronousQueue(fair);
493 final CheckedBarrier threadsStarted = new CheckedBarrier(2);
494 ExecutorService executor = Executors.newFixedThreadPool(2);
495 executor.execute(new CheckedRunnable() {
496 public void realRun() throws InterruptedException {
497 assertNull(q.poll());
498 threadsStarted.await();
499 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
500 assertTrue(q.isEmpty());
501 }});
502
503 executor.execute(new CheckedRunnable() {
504 public void realRun() throws InterruptedException {
505 threadsStarted.await();
506 q.put(one);
507 }});
508
509 joinPool(executor);
510 }
511
512 /**
513 * a deserialized serialized queue is usable
514 */
515 public void testSerialization() { testSerialization(false); }
516 public void testSerialization_fair() { testSerialization(true); }
517 public void testSerialization(boolean fair) {
518 final SynchronousQueue x = new SynchronousQueue(fair);
519 final SynchronousQueue y = serialClone(x);
520 assertTrue(x != y);
521 assertTrue(x.isEmpty());
522 assertTrue(y.isEmpty());
523 }
524
525 /**
526 * drainTo(c) of empty queue doesn't transfer elements
527 */
528 public void testDrainTo() { testDrainTo(false); }
529 public void testDrainTo_fair() { testDrainTo(true); }
530 public void testDrainTo(boolean fair) {
531 final SynchronousQueue q = new SynchronousQueue(fair);
532 ArrayList l = new ArrayList();
533 q.drainTo(l);
534 assertEquals(q.size(), 0);
535 assertEquals(l.size(), 0);
536 }
537
538 /**
539 * drainTo empties queue, unblocking a waiting put.
540 */
541 public void testDrainToWithActivePut() { testDrainToWithActivePut(false); }
542 public void testDrainToWithActivePut_fair() { testDrainToWithActivePut(true); }
543 public void testDrainToWithActivePut(boolean fair) {
544 final SynchronousQueue q = new SynchronousQueue(fair);
545 Thread t = newStartedThread(new CheckedRunnable() {
546 public void realRun() throws InterruptedException {
547 q.put(one);
548 }});
549
550 ArrayList l = new ArrayList();
551 long startTime = System.nanoTime();
552 while (l.isEmpty()) {
553 q.drainTo(l);
554 if (millisElapsedSince(startTime) > LONG_DELAY_MS)
555 fail("timed out");
556 Thread.yield();
557 }
558 assertTrue(l.size() == 1);
559 assertSame(one, l.get(0));
560 awaitTermination(t);
561 }
562
563 /**
564 * drainTo(c, n) empties up to n elements of queue into c
565 */
566 public void testDrainToN() throws InterruptedException {
567 final SynchronousQueue q = new SynchronousQueue();
568 Thread t1 = newStartedThread(new CheckedRunnable() {
569 public void realRun() throws InterruptedException {
570 q.put(one);
571 }});
572
573 Thread t2 = newStartedThread(new CheckedRunnable() {
574 public void realRun() throws InterruptedException {
575 q.put(two);
576 }});
577
578 ArrayList l = new ArrayList();
579 delay(SHORT_DELAY_MS);
580 q.drainTo(l, 1);
581 assertEquals(1, l.size());
582 q.drainTo(l, 1);
583 assertEquals(2, l.size());
584 assertTrue(l.contains(one));
585 assertTrue(l.contains(two));
586 awaitTermination(t1);
587 awaitTermination(t2);
588 }
589
590 }