ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/LinkedTransferQueueTest.java
Revision: 1.48
Committed: Tue May 31 16:16:24 2011 UTC (12 years, 11 months ago) by jsr166
Branch: MAIN
Changes since 1.47: +13 -27 lines
Log Message:
use serialClone in serialization tests; update imports

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include John Vint
6 */
7
8 import junit.framework.*;
9 import java.util.Arrays;
10 import java.util.ArrayList;
11 import java.util.Collection;
12 import java.util.Iterator;
13 import java.util.List;
14 import java.util.NoSuchElementException;
15 import java.util.Queue;
16 import java.util.concurrent.BlockingQueue;
17 import java.util.concurrent.CountDownLatch;
18 import java.util.concurrent.Executors;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.LinkedTransferQueue;
21 import static java.util.concurrent.TimeUnit.MILLISECONDS;
22 import static java.util.concurrent.TimeUnit.NANOSECONDS;
23
24 @SuppressWarnings({"unchecked", "rawtypes"})
25 public class LinkedTransferQueueTest extends JSR166TestCase {
26
27 public static class Generic extends BlockingQueueTest {
28 protected BlockingQueue emptyCollection() {
29 return new LinkedTransferQueue();
30 }
31 }
32
33 public static void main(String[] args) {
34 junit.textui.TestRunner.run(suite());
35 }
36
37 public static Test suite() {
38 return newTestSuite(LinkedTransferQueueTest.class,
39 new Generic().testSuite());
40 }
41
42 /**
43 * Constructor builds new queue with size being zero and empty
44 * being true
45 */
46 public void testConstructor1() {
47 assertEquals(0, new LinkedTransferQueue().size());
48 assertTrue(new LinkedTransferQueue().isEmpty());
49 }
50
51 /**
52 * Initializing constructor with null collection throws
53 * NullPointerException
54 */
55 public void testConstructor2() {
56 try {
57 new LinkedTransferQueue(null);
58 shouldThrow();
59 } catch (NullPointerException success) {}
60 }
61
62 /**
63 * Initializing from Collection of null elements throws
64 * NullPointerException
65 */
66 public void testConstructor3() {
67 Collection<Integer> elements = Arrays.asList(new Integer[SIZE]);
68 try {
69 new LinkedTransferQueue(elements);
70 shouldThrow();
71 } catch (NullPointerException success) {}
72 }
73
74 /**
75 * Initializing constructor with a collection containing some null elements
76 * throws NullPointerException
77 */
78 public void testConstructor4() {
79 Integer[] ints = new Integer[SIZE];
80 for (int i = 0; i < SIZE-1; ++i)
81 ints[i] = i;
82 Collection<Integer> elements = Arrays.asList(ints);
83 try {
84 new LinkedTransferQueue(elements);
85 shouldThrow();
86 } catch (NullPointerException success) {}
87 }
88
89 /**
90 * Queue contains all elements of the collection it is initialized by
91 */
92 public void testConstructor5() {
93 Integer[] ints = new Integer[SIZE];
94 for (int i = 0; i < SIZE; ++i) {
95 ints[i] = i;
96 }
97 List intList = Arrays.asList(ints);
98 LinkedTransferQueue q
99 = new LinkedTransferQueue(intList);
100 assertEquals(q.size(), intList.size());
101 assertEquals(q.toString(), intList.toString());
102 assertTrue(Arrays.equals(q.toArray(),
103 intList.toArray()));
104 assertTrue(Arrays.equals(q.toArray(new Object[0]),
105 intList.toArray(new Object[0])));
106 assertTrue(Arrays.equals(q.toArray(new Object[SIZE]),
107 intList.toArray(new Object[SIZE])));
108 for (int i = 0; i < SIZE; ++i) {
109 assertEquals(ints[i], q.poll());
110 }
111 }
112
113 /**
114 * remainingCapacity() always returns Integer.MAX_VALUE
115 */
116 public void testRemainingCapacity() {
117 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
118 for (int i = 0; i < SIZE; ++i) {
119 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
120 assertEquals(SIZE - i, q.size());
121 q.remove();
122 }
123 for (int i = 0; i < SIZE; ++i) {
124 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
125 assertEquals(i, q.size());
126 q.add(i);
127 }
128 }
129
130 /**
131 * addAll(this) throws IllegalArgumentException
132 */
133 public void testAddAllSelf() {
134 try {
135 LinkedTransferQueue q = populatedQueue(SIZE);
136 q.addAll(q);
137 shouldThrow();
138 } catch (IllegalArgumentException success) {}
139 }
140
141 /**
142 * addAll of a collection with any null elements throws
143 * NullPointerException after possibly adding some elements
144 */
145 public void testAddAll3() {
146 try {
147 LinkedTransferQueue q = new LinkedTransferQueue();
148 Integer[] ints = new Integer[SIZE];
149 for (int i = 0; i < SIZE - 1; ++i) {
150 ints[i] = i;
151 }
152 q.addAll(Arrays.asList(ints));
153 shouldThrow();
154 } catch (NullPointerException success) {}
155 }
156
157 /**
158 * Queue contains all elements, in traversal order, of successful addAll
159 */
160 public void testAddAll5() {
161 Integer[] empty = new Integer[0];
162 Integer[] ints = new Integer[SIZE];
163 for (int i = 0; i < SIZE; ++i) {
164 ints[i] = i;
165 }
166 LinkedTransferQueue q = new LinkedTransferQueue();
167 assertFalse(q.addAll(Arrays.asList(empty)));
168 assertTrue(q.addAll(Arrays.asList(ints)));
169 for (int i = 0; i < SIZE; ++i) {
170 assertEquals(ints[i], q.poll());
171 }
172 }
173
174 /**
175 * all elements successfully put are contained
176 */
177 public void testPut() {
178 LinkedTransferQueue<Integer> q = new LinkedTransferQueue<Integer>();
179 for (int i = 0; i < SIZE; ++i) {
180 assertEquals(q.size(), i);
181 q.put(i);
182 assertTrue(q.contains(i));
183 }
184 }
185
186 /**
187 * take retrieves elements in FIFO order
188 */
189 public void testTake() throws InterruptedException {
190 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
191 for (int i = 0; i < SIZE; ++i) {
192 assertEquals(i, (int) q.take());
193 }
194 }
195
196 /**
197 * take removes existing elements until empty, then blocks interruptibly
198 */
199 public void testBlockingTake() throws InterruptedException {
200 final BlockingQueue q = populatedQueue(SIZE);
201 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
202 Thread t = newStartedThread(new CheckedRunnable() {
203 public void realRun() throws InterruptedException {
204 for (int i = 0; i < SIZE; ++i) {
205 assertEquals(i, q.take());
206 }
207
208 Thread.currentThread().interrupt();
209 try {
210 q.take();
211 shouldThrow();
212 } catch (InterruptedException success) {}
213 assertFalse(Thread.interrupted());
214
215 pleaseInterrupt.countDown();
216 try {
217 q.take();
218 shouldThrow();
219 } catch (InterruptedException success) {}
220 assertFalse(Thread.interrupted());
221 }});
222
223 await(pleaseInterrupt);
224 assertThreadStaysAlive(t);
225 t.interrupt();
226 awaitTermination(t);
227 }
228
229 /**
230 * poll succeeds unless empty
231 */
232 public void testPoll() throws InterruptedException {
233 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
234 for (int i = 0; i < SIZE; ++i) {
235 assertEquals(i, (int) q.poll());
236 }
237 assertNull(q.poll());
238 checkEmpty(q);
239 }
240
241 /**
242 * timed poll with zero timeout succeeds when non-empty, else times out
243 */
244 public void testTimedPoll0() throws InterruptedException {
245 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
246 for (int i = 0; i < SIZE; ++i) {
247 assertEquals(i, (int) q.poll(0, MILLISECONDS));
248 }
249 assertNull(q.poll(0, MILLISECONDS));
250 checkEmpty(q);
251 }
252
253 /**
254 * timed poll with nonzero timeout succeeds when non-empty, else times out
255 */
256 public void testTimedPoll() throws InterruptedException {
257 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
258 for (int i = 0; i < SIZE; ++i) {
259 long startTime = System.nanoTime();
260 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
261 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
262 }
263 long startTime = System.nanoTime();
264 assertNull(q.poll(timeoutMillis(), MILLISECONDS));
265 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
266 checkEmpty(q);
267 }
268
269 /**
270 * Interrupted timed poll throws InterruptedException instead of
271 * returning timeout status
272 */
273 public void testInterruptedTimedPoll() throws InterruptedException {
274 final BlockingQueue<Integer> q = populatedQueue(SIZE);
275 final CountDownLatch aboutToWait = new CountDownLatch(1);
276 Thread t = newStartedThread(new CheckedRunnable() {
277 public void realRun() throws InterruptedException {
278 for (int i = 0; i < SIZE; ++i) {
279 long t0 = System.nanoTime();
280 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
281 assertTrue(millisElapsedSince(t0) < SMALL_DELAY_MS);
282 }
283 long t0 = System.nanoTime();
284 aboutToWait.countDown();
285 try {
286 q.poll(MEDIUM_DELAY_MS, MILLISECONDS);
287 shouldThrow();
288 } catch (InterruptedException success) {
289 assertTrue(millisElapsedSince(t0) < MEDIUM_DELAY_MS);
290 }
291 }});
292
293 aboutToWait.await();
294 waitForThreadToEnterWaitState(t, SMALL_DELAY_MS);
295 t.interrupt();
296 awaitTermination(t, MEDIUM_DELAY_MS);
297 checkEmpty(q);
298 }
299
300 /**
301 * timed poll after thread interrupted throws InterruptedException
302 * instead of returning timeout status
303 */
304 public void testTimedPollAfterInterrupt() throws InterruptedException {
305 final BlockingQueue<Integer> q = populatedQueue(SIZE);
306 Thread t = newStartedThread(new CheckedRunnable() {
307 public void realRun() throws InterruptedException {
308 Thread.currentThread().interrupt();
309 for (int i = 0; i < SIZE; ++i) {
310 long t0 = System.nanoTime();
311 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
312 assertTrue(millisElapsedSince(t0) < SMALL_DELAY_MS);
313 }
314 try {
315 q.poll(MEDIUM_DELAY_MS, MILLISECONDS);
316 shouldThrow();
317 } catch (InterruptedException success) {}
318 }});
319
320 awaitTermination(t, MEDIUM_DELAY_MS);
321 checkEmpty(q);
322 }
323
324 /**
325 * peek returns next element, or null if empty
326 */
327 public void testPeek() throws InterruptedException {
328 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
329 for (int i = 0; i < SIZE; ++i) {
330 assertEquals(i, (int) q.peek());
331 assertEquals(i, (int) q.poll());
332 assertTrue(q.peek() == null ||
333 i != (int) q.peek());
334 }
335 assertNull(q.peek());
336 checkEmpty(q);
337 }
338
339 /**
340 * element returns next element, or throws NoSuchElementException if empty
341 */
342 public void testElement() throws InterruptedException {
343 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
344 for (int i = 0; i < SIZE; ++i) {
345 assertEquals(i, (int) q.element());
346 assertEquals(i, (int) q.poll());
347 }
348 try {
349 q.element();
350 shouldThrow();
351 } catch (NoSuchElementException success) {}
352 checkEmpty(q);
353 }
354
355 /**
356 * remove removes next element, or throws NoSuchElementException if empty
357 */
358 public void testRemove() throws InterruptedException {
359 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
360 for (int i = 0; i < SIZE; ++i) {
361 assertEquals(i, (int) q.remove());
362 }
363 try {
364 q.remove();
365 shouldThrow();
366 } catch (NoSuchElementException success) {}
367 checkEmpty(q);
368 }
369
370 /**
371 * remove(x) removes x and returns true if present
372 */
373 public void testRemoveElement() throws InterruptedException {
374 LinkedTransferQueue q = populatedQueue(SIZE);
375 for (int i = 1; i < SIZE; i+=2) {
376 assertTrue(q.contains(i));
377 assertTrue(q.remove(i));
378 assertFalse(q.contains(i));
379 assertTrue(q.contains(i-1));
380 }
381 for (int i = 0; i < SIZE; i+=2) {
382 assertTrue(q.contains(i));
383 assertTrue(q.remove(i));
384 assertFalse(q.contains(i));
385 assertFalse(q.remove(i+1));
386 assertFalse(q.contains(i+1));
387 }
388 checkEmpty(q);
389 }
390
391 /**
392 * An add following remove(x) succeeds
393 */
394 public void testRemoveElementAndAdd() throws InterruptedException {
395 LinkedTransferQueue q = new LinkedTransferQueue();
396 assertTrue(q.add(one));
397 assertTrue(q.add(two));
398 assertTrue(q.remove(one));
399 assertTrue(q.remove(two));
400 assertTrue(q.add(three));
401 assertSame(q.take(), three);
402 }
403
404 /**
405 * contains(x) reports true when elements added but not yet removed
406 */
407 public void testContains() {
408 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
409 for (int i = 0; i < SIZE; ++i) {
410 assertTrue(q.contains(i));
411 assertEquals(i, (int) q.poll());
412 assertFalse(q.contains(i));
413 }
414 }
415
416 /**
417 * clear removes all elements
418 */
419 public void testClear() throws InterruptedException {
420 LinkedTransferQueue q = populatedQueue(SIZE);
421 q.clear();
422 checkEmpty(q);
423 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
424 q.add(one);
425 assertFalse(q.isEmpty());
426 assertEquals(1, q.size());
427 assertTrue(q.contains(one));
428 q.clear();
429 checkEmpty(q);
430 }
431
432 /**
433 * containsAll(c) is true when c contains a subset of elements
434 */
435 public void testContainsAll() {
436 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
437 LinkedTransferQueue<Integer> p = new LinkedTransferQueue<Integer>();
438 for (int i = 0; i < SIZE; ++i) {
439 assertTrue(q.containsAll(p));
440 assertFalse(p.containsAll(q));
441 p.add(i);
442 }
443 assertTrue(p.containsAll(q));
444 }
445
446 /**
447 * retainAll(c) retains only those elements of c and reports true
448 * if changed
449 */
450 public void testRetainAll() {
451 LinkedTransferQueue q = populatedQueue(SIZE);
452 LinkedTransferQueue p = populatedQueue(SIZE);
453 for (int i = 0; i < SIZE; ++i) {
454 boolean changed = q.retainAll(p);
455 if (i == 0) {
456 assertFalse(changed);
457 } else {
458 assertTrue(changed);
459 }
460 assertTrue(q.containsAll(p));
461 assertEquals(SIZE - i, q.size());
462 p.remove();
463 }
464 }
465
466 /**
467 * removeAll(c) removes only those elements of c and reports true
468 * if changed
469 */
470 public void testRemoveAll() {
471 for (int i = 1; i < SIZE; ++i) {
472 LinkedTransferQueue q = populatedQueue(SIZE);
473 LinkedTransferQueue p = populatedQueue(i);
474 assertTrue(q.removeAll(p));
475 assertEquals(SIZE - i, q.size());
476 for (int j = 0; j < i; ++j) {
477 assertFalse(q.contains(p.remove()));
478 }
479 }
480 }
481
482 /**
483 * toArray() contains all elements in FIFO order
484 */
485 public void testToArray() {
486 LinkedTransferQueue q = populatedQueue(SIZE);
487 Object[] o = q.toArray();
488 for (int i = 0; i < o.length; i++) {
489 assertSame(o[i], q.poll());
490 }
491 }
492
493 /**
494 * toArray(a) contains all elements in FIFO order
495 */
496 public void testToArray2() {
497 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
498 Integer[] ints = new Integer[SIZE];
499 Integer[] array = q.toArray(ints);
500 assertSame(ints, array);
501 for (int i = 0; i < ints.length; i++) {
502 assertSame(ints[i], q.poll());
503 }
504 }
505
506 /**
507 * toArray(incompatible array type) throws ArrayStoreException
508 */
509 public void testToArray1_BadArg() {
510 LinkedTransferQueue q = populatedQueue(SIZE);
511 try {
512 q.toArray(new String[10]);
513 shouldThrow();
514 } catch (ArrayStoreException success) {}
515 }
516
517 /**
518 * iterator iterates through all elements
519 */
520 public void testIterator() throws InterruptedException {
521 LinkedTransferQueue q = populatedQueue(SIZE);
522 Iterator it = q.iterator();
523 int i = 0;
524 while (it.hasNext()) {
525 assertEquals(it.next(), i++);
526 }
527 assertEquals(i, SIZE);
528 }
529
530 /**
531 * iterator.remove() removes current element
532 */
533 public void testIteratorRemove() {
534 final LinkedTransferQueue q = new LinkedTransferQueue();
535 q.add(two);
536 q.add(one);
537 q.add(three);
538
539 Iterator it = q.iterator();
540 it.next();
541 it.remove();
542
543 it = q.iterator();
544 assertSame(it.next(), one);
545 assertSame(it.next(), three);
546 assertFalse(it.hasNext());
547 }
548
549 /**
550 * iterator ordering is FIFO
551 */
552 public void testIteratorOrdering() {
553 final LinkedTransferQueue<Integer> q
554 = new LinkedTransferQueue<Integer>();
555 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
556 q.add(one);
557 q.add(two);
558 q.add(three);
559 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
560 int k = 0;
561 for (Integer n : q) {
562 assertEquals(++k, (int) n);
563 }
564 assertEquals(3, k);
565 }
566
567 /**
568 * Modifications do not cause iterators to fail
569 */
570 public void testWeaklyConsistentIteration() {
571 final LinkedTransferQueue q = new LinkedTransferQueue();
572 q.add(one);
573 q.add(two);
574 q.add(three);
575 for (Iterator it = q.iterator(); it.hasNext();) {
576 q.remove();
577 it.next();
578 }
579 assertEquals(0, q.size());
580 }
581
582 /**
583 * toString contains toStrings of elements
584 */
585 public void testToString() {
586 LinkedTransferQueue q = populatedQueue(SIZE);
587 String s = q.toString();
588 for (int i = 0; i < SIZE; ++i) {
589 assertTrue(s.contains(String.valueOf(i)));
590 }
591 }
592
593 /**
594 * offer transfers elements across Executor tasks
595 */
596 public void testOfferInExecutor() {
597 final LinkedTransferQueue q = new LinkedTransferQueue();
598 final CheckedBarrier threadsStarted = new CheckedBarrier(2);
599 ExecutorService executor = Executors.newFixedThreadPool(2);
600
601 executor.execute(new CheckedRunnable() {
602 public void realRun() throws InterruptedException {
603 threadsStarted.await();
604 assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS));
605 }});
606
607 executor.execute(new CheckedRunnable() {
608 public void realRun() throws InterruptedException {
609 threadsStarted.await();
610 assertSame(one, q.take());
611 checkEmpty(q);
612 }});
613
614 joinPool(executor);
615 }
616
617 /**
618 * timed poll retrieves elements across Executor threads
619 */
620 public void testPollInExecutor() {
621 final LinkedTransferQueue q = new LinkedTransferQueue();
622 final CheckedBarrier threadsStarted = new CheckedBarrier(2);
623 ExecutorService executor = Executors.newFixedThreadPool(2);
624
625 executor.execute(new CheckedRunnable() {
626 public void realRun() throws InterruptedException {
627 assertNull(q.poll());
628 threadsStarted.await();
629 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
630 checkEmpty(q);
631 }});
632
633 executor.execute(new CheckedRunnable() {
634 public void realRun() throws InterruptedException {
635 threadsStarted.await();
636 q.put(one);
637 }});
638
639 joinPool(executor);
640 }
641
642 /**
643 * A deserialized serialized queue has same elements in same order
644 */
645 public void testSerialization() throws Exception {
646 Queue x = populatedQueue(SIZE);
647 Queue y = serialClone(x);
648
649 assertTrue(x != y);
650 assertEquals(x.size(), y.size());
651 assertEquals(x.toString(), y.toString());
652 assertTrue(Arrays.equals(x.toArray(), y.toArray()));
653 while (!x.isEmpty()) {
654 assertFalse(y.isEmpty());
655 assertEquals(x.remove(), y.remove());
656 }
657 assertTrue(y.isEmpty());
658 }
659
660 /**
661 * drainTo(c) empties queue into another collection c
662 */
663 public void testDrainTo() {
664 LinkedTransferQueue q = populatedQueue(SIZE);
665 ArrayList l = new ArrayList();
666 q.drainTo(l);
667 assertEquals(q.size(), 0);
668 assertEquals(l.size(), SIZE);
669 for (int i = 0; i < SIZE; ++i) {
670 assertEquals(l.get(i), i);
671 }
672 q.add(zero);
673 q.add(one);
674 assertFalse(q.isEmpty());
675 assertTrue(q.contains(zero));
676 assertTrue(q.contains(one));
677 l.clear();
678 q.drainTo(l);
679 assertEquals(q.size(), 0);
680 assertEquals(l.size(), 2);
681 for (int i = 0; i < 2; ++i) {
682 assertEquals(l.get(i), i);
683 }
684 }
685
686 /**
687 * drainTo(c) empties full queue, unblocking a waiting put.
688 */
689 public void testDrainToWithActivePut() throws InterruptedException {
690 final LinkedTransferQueue q = populatedQueue(SIZE);
691 Thread t = newStartedThread(new CheckedRunnable() {
692 public void realRun() {
693 q.put(SIZE + 1);
694 }});
695 ArrayList l = new ArrayList();
696 q.drainTo(l);
697 assertTrue(l.size() >= SIZE);
698 for (int i = 0; i < SIZE; ++i) {
699 assertEquals(l.get(i), i);
700 }
701 awaitTermination(t, MEDIUM_DELAY_MS);
702 assertTrue(q.size() + l.size() >= SIZE);
703 }
704
705 /**
706 * drainTo(c, n) empties first min(n, size) elements of queue into c
707 */
708 public void testDrainToN() {
709 LinkedTransferQueue q = new LinkedTransferQueue();
710 for (int i = 0; i < SIZE + 2; ++i) {
711 for (int j = 0; j < SIZE; j++) {
712 assertTrue(q.offer(j));
713 }
714 ArrayList l = new ArrayList();
715 q.drainTo(l, i);
716 int k = (i < SIZE) ? i : SIZE;
717 assertEquals(l.size(), k);
718 assertEquals(q.size(), SIZE - k);
719 for (int j = 0; j < k; ++j) {
720 assertEquals(l.get(j), j);
721 }
722 while (q.poll() != null)
723 ;
724 }
725 }
726
727 /**
728 * timed poll() or take() increments the waiting consumer count;
729 * offer(e) decrements the waiting consumer count
730 */
731 public void testWaitingConsumer() throws InterruptedException {
732 final LinkedTransferQueue q = new LinkedTransferQueue();
733 assertEquals(q.getWaitingConsumerCount(), 0);
734 assertFalse(q.hasWaitingConsumer());
735 final CountDownLatch threadStarted = new CountDownLatch(1);
736
737 Thread t = newStartedThread(new CheckedRunnable() {
738 public void realRun() throws InterruptedException {
739 threadStarted.countDown();
740 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
741 assertEquals(q.getWaitingConsumerCount(), 0);
742 assertFalse(q.hasWaitingConsumer());
743 }});
744
745 threadStarted.await();
746 waitForThreadToEnterWaitState(t, SMALL_DELAY_MS);
747 assertEquals(q.getWaitingConsumerCount(), 1);
748 assertTrue(q.hasWaitingConsumer());
749
750 assertTrue(q.offer(one));
751 assertEquals(q.getWaitingConsumerCount(), 0);
752 assertFalse(q.hasWaitingConsumer());
753
754 awaitTermination(t, MEDIUM_DELAY_MS);
755 }
756
757 /**
758 * transfer(null) throws NullPointerException
759 */
760 public void testTransfer1() throws InterruptedException {
761 try {
762 LinkedTransferQueue q = new LinkedTransferQueue();
763 q.transfer(null);
764 shouldThrow();
765 } catch (NullPointerException success) {}
766 }
767
768 /**
769 * transfer waits until a poll occurs. The transfered element
770 * is returned by this associated poll.
771 */
772 public void testTransfer2() throws InterruptedException {
773 final LinkedTransferQueue<Integer> q
774 = new LinkedTransferQueue<Integer>();
775 final CountDownLatch threadStarted = new CountDownLatch(1);
776
777 Thread t = newStartedThread(new CheckedRunnable() {
778 public void realRun() throws InterruptedException {
779 threadStarted.countDown();
780 q.transfer(five);
781 checkEmpty(q);
782 }});
783
784 threadStarted.await();
785 waitForThreadToEnterWaitState(t, SMALL_DELAY_MS);
786 assertEquals(1, q.size());
787 assertSame(five, q.poll());
788 checkEmpty(q);
789 awaitTermination(t, MEDIUM_DELAY_MS);
790 }
791
792 /**
793 * transfer waits until a poll occurs, and then transfers in fifo order
794 */
795 public void testTransfer3() throws InterruptedException {
796 final LinkedTransferQueue<Integer> q
797 = new LinkedTransferQueue<Integer>();
798
799 Thread first = newStartedThread(new CheckedRunnable() {
800 public void realRun() throws InterruptedException {
801 q.transfer(four);
802 assertTrue(!q.contains(four));
803 assertEquals(1, q.size());
804 }});
805
806 Thread interruptedThread = newStartedThread(
807 new CheckedInterruptedRunnable() {
808 public void realRun() throws InterruptedException {
809 while (q.isEmpty())
810 Thread.yield();
811 q.transfer(five);
812 }});
813
814 while (q.size() < 2)
815 Thread.yield();
816 assertEquals(2, q.size());
817 assertSame(four, q.poll());
818 first.join();
819 assertEquals(1, q.size());
820 interruptedThread.interrupt();
821 interruptedThread.join();
822 checkEmpty(q);
823 }
824
825 /**
826 * transfer waits until a poll occurs, at which point the polling
827 * thread returns the element
828 */
829 public void testTransfer4() throws InterruptedException {
830 final LinkedTransferQueue q = new LinkedTransferQueue();
831
832 Thread t = newStartedThread(new CheckedRunnable() {
833 public void realRun() throws InterruptedException {
834 q.transfer(four);
835 assertFalse(q.contains(four));
836 assertSame(three, q.poll());
837 }});
838
839 while (q.isEmpty())
840 Thread.yield();
841 assertFalse(q.isEmpty());
842 assertEquals(1, q.size());
843 assertTrue(q.offer(three));
844 assertSame(four, q.poll());
845 awaitTermination(t, MEDIUM_DELAY_MS);
846 }
847
848 /**
849 * transfer waits until a take occurs. The transfered element
850 * is returned by this associated take.
851 */
852 public void testTransfer5() throws InterruptedException {
853 final LinkedTransferQueue<Integer> q
854 = new LinkedTransferQueue<Integer>();
855
856 Thread t = newStartedThread(new CheckedRunnable() {
857 public void realRun() throws InterruptedException {
858 q.transfer(four);
859 checkEmpty(q);
860 }});
861
862 while (q.isEmpty())
863 Thread.yield();
864 assertFalse(q.isEmpty());
865 assertEquals(1, q.size());
866 assertSame(four, q.take());
867 checkEmpty(q);
868 awaitTermination(t, MEDIUM_DELAY_MS);
869 }
870
871 /**
872 * tryTransfer(null) throws NullPointerException
873 */
874 public void testTryTransfer1() {
875 try {
876 final LinkedTransferQueue q = new LinkedTransferQueue();
877 q.tryTransfer(null);
878 shouldThrow();
879 } catch (NullPointerException success) {}
880 }
881
882 /**
883 * tryTransfer returns false and does not enqueue if there are no
884 * consumers waiting to poll or take.
885 */
886 public void testTryTransfer2() throws InterruptedException {
887 final LinkedTransferQueue q = new LinkedTransferQueue();
888 assertFalse(q.tryTransfer(new Object()));
889 assertFalse(q.hasWaitingConsumer());
890 checkEmpty(q);
891 }
892
893 /**
894 * If there is a consumer waiting in timed poll, tryTransfer
895 * returns true while successfully transfering object.
896 */
897 public void testTryTransfer3() throws InterruptedException {
898 final Object hotPotato = new Object();
899 final LinkedTransferQueue q = new LinkedTransferQueue();
900
901 Thread t = newStartedThread(new CheckedRunnable() {
902 public void realRun() {
903 while (! q.hasWaitingConsumer())
904 Thread.yield();
905 assertTrue(q.hasWaitingConsumer());
906 checkEmpty(q);
907 assertTrue(q.tryTransfer(hotPotato));
908 }});
909
910 assertSame(hotPotato, q.poll(MEDIUM_DELAY_MS, MILLISECONDS));
911 checkEmpty(q);
912 awaitTermination(t, MEDIUM_DELAY_MS);
913 }
914
915 /**
916 * If there is a consumer waiting in take, tryTransfer returns
917 * true while successfully transfering object.
918 */
919 public void testTryTransfer4() throws InterruptedException {
920 final Object hotPotato = new Object();
921 final LinkedTransferQueue q = new LinkedTransferQueue();
922
923 Thread t = newStartedThread(new CheckedRunnable() {
924 public void realRun() {
925 while (! q.hasWaitingConsumer())
926 Thread.yield();
927 assertTrue(q.hasWaitingConsumer());
928 checkEmpty(q);
929 assertTrue(q.tryTransfer(hotPotato));
930 }});
931
932 assertSame(q.take(), hotPotato);
933 checkEmpty(q);
934 awaitTermination(t, MEDIUM_DELAY_MS);
935 }
936
937 /**
938 * tryTransfer blocks interruptibly if no takers
939 */
940 public void testTryTransfer5() throws InterruptedException {
941 final LinkedTransferQueue q = new LinkedTransferQueue();
942 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
943 assertTrue(q.isEmpty());
944
945 Thread t = newStartedThread(new CheckedRunnable() {
946 public void realRun() throws InterruptedException {
947 Thread.currentThread().interrupt();
948 try {
949 q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS);
950 shouldThrow();
951 } catch (InterruptedException success) {}
952 assertFalse(Thread.interrupted());
953
954 pleaseInterrupt.countDown();
955 try {
956 q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS);
957 shouldThrow();
958 } catch (InterruptedException success) {}
959 assertFalse(Thread.interrupted());
960 }});
961
962 await(pleaseInterrupt);
963 assertThreadStaysAlive(t);
964 t.interrupt();
965 awaitTermination(t);
966 checkEmpty(q);
967 }
968
969 /**
970 * tryTransfer gives up after the timeout and returns false
971 */
972 public void testTryTransfer6() throws InterruptedException {
973 final LinkedTransferQueue q = new LinkedTransferQueue();
974
975 Thread t = newStartedThread(new CheckedRunnable() {
976 public void realRun() throws InterruptedException {
977 long t0 = System.nanoTime();
978 assertFalse(q.tryTransfer(new Object(),
979 timeoutMillis(), MILLISECONDS));
980 assertTrue(millisElapsedSince(t0) >= timeoutMillis());
981 checkEmpty(q);
982 }});
983
984 awaitTermination(t);
985 checkEmpty(q);
986 }
987
988 /**
989 * tryTransfer waits for any elements previously in to be removed
990 * before transfering to a poll or take
991 */
992 public void testTryTransfer7() throws InterruptedException {
993 final LinkedTransferQueue q = new LinkedTransferQueue();
994 assertTrue(q.offer(four));
995
996 Thread t = newStartedThread(new CheckedRunnable() {
997 public void realRun() throws InterruptedException {
998 assertTrue(q.tryTransfer(five, MEDIUM_DELAY_MS, MILLISECONDS));
999 checkEmpty(q);
1000 }});
1001
1002 while (q.size() != 2)
1003 Thread.yield();
1004 assertEquals(2, q.size());
1005 assertSame(four, q.poll());
1006 assertSame(five, q.poll());
1007 checkEmpty(q);
1008 awaitTermination(t, MEDIUM_DELAY_MS);
1009 }
1010
1011 /**
1012 * tryTransfer attempts to enqueue into the queue and fails
1013 * returning false not enqueueing and the successive poll is null
1014 */
1015 public void testTryTransfer8() throws InterruptedException {
1016 final LinkedTransferQueue q = new LinkedTransferQueue();
1017 assertTrue(q.offer(four));
1018 assertEquals(1, q.size());
1019 long t0 = System.nanoTime();
1020 assertFalse(q.tryTransfer(five, timeoutMillis(), MILLISECONDS));
1021 assertTrue(millisElapsedSince(t0) >= timeoutMillis());
1022 assertEquals(1, q.size());
1023 assertSame(four, q.poll());
1024 assertNull(q.poll());
1025 checkEmpty(q);
1026 }
1027
1028 private LinkedTransferQueue<Integer> populatedQueue(int n) {
1029 LinkedTransferQueue<Integer> q = new LinkedTransferQueue<Integer>();
1030 checkEmpty(q);
1031 for (int i = 0; i < n; i++) {
1032 assertEquals(i, q.size());
1033 assertTrue(q.offer(i));
1034 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
1035 }
1036 assertFalse(q.isEmpty());
1037 return q;
1038 }
1039 }