ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/LinkedTransferQueueTest.java
Revision: 1.49
Committed: Fri Jul 15 18:49:31 2011 UTC (12 years, 10 months ago) by jsr166
Branch: MAIN
Changes since 1.48: +0 -21 lines
Log Message:
Robust weak consistency for ArrayBlockingQueue iterators

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include John Vint
6 */
7
8 import junit.framework.*;
9 import java.util.Arrays;
10 import java.util.ArrayList;
11 import java.util.Collection;
12 import java.util.Iterator;
13 import java.util.List;
14 import java.util.NoSuchElementException;
15 import java.util.Queue;
16 import java.util.concurrent.BlockingQueue;
17 import java.util.concurrent.CountDownLatch;
18 import java.util.concurrent.Executors;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.LinkedTransferQueue;
21 import static java.util.concurrent.TimeUnit.MILLISECONDS;
22 import static java.util.concurrent.TimeUnit.NANOSECONDS;
23
24 @SuppressWarnings({"unchecked", "rawtypes"})
25 public class LinkedTransferQueueTest extends JSR166TestCase {
26
27 public static class Generic extends BlockingQueueTest {
28 protected BlockingQueue emptyCollection() {
29 return new LinkedTransferQueue();
30 }
31 }
32
33 public static void main(String[] args) {
34 junit.textui.TestRunner.run(suite());
35 }
36
37 public static Test suite() {
38 return newTestSuite(LinkedTransferQueueTest.class,
39 new Generic().testSuite());
40 }
41
42 /**
43 * Constructor builds new queue with size being zero and empty
44 * being true
45 */
46 public void testConstructor1() {
47 assertEquals(0, new LinkedTransferQueue().size());
48 assertTrue(new LinkedTransferQueue().isEmpty());
49 }
50
51 /**
52 * Initializing constructor with null collection throws
53 * NullPointerException
54 */
55 public void testConstructor2() {
56 try {
57 new LinkedTransferQueue(null);
58 shouldThrow();
59 } catch (NullPointerException success) {}
60 }
61
62 /**
63 * Initializing from Collection of null elements throws
64 * NullPointerException
65 */
66 public void testConstructor3() {
67 Collection<Integer> elements = Arrays.asList(new Integer[SIZE]);
68 try {
69 new LinkedTransferQueue(elements);
70 shouldThrow();
71 } catch (NullPointerException success) {}
72 }
73
74 /**
75 * Initializing constructor with a collection containing some null elements
76 * throws NullPointerException
77 */
78 public void testConstructor4() {
79 Integer[] ints = new Integer[SIZE];
80 for (int i = 0; i < SIZE-1; ++i)
81 ints[i] = i;
82 Collection<Integer> elements = Arrays.asList(ints);
83 try {
84 new LinkedTransferQueue(elements);
85 shouldThrow();
86 } catch (NullPointerException success) {}
87 }
88
89 /**
90 * Queue contains all elements of the collection it is initialized by
91 */
92 public void testConstructor5() {
93 Integer[] ints = new Integer[SIZE];
94 for (int i = 0; i < SIZE; ++i) {
95 ints[i] = i;
96 }
97 List intList = Arrays.asList(ints);
98 LinkedTransferQueue q
99 = new LinkedTransferQueue(intList);
100 assertEquals(q.size(), intList.size());
101 assertEquals(q.toString(), intList.toString());
102 assertTrue(Arrays.equals(q.toArray(),
103 intList.toArray()));
104 assertTrue(Arrays.equals(q.toArray(new Object[0]),
105 intList.toArray(new Object[0])));
106 assertTrue(Arrays.equals(q.toArray(new Object[SIZE]),
107 intList.toArray(new Object[SIZE])));
108 for (int i = 0; i < SIZE; ++i) {
109 assertEquals(ints[i], q.poll());
110 }
111 }
112
113 /**
114 * remainingCapacity() always returns Integer.MAX_VALUE
115 */
116 public void testRemainingCapacity() {
117 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
118 for (int i = 0; i < SIZE; ++i) {
119 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
120 assertEquals(SIZE - i, q.size());
121 q.remove();
122 }
123 for (int i = 0; i < SIZE; ++i) {
124 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
125 assertEquals(i, q.size());
126 q.add(i);
127 }
128 }
129
130 /**
131 * addAll(this) throws IllegalArgumentException
132 */
133 public void testAddAllSelf() {
134 try {
135 LinkedTransferQueue q = populatedQueue(SIZE);
136 q.addAll(q);
137 shouldThrow();
138 } catch (IllegalArgumentException success) {}
139 }
140
141 /**
142 * addAll of a collection with any null elements throws
143 * NullPointerException after possibly adding some elements
144 */
145 public void testAddAll3() {
146 try {
147 LinkedTransferQueue q = new LinkedTransferQueue();
148 Integer[] ints = new Integer[SIZE];
149 for (int i = 0; i < SIZE - 1; ++i) {
150 ints[i] = i;
151 }
152 q.addAll(Arrays.asList(ints));
153 shouldThrow();
154 } catch (NullPointerException success) {}
155 }
156
157 /**
158 * Queue contains all elements, in traversal order, of successful addAll
159 */
160 public void testAddAll5() {
161 Integer[] empty = new Integer[0];
162 Integer[] ints = new Integer[SIZE];
163 for (int i = 0; i < SIZE; ++i) {
164 ints[i] = i;
165 }
166 LinkedTransferQueue q = new LinkedTransferQueue();
167 assertFalse(q.addAll(Arrays.asList(empty)));
168 assertTrue(q.addAll(Arrays.asList(ints)));
169 for (int i = 0; i < SIZE; ++i) {
170 assertEquals(ints[i], q.poll());
171 }
172 }
173
174 /**
175 * all elements successfully put are contained
176 */
177 public void testPut() {
178 LinkedTransferQueue<Integer> q = new LinkedTransferQueue<Integer>();
179 for (int i = 0; i < SIZE; ++i) {
180 assertEquals(q.size(), i);
181 q.put(i);
182 assertTrue(q.contains(i));
183 }
184 }
185
186 /**
187 * take retrieves elements in FIFO order
188 */
189 public void testTake() throws InterruptedException {
190 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
191 for (int i = 0; i < SIZE; ++i) {
192 assertEquals(i, (int) q.take());
193 }
194 }
195
196 /**
197 * take removes existing elements until empty, then blocks interruptibly
198 */
199 public void testBlockingTake() throws InterruptedException {
200 final BlockingQueue q = populatedQueue(SIZE);
201 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
202 Thread t = newStartedThread(new CheckedRunnable() {
203 public void realRun() throws InterruptedException {
204 for (int i = 0; i < SIZE; ++i) {
205 assertEquals(i, q.take());
206 }
207
208 Thread.currentThread().interrupt();
209 try {
210 q.take();
211 shouldThrow();
212 } catch (InterruptedException success) {}
213 assertFalse(Thread.interrupted());
214
215 pleaseInterrupt.countDown();
216 try {
217 q.take();
218 shouldThrow();
219 } catch (InterruptedException success) {}
220 assertFalse(Thread.interrupted());
221 }});
222
223 await(pleaseInterrupt);
224 assertThreadStaysAlive(t);
225 t.interrupt();
226 awaitTermination(t);
227 }
228
229 /**
230 * poll succeeds unless empty
231 */
232 public void testPoll() throws InterruptedException {
233 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
234 for (int i = 0; i < SIZE; ++i) {
235 assertEquals(i, (int) q.poll());
236 }
237 assertNull(q.poll());
238 checkEmpty(q);
239 }
240
241 /**
242 * timed poll with zero timeout succeeds when non-empty, else times out
243 */
244 public void testTimedPoll0() throws InterruptedException {
245 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
246 for (int i = 0; i < SIZE; ++i) {
247 assertEquals(i, (int) q.poll(0, MILLISECONDS));
248 }
249 assertNull(q.poll(0, MILLISECONDS));
250 checkEmpty(q);
251 }
252
253 /**
254 * timed poll with nonzero timeout succeeds when non-empty, else times out
255 */
256 public void testTimedPoll() throws InterruptedException {
257 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
258 for (int i = 0; i < SIZE; ++i) {
259 long startTime = System.nanoTime();
260 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
261 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
262 }
263 long startTime = System.nanoTime();
264 assertNull(q.poll(timeoutMillis(), MILLISECONDS));
265 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
266 checkEmpty(q);
267 }
268
269 /**
270 * Interrupted timed poll throws InterruptedException instead of
271 * returning timeout status
272 */
273 public void testInterruptedTimedPoll() throws InterruptedException {
274 final BlockingQueue<Integer> q = populatedQueue(SIZE);
275 final CountDownLatch aboutToWait = new CountDownLatch(1);
276 Thread t = newStartedThread(new CheckedRunnable() {
277 public void realRun() throws InterruptedException {
278 for (int i = 0; i < SIZE; ++i) {
279 long t0 = System.nanoTime();
280 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
281 assertTrue(millisElapsedSince(t0) < SMALL_DELAY_MS);
282 }
283 long t0 = System.nanoTime();
284 aboutToWait.countDown();
285 try {
286 q.poll(MEDIUM_DELAY_MS, MILLISECONDS);
287 shouldThrow();
288 } catch (InterruptedException success) {
289 assertTrue(millisElapsedSince(t0) < MEDIUM_DELAY_MS);
290 }
291 }});
292
293 aboutToWait.await();
294 waitForThreadToEnterWaitState(t, SMALL_DELAY_MS);
295 t.interrupt();
296 awaitTermination(t, MEDIUM_DELAY_MS);
297 checkEmpty(q);
298 }
299
300 /**
301 * timed poll after thread interrupted throws InterruptedException
302 * instead of returning timeout status
303 */
304 public void testTimedPollAfterInterrupt() throws InterruptedException {
305 final BlockingQueue<Integer> q = populatedQueue(SIZE);
306 Thread t = newStartedThread(new CheckedRunnable() {
307 public void realRun() throws InterruptedException {
308 Thread.currentThread().interrupt();
309 for (int i = 0; i < SIZE; ++i) {
310 long t0 = System.nanoTime();
311 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
312 assertTrue(millisElapsedSince(t0) < SMALL_DELAY_MS);
313 }
314 try {
315 q.poll(MEDIUM_DELAY_MS, MILLISECONDS);
316 shouldThrow();
317 } catch (InterruptedException success) {}
318 }});
319
320 awaitTermination(t, MEDIUM_DELAY_MS);
321 checkEmpty(q);
322 }
323
324 /**
325 * peek returns next element, or null if empty
326 */
327 public void testPeek() throws InterruptedException {
328 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
329 for (int i = 0; i < SIZE; ++i) {
330 assertEquals(i, (int) q.peek());
331 assertEquals(i, (int) q.poll());
332 assertTrue(q.peek() == null ||
333 i != (int) q.peek());
334 }
335 assertNull(q.peek());
336 checkEmpty(q);
337 }
338
339 /**
340 * element returns next element, or throws NoSuchElementException if empty
341 */
342 public void testElement() throws InterruptedException {
343 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
344 for (int i = 0; i < SIZE; ++i) {
345 assertEquals(i, (int) q.element());
346 assertEquals(i, (int) q.poll());
347 }
348 try {
349 q.element();
350 shouldThrow();
351 } catch (NoSuchElementException success) {}
352 checkEmpty(q);
353 }
354
355 /**
356 * remove removes next element, or throws NoSuchElementException if empty
357 */
358 public void testRemove() throws InterruptedException {
359 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
360 for (int i = 0; i < SIZE; ++i) {
361 assertEquals(i, (int) q.remove());
362 }
363 try {
364 q.remove();
365 shouldThrow();
366 } catch (NoSuchElementException success) {}
367 checkEmpty(q);
368 }
369
370 /**
371 * An add following remove(x) succeeds
372 */
373 public void testRemoveElementAndAdd() throws InterruptedException {
374 LinkedTransferQueue q = new LinkedTransferQueue();
375 assertTrue(q.add(one));
376 assertTrue(q.add(two));
377 assertTrue(q.remove(one));
378 assertTrue(q.remove(two));
379 assertTrue(q.add(three));
380 assertSame(q.take(), three);
381 }
382
383 /**
384 * contains(x) reports true when elements added but not yet removed
385 */
386 public void testContains() {
387 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
388 for (int i = 0; i < SIZE; ++i) {
389 assertTrue(q.contains(i));
390 assertEquals(i, (int) q.poll());
391 assertFalse(q.contains(i));
392 }
393 }
394
395 /**
396 * clear removes all elements
397 */
398 public void testClear() throws InterruptedException {
399 LinkedTransferQueue q = populatedQueue(SIZE);
400 q.clear();
401 checkEmpty(q);
402 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
403 q.add(one);
404 assertFalse(q.isEmpty());
405 assertEquals(1, q.size());
406 assertTrue(q.contains(one));
407 q.clear();
408 checkEmpty(q);
409 }
410
411 /**
412 * containsAll(c) is true when c contains a subset of elements
413 */
414 public void testContainsAll() {
415 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
416 LinkedTransferQueue<Integer> p = new LinkedTransferQueue<Integer>();
417 for (int i = 0; i < SIZE; ++i) {
418 assertTrue(q.containsAll(p));
419 assertFalse(p.containsAll(q));
420 p.add(i);
421 }
422 assertTrue(p.containsAll(q));
423 }
424
425 /**
426 * retainAll(c) retains only those elements of c and reports true
427 * if changed
428 */
429 public void testRetainAll() {
430 LinkedTransferQueue q = populatedQueue(SIZE);
431 LinkedTransferQueue p = populatedQueue(SIZE);
432 for (int i = 0; i < SIZE; ++i) {
433 boolean changed = q.retainAll(p);
434 if (i == 0) {
435 assertFalse(changed);
436 } else {
437 assertTrue(changed);
438 }
439 assertTrue(q.containsAll(p));
440 assertEquals(SIZE - i, q.size());
441 p.remove();
442 }
443 }
444
445 /**
446 * removeAll(c) removes only those elements of c and reports true
447 * if changed
448 */
449 public void testRemoveAll() {
450 for (int i = 1; i < SIZE; ++i) {
451 LinkedTransferQueue q = populatedQueue(SIZE);
452 LinkedTransferQueue p = populatedQueue(i);
453 assertTrue(q.removeAll(p));
454 assertEquals(SIZE - i, q.size());
455 for (int j = 0; j < i; ++j) {
456 assertFalse(q.contains(p.remove()));
457 }
458 }
459 }
460
461 /**
462 * toArray() contains all elements in FIFO order
463 */
464 public void testToArray() {
465 LinkedTransferQueue q = populatedQueue(SIZE);
466 Object[] o = q.toArray();
467 for (int i = 0; i < o.length; i++) {
468 assertSame(o[i], q.poll());
469 }
470 }
471
472 /**
473 * toArray(a) contains all elements in FIFO order
474 */
475 public void testToArray2() {
476 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
477 Integer[] ints = new Integer[SIZE];
478 Integer[] array = q.toArray(ints);
479 assertSame(ints, array);
480 for (int i = 0; i < ints.length; i++) {
481 assertSame(ints[i], q.poll());
482 }
483 }
484
485 /**
486 * toArray(incompatible array type) throws ArrayStoreException
487 */
488 public void testToArray1_BadArg() {
489 LinkedTransferQueue q = populatedQueue(SIZE);
490 try {
491 q.toArray(new String[10]);
492 shouldThrow();
493 } catch (ArrayStoreException success) {}
494 }
495
496 /**
497 * iterator iterates through all elements
498 */
499 public void testIterator() throws InterruptedException {
500 LinkedTransferQueue q = populatedQueue(SIZE);
501 Iterator it = q.iterator();
502 int i = 0;
503 while (it.hasNext()) {
504 assertEquals(it.next(), i++);
505 }
506 assertEquals(i, SIZE);
507 }
508
509 /**
510 * iterator.remove() removes current element
511 */
512 public void testIteratorRemove() {
513 final LinkedTransferQueue q = new LinkedTransferQueue();
514 q.add(two);
515 q.add(one);
516 q.add(three);
517
518 Iterator it = q.iterator();
519 it.next();
520 it.remove();
521
522 it = q.iterator();
523 assertSame(it.next(), one);
524 assertSame(it.next(), three);
525 assertFalse(it.hasNext());
526 }
527
528 /**
529 * iterator ordering is FIFO
530 */
531 public void testIteratorOrdering() {
532 final LinkedTransferQueue<Integer> q
533 = new LinkedTransferQueue<Integer>();
534 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
535 q.add(one);
536 q.add(two);
537 q.add(three);
538 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
539 int k = 0;
540 for (Integer n : q) {
541 assertEquals(++k, (int) n);
542 }
543 assertEquals(3, k);
544 }
545
546 /**
547 * Modifications do not cause iterators to fail
548 */
549 public void testWeaklyConsistentIteration() {
550 final LinkedTransferQueue q = new LinkedTransferQueue();
551 q.add(one);
552 q.add(two);
553 q.add(three);
554 for (Iterator it = q.iterator(); it.hasNext();) {
555 q.remove();
556 it.next();
557 }
558 assertEquals(0, q.size());
559 }
560
561 /**
562 * toString contains toStrings of elements
563 */
564 public void testToString() {
565 LinkedTransferQueue q = populatedQueue(SIZE);
566 String s = q.toString();
567 for (int i = 0; i < SIZE; ++i) {
568 assertTrue(s.contains(String.valueOf(i)));
569 }
570 }
571
572 /**
573 * offer transfers elements across Executor tasks
574 */
575 public void testOfferInExecutor() {
576 final LinkedTransferQueue q = new LinkedTransferQueue();
577 final CheckedBarrier threadsStarted = new CheckedBarrier(2);
578 ExecutorService executor = Executors.newFixedThreadPool(2);
579
580 executor.execute(new CheckedRunnable() {
581 public void realRun() throws InterruptedException {
582 threadsStarted.await();
583 assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS));
584 }});
585
586 executor.execute(new CheckedRunnable() {
587 public void realRun() throws InterruptedException {
588 threadsStarted.await();
589 assertSame(one, q.take());
590 checkEmpty(q);
591 }});
592
593 joinPool(executor);
594 }
595
596 /**
597 * timed poll retrieves elements across Executor threads
598 */
599 public void testPollInExecutor() {
600 final LinkedTransferQueue q = new LinkedTransferQueue();
601 final CheckedBarrier threadsStarted = new CheckedBarrier(2);
602 ExecutorService executor = Executors.newFixedThreadPool(2);
603
604 executor.execute(new CheckedRunnable() {
605 public void realRun() throws InterruptedException {
606 assertNull(q.poll());
607 threadsStarted.await();
608 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
609 checkEmpty(q);
610 }});
611
612 executor.execute(new CheckedRunnable() {
613 public void realRun() throws InterruptedException {
614 threadsStarted.await();
615 q.put(one);
616 }});
617
618 joinPool(executor);
619 }
620
621 /**
622 * A deserialized serialized queue has same elements in same order
623 */
624 public void testSerialization() throws Exception {
625 Queue x = populatedQueue(SIZE);
626 Queue y = serialClone(x);
627
628 assertTrue(x != y);
629 assertEquals(x.size(), y.size());
630 assertEquals(x.toString(), y.toString());
631 assertTrue(Arrays.equals(x.toArray(), y.toArray()));
632 while (!x.isEmpty()) {
633 assertFalse(y.isEmpty());
634 assertEquals(x.remove(), y.remove());
635 }
636 assertTrue(y.isEmpty());
637 }
638
639 /**
640 * drainTo(c) empties queue into another collection c
641 */
642 public void testDrainTo() {
643 LinkedTransferQueue q = populatedQueue(SIZE);
644 ArrayList l = new ArrayList();
645 q.drainTo(l);
646 assertEquals(q.size(), 0);
647 assertEquals(l.size(), SIZE);
648 for (int i = 0; i < SIZE; ++i) {
649 assertEquals(l.get(i), i);
650 }
651 q.add(zero);
652 q.add(one);
653 assertFalse(q.isEmpty());
654 assertTrue(q.contains(zero));
655 assertTrue(q.contains(one));
656 l.clear();
657 q.drainTo(l);
658 assertEquals(q.size(), 0);
659 assertEquals(l.size(), 2);
660 for (int i = 0; i < 2; ++i) {
661 assertEquals(l.get(i), i);
662 }
663 }
664
665 /**
666 * drainTo(c) empties full queue, unblocking a waiting put.
667 */
668 public void testDrainToWithActivePut() throws InterruptedException {
669 final LinkedTransferQueue q = populatedQueue(SIZE);
670 Thread t = newStartedThread(new CheckedRunnable() {
671 public void realRun() {
672 q.put(SIZE + 1);
673 }});
674 ArrayList l = new ArrayList();
675 q.drainTo(l);
676 assertTrue(l.size() >= SIZE);
677 for (int i = 0; i < SIZE; ++i) {
678 assertEquals(l.get(i), i);
679 }
680 awaitTermination(t, MEDIUM_DELAY_MS);
681 assertTrue(q.size() + l.size() >= SIZE);
682 }
683
684 /**
685 * drainTo(c, n) empties first min(n, size) elements of queue into c
686 */
687 public void testDrainToN() {
688 LinkedTransferQueue q = new LinkedTransferQueue();
689 for (int i = 0; i < SIZE + 2; ++i) {
690 for (int j = 0; j < SIZE; j++) {
691 assertTrue(q.offer(j));
692 }
693 ArrayList l = new ArrayList();
694 q.drainTo(l, i);
695 int k = (i < SIZE) ? i : SIZE;
696 assertEquals(l.size(), k);
697 assertEquals(q.size(), SIZE - k);
698 for (int j = 0; j < k; ++j) {
699 assertEquals(l.get(j), j);
700 }
701 while (q.poll() != null)
702 ;
703 }
704 }
705
706 /**
707 * timed poll() or take() increments the waiting consumer count;
708 * offer(e) decrements the waiting consumer count
709 */
710 public void testWaitingConsumer() throws InterruptedException {
711 final LinkedTransferQueue q = new LinkedTransferQueue();
712 assertEquals(q.getWaitingConsumerCount(), 0);
713 assertFalse(q.hasWaitingConsumer());
714 final CountDownLatch threadStarted = new CountDownLatch(1);
715
716 Thread t = newStartedThread(new CheckedRunnable() {
717 public void realRun() throws InterruptedException {
718 threadStarted.countDown();
719 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
720 assertEquals(q.getWaitingConsumerCount(), 0);
721 assertFalse(q.hasWaitingConsumer());
722 }});
723
724 threadStarted.await();
725 waitForThreadToEnterWaitState(t, SMALL_DELAY_MS);
726 assertEquals(q.getWaitingConsumerCount(), 1);
727 assertTrue(q.hasWaitingConsumer());
728
729 assertTrue(q.offer(one));
730 assertEquals(q.getWaitingConsumerCount(), 0);
731 assertFalse(q.hasWaitingConsumer());
732
733 awaitTermination(t, MEDIUM_DELAY_MS);
734 }
735
736 /**
737 * transfer(null) throws NullPointerException
738 */
739 public void testTransfer1() throws InterruptedException {
740 try {
741 LinkedTransferQueue q = new LinkedTransferQueue();
742 q.transfer(null);
743 shouldThrow();
744 } catch (NullPointerException success) {}
745 }
746
747 /**
748 * transfer waits until a poll occurs. The transfered element
749 * is returned by this associated poll.
750 */
751 public void testTransfer2() throws InterruptedException {
752 final LinkedTransferQueue<Integer> q
753 = new LinkedTransferQueue<Integer>();
754 final CountDownLatch threadStarted = new CountDownLatch(1);
755
756 Thread t = newStartedThread(new CheckedRunnable() {
757 public void realRun() throws InterruptedException {
758 threadStarted.countDown();
759 q.transfer(five);
760 checkEmpty(q);
761 }});
762
763 threadStarted.await();
764 waitForThreadToEnterWaitState(t, SMALL_DELAY_MS);
765 assertEquals(1, q.size());
766 assertSame(five, q.poll());
767 checkEmpty(q);
768 awaitTermination(t, MEDIUM_DELAY_MS);
769 }
770
771 /**
772 * transfer waits until a poll occurs, and then transfers in fifo order
773 */
774 public void testTransfer3() throws InterruptedException {
775 final LinkedTransferQueue<Integer> q
776 = new LinkedTransferQueue<Integer>();
777
778 Thread first = newStartedThread(new CheckedRunnable() {
779 public void realRun() throws InterruptedException {
780 q.transfer(four);
781 assertTrue(!q.contains(four));
782 assertEquals(1, q.size());
783 }});
784
785 Thread interruptedThread = newStartedThread(
786 new CheckedInterruptedRunnable() {
787 public void realRun() throws InterruptedException {
788 while (q.isEmpty())
789 Thread.yield();
790 q.transfer(five);
791 }});
792
793 while (q.size() < 2)
794 Thread.yield();
795 assertEquals(2, q.size());
796 assertSame(four, q.poll());
797 first.join();
798 assertEquals(1, q.size());
799 interruptedThread.interrupt();
800 interruptedThread.join();
801 checkEmpty(q);
802 }
803
804 /**
805 * transfer waits until a poll occurs, at which point the polling
806 * thread returns the element
807 */
808 public void testTransfer4() throws InterruptedException {
809 final LinkedTransferQueue q = new LinkedTransferQueue();
810
811 Thread t = newStartedThread(new CheckedRunnable() {
812 public void realRun() throws InterruptedException {
813 q.transfer(four);
814 assertFalse(q.contains(four));
815 assertSame(three, q.poll());
816 }});
817
818 while (q.isEmpty())
819 Thread.yield();
820 assertFalse(q.isEmpty());
821 assertEquals(1, q.size());
822 assertTrue(q.offer(three));
823 assertSame(four, q.poll());
824 awaitTermination(t, MEDIUM_DELAY_MS);
825 }
826
827 /**
828 * transfer waits until a take occurs. The transfered element
829 * is returned by this associated take.
830 */
831 public void testTransfer5() throws InterruptedException {
832 final LinkedTransferQueue<Integer> q
833 = new LinkedTransferQueue<Integer>();
834
835 Thread t = newStartedThread(new CheckedRunnable() {
836 public void realRun() throws InterruptedException {
837 q.transfer(four);
838 checkEmpty(q);
839 }});
840
841 while (q.isEmpty())
842 Thread.yield();
843 assertFalse(q.isEmpty());
844 assertEquals(1, q.size());
845 assertSame(four, q.take());
846 checkEmpty(q);
847 awaitTermination(t, MEDIUM_DELAY_MS);
848 }
849
850 /**
851 * tryTransfer(null) throws NullPointerException
852 */
853 public void testTryTransfer1() {
854 try {
855 final LinkedTransferQueue q = new LinkedTransferQueue();
856 q.tryTransfer(null);
857 shouldThrow();
858 } catch (NullPointerException success) {}
859 }
860
861 /**
862 * tryTransfer returns false and does not enqueue if there are no
863 * consumers waiting to poll or take.
864 */
865 public void testTryTransfer2() throws InterruptedException {
866 final LinkedTransferQueue q = new LinkedTransferQueue();
867 assertFalse(q.tryTransfer(new Object()));
868 assertFalse(q.hasWaitingConsumer());
869 checkEmpty(q);
870 }
871
872 /**
873 * If there is a consumer waiting in timed poll, tryTransfer
874 * returns true while successfully transfering object.
875 */
876 public void testTryTransfer3() throws InterruptedException {
877 final Object hotPotato = new Object();
878 final LinkedTransferQueue q = new LinkedTransferQueue();
879
880 Thread t = newStartedThread(new CheckedRunnable() {
881 public void realRun() {
882 while (! q.hasWaitingConsumer())
883 Thread.yield();
884 assertTrue(q.hasWaitingConsumer());
885 checkEmpty(q);
886 assertTrue(q.tryTransfer(hotPotato));
887 }});
888
889 assertSame(hotPotato, q.poll(MEDIUM_DELAY_MS, MILLISECONDS));
890 checkEmpty(q);
891 awaitTermination(t, MEDIUM_DELAY_MS);
892 }
893
894 /**
895 * If there is a consumer waiting in take, tryTransfer returns
896 * true while successfully transfering object.
897 */
898 public void testTryTransfer4() throws InterruptedException {
899 final Object hotPotato = new Object();
900 final LinkedTransferQueue q = new LinkedTransferQueue();
901
902 Thread t = newStartedThread(new CheckedRunnable() {
903 public void realRun() {
904 while (! q.hasWaitingConsumer())
905 Thread.yield();
906 assertTrue(q.hasWaitingConsumer());
907 checkEmpty(q);
908 assertTrue(q.tryTransfer(hotPotato));
909 }});
910
911 assertSame(q.take(), hotPotato);
912 checkEmpty(q);
913 awaitTermination(t, MEDIUM_DELAY_MS);
914 }
915
916 /**
917 * tryTransfer blocks interruptibly if no takers
918 */
919 public void testTryTransfer5() throws InterruptedException {
920 final LinkedTransferQueue q = new LinkedTransferQueue();
921 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
922 assertTrue(q.isEmpty());
923
924 Thread t = newStartedThread(new CheckedRunnable() {
925 public void realRun() throws InterruptedException {
926 Thread.currentThread().interrupt();
927 try {
928 q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS);
929 shouldThrow();
930 } catch (InterruptedException success) {}
931 assertFalse(Thread.interrupted());
932
933 pleaseInterrupt.countDown();
934 try {
935 q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS);
936 shouldThrow();
937 } catch (InterruptedException success) {}
938 assertFalse(Thread.interrupted());
939 }});
940
941 await(pleaseInterrupt);
942 assertThreadStaysAlive(t);
943 t.interrupt();
944 awaitTermination(t);
945 checkEmpty(q);
946 }
947
948 /**
949 * tryTransfer gives up after the timeout and returns false
950 */
951 public void testTryTransfer6() throws InterruptedException {
952 final LinkedTransferQueue q = new LinkedTransferQueue();
953
954 Thread t = newStartedThread(new CheckedRunnable() {
955 public void realRun() throws InterruptedException {
956 long t0 = System.nanoTime();
957 assertFalse(q.tryTransfer(new Object(),
958 timeoutMillis(), MILLISECONDS));
959 assertTrue(millisElapsedSince(t0) >= timeoutMillis());
960 checkEmpty(q);
961 }});
962
963 awaitTermination(t);
964 checkEmpty(q);
965 }
966
967 /**
968 * tryTransfer waits for any elements previously in to be removed
969 * before transfering to a poll or take
970 */
971 public void testTryTransfer7() throws InterruptedException {
972 final LinkedTransferQueue q = new LinkedTransferQueue();
973 assertTrue(q.offer(four));
974
975 Thread t = newStartedThread(new CheckedRunnable() {
976 public void realRun() throws InterruptedException {
977 assertTrue(q.tryTransfer(five, MEDIUM_DELAY_MS, MILLISECONDS));
978 checkEmpty(q);
979 }});
980
981 while (q.size() != 2)
982 Thread.yield();
983 assertEquals(2, q.size());
984 assertSame(four, q.poll());
985 assertSame(five, q.poll());
986 checkEmpty(q);
987 awaitTermination(t, MEDIUM_DELAY_MS);
988 }
989
990 /**
991 * tryTransfer attempts to enqueue into the queue and fails
992 * returning false not enqueueing and the successive poll is null
993 */
994 public void testTryTransfer8() throws InterruptedException {
995 final LinkedTransferQueue q = new LinkedTransferQueue();
996 assertTrue(q.offer(four));
997 assertEquals(1, q.size());
998 long t0 = System.nanoTime();
999 assertFalse(q.tryTransfer(five, timeoutMillis(), MILLISECONDS));
1000 assertTrue(millisElapsedSince(t0) >= timeoutMillis());
1001 assertEquals(1, q.size());
1002 assertSame(four, q.poll());
1003 assertNull(q.poll());
1004 checkEmpty(q);
1005 }
1006
1007 private LinkedTransferQueue<Integer> populatedQueue(int n) {
1008 LinkedTransferQueue<Integer> q = new LinkedTransferQueue<Integer>();
1009 checkEmpty(q);
1010 for (int i = 0; i < n; i++) {
1011 assertEquals(i, q.size());
1012 assertTrue(q.offer(i));
1013 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
1014 }
1015 assertFalse(q.isEmpty());
1016 return q;
1017 }
1018 }