ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/LinkedTransferQueueTest.java
Revision: 1.47
Committed: Mon May 30 22:43:20 2011 UTC (12 years, 11 months ago) by jsr166
Branch: MAIN
Changes since 1.46: +13 -119 lines
Log Message:
refactor more generic BlockingQueue tests into BlockingQueueTest.java

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include John Vint
6 */
7
8 import java.io.BufferedInputStream;
9 import java.io.BufferedOutputStream;
10 import java.io.ByteArrayInputStream;
11 import java.io.ByteArrayOutputStream;
12 import java.io.ObjectInputStream;
13 import java.io.ObjectOutputStream;
14 import java.util.ArrayList;
15 import java.util.Arrays;
16 import java.util.Collection;
17 import java.util.Iterator;
18 import java.util.List;
19 import java.util.NoSuchElementException;
20 import java.util.concurrent.BlockingQueue;
21 import java.util.concurrent.CountDownLatch;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.LinkedTransferQueue;
25 import static java.util.concurrent.TimeUnit.MILLISECONDS;
26 import static java.util.concurrent.TimeUnit.NANOSECONDS;
27 import junit.framework.Test;
28 import junit.framework.TestSuite;
29
30 @SuppressWarnings({"unchecked", "rawtypes"})
31 public class LinkedTransferQueueTest extends JSR166TestCase {
32
33 public static class Generic extends BlockingQueueTest {
34 protected BlockingQueue emptyCollection() {
35 return new LinkedTransferQueue();
36 }
37 }
38
39 public static void main(String[] args) {
40 junit.textui.TestRunner.run(suite());
41 }
42
43 public static Test suite() {
44 return newTestSuite(LinkedTransferQueueTest.class,
45 new Generic().testSuite());
46 }
47
48 /**
49 * Constructor builds new queue with size being zero and empty
50 * being true
51 */
52 public void testConstructor1() {
53 assertEquals(0, new LinkedTransferQueue().size());
54 assertTrue(new LinkedTransferQueue().isEmpty());
55 }
56
57 /**
58 * Initializing constructor with null collection throws
59 * NullPointerException
60 */
61 public void testConstructor2() {
62 try {
63 new LinkedTransferQueue(null);
64 shouldThrow();
65 } catch (NullPointerException success) {}
66 }
67
68 /**
69 * Initializing from Collection of null elements throws
70 * NullPointerException
71 */
72 public void testConstructor3() {
73 Collection<Integer> elements = Arrays.asList(new Integer[SIZE]);
74 try {
75 new LinkedTransferQueue(elements);
76 shouldThrow();
77 } catch (NullPointerException success) {}
78 }
79
80 /**
81 * Initializing constructor with a collection containing some null elements
82 * throws NullPointerException
83 */
84 public void testConstructor4() {
85 Integer[] ints = new Integer[SIZE];
86 for (int i = 0; i < SIZE-1; ++i)
87 ints[i] = i;
88 Collection<Integer> elements = Arrays.asList(ints);
89 try {
90 new LinkedTransferQueue(elements);
91 shouldThrow();
92 } catch (NullPointerException success) {}
93 }
94
95 /**
96 * Queue contains all elements of the collection it is initialized by
97 */
98 public void testConstructor5() {
99 Integer[] ints = new Integer[SIZE];
100 for (int i = 0; i < SIZE; ++i) {
101 ints[i] = i;
102 }
103 List intList = Arrays.asList(ints);
104 LinkedTransferQueue q
105 = new LinkedTransferQueue(intList);
106 assertEquals(q.size(), intList.size());
107 assertEquals(q.toString(), intList.toString());
108 assertTrue(Arrays.equals(q.toArray(),
109 intList.toArray()));
110 assertTrue(Arrays.equals(q.toArray(new Object[0]),
111 intList.toArray(new Object[0])));
112 assertTrue(Arrays.equals(q.toArray(new Object[SIZE]),
113 intList.toArray(new Object[SIZE])));
114 for (int i = 0; i < SIZE; ++i) {
115 assertEquals(ints[i], q.poll());
116 }
117 }
118
119 /**
120 * remainingCapacity() always returns Integer.MAX_VALUE
121 */
122 public void testRemainingCapacity() {
123 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
124 for (int i = 0; i < SIZE; ++i) {
125 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
126 assertEquals(SIZE - i, q.size());
127 q.remove();
128 }
129 for (int i = 0; i < SIZE; ++i) {
130 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
131 assertEquals(i, q.size());
132 q.add(i);
133 }
134 }
135
136 /**
137 * addAll(this) throws IllegalArgumentException
138 */
139 public void testAddAllSelf() {
140 try {
141 LinkedTransferQueue q = populatedQueue(SIZE);
142 q.addAll(q);
143 shouldThrow();
144 } catch (IllegalArgumentException success) {}
145 }
146
147 /**
148 * addAll of a collection with any null elements throws
149 * NullPointerException after possibly adding some elements
150 */
151 public void testAddAll3() {
152 try {
153 LinkedTransferQueue q = new LinkedTransferQueue();
154 Integer[] ints = new Integer[SIZE];
155 for (int i = 0; i < SIZE - 1; ++i) {
156 ints[i] = i;
157 }
158 q.addAll(Arrays.asList(ints));
159 shouldThrow();
160 } catch (NullPointerException success) {}
161 }
162
163 /**
164 * Queue contains all elements, in traversal order, of successful addAll
165 */
166 public void testAddAll5() {
167 Integer[] empty = new Integer[0];
168 Integer[] ints = new Integer[SIZE];
169 for (int i = 0; i < SIZE; ++i) {
170 ints[i] = i;
171 }
172 LinkedTransferQueue q = new LinkedTransferQueue();
173 assertFalse(q.addAll(Arrays.asList(empty)));
174 assertTrue(q.addAll(Arrays.asList(ints)));
175 for (int i = 0; i < SIZE; ++i) {
176 assertEquals(ints[i], q.poll());
177 }
178 }
179
180 /**
181 * all elements successfully put are contained
182 */
183 public void testPut() {
184 LinkedTransferQueue<Integer> q = new LinkedTransferQueue<Integer>();
185 for (int i = 0; i < SIZE; ++i) {
186 assertEquals(q.size(), i);
187 q.put(i);
188 assertTrue(q.contains(i));
189 }
190 }
191
192 /**
193 * take retrieves elements in FIFO order
194 */
195 public void testTake() throws InterruptedException {
196 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
197 for (int i = 0; i < SIZE; ++i) {
198 assertEquals(i, (int) q.take());
199 }
200 }
201
202 /**
203 * take removes existing elements until empty, then blocks interruptibly
204 */
205 public void testBlockingTake() throws InterruptedException {
206 final BlockingQueue q = populatedQueue(SIZE);
207 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
208 Thread t = newStartedThread(new CheckedRunnable() {
209 public void realRun() throws InterruptedException {
210 for (int i = 0; i < SIZE; ++i) {
211 assertEquals(i, q.take());
212 }
213
214 Thread.currentThread().interrupt();
215 try {
216 q.take();
217 shouldThrow();
218 } catch (InterruptedException success) {}
219 assertFalse(Thread.interrupted());
220
221 pleaseInterrupt.countDown();
222 try {
223 q.take();
224 shouldThrow();
225 } catch (InterruptedException success) {}
226 assertFalse(Thread.interrupted());
227 }});
228
229 await(pleaseInterrupt);
230 assertThreadStaysAlive(t);
231 t.interrupt();
232 awaitTermination(t);
233 }
234
235 /**
236 * poll succeeds unless empty
237 */
238 public void testPoll() throws InterruptedException {
239 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
240 for (int i = 0; i < SIZE; ++i) {
241 assertEquals(i, (int) q.poll());
242 }
243 assertNull(q.poll());
244 checkEmpty(q);
245 }
246
247 /**
248 * timed poll with zero timeout succeeds when non-empty, else times out
249 */
250 public void testTimedPoll0() throws InterruptedException {
251 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
252 for (int i = 0; i < SIZE; ++i) {
253 assertEquals(i, (int) q.poll(0, MILLISECONDS));
254 }
255 assertNull(q.poll(0, MILLISECONDS));
256 checkEmpty(q);
257 }
258
259 /**
260 * timed poll with nonzero timeout succeeds when non-empty, else times out
261 */
262 public void testTimedPoll() throws InterruptedException {
263 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
264 for (int i = 0; i < SIZE; ++i) {
265 long startTime = System.nanoTime();
266 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
267 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
268 }
269 long startTime = System.nanoTime();
270 assertNull(q.poll(timeoutMillis(), MILLISECONDS));
271 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
272 checkEmpty(q);
273 }
274
275 /**
276 * Interrupted timed poll throws InterruptedException instead of
277 * returning timeout status
278 */
279 public void testInterruptedTimedPoll() throws InterruptedException {
280 final BlockingQueue<Integer> q = populatedQueue(SIZE);
281 final CountDownLatch aboutToWait = new CountDownLatch(1);
282 Thread t = newStartedThread(new CheckedRunnable() {
283 public void realRun() throws InterruptedException {
284 for (int i = 0; i < SIZE; ++i) {
285 long t0 = System.nanoTime();
286 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
287 assertTrue(millisElapsedSince(t0) < SMALL_DELAY_MS);
288 }
289 long t0 = System.nanoTime();
290 aboutToWait.countDown();
291 try {
292 q.poll(MEDIUM_DELAY_MS, MILLISECONDS);
293 shouldThrow();
294 } catch (InterruptedException success) {
295 assertTrue(millisElapsedSince(t0) < MEDIUM_DELAY_MS);
296 }
297 }});
298
299 aboutToWait.await();
300 waitForThreadToEnterWaitState(t, SMALL_DELAY_MS);
301 t.interrupt();
302 awaitTermination(t, MEDIUM_DELAY_MS);
303 checkEmpty(q);
304 }
305
306 /**
307 * timed poll after thread interrupted throws InterruptedException
308 * instead of returning timeout status
309 */
310 public void testTimedPollAfterInterrupt() throws InterruptedException {
311 final BlockingQueue<Integer> q = populatedQueue(SIZE);
312 Thread t = newStartedThread(new CheckedRunnable() {
313 public void realRun() throws InterruptedException {
314 Thread.currentThread().interrupt();
315 for (int i = 0; i < SIZE; ++i) {
316 long t0 = System.nanoTime();
317 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
318 assertTrue(millisElapsedSince(t0) < SMALL_DELAY_MS);
319 }
320 try {
321 q.poll(MEDIUM_DELAY_MS, MILLISECONDS);
322 shouldThrow();
323 } catch (InterruptedException success) {}
324 }});
325
326 awaitTermination(t, MEDIUM_DELAY_MS);
327 checkEmpty(q);
328 }
329
330 /**
331 * peek returns next element, or null if empty
332 */
333 public void testPeek() throws InterruptedException {
334 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
335 for (int i = 0; i < SIZE; ++i) {
336 assertEquals(i, (int) q.peek());
337 assertEquals(i, (int) q.poll());
338 assertTrue(q.peek() == null ||
339 i != (int) q.peek());
340 }
341 assertNull(q.peek());
342 checkEmpty(q);
343 }
344
345 /**
346 * element returns next element, or throws NoSuchElementException if empty
347 */
348 public void testElement() throws InterruptedException {
349 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
350 for (int i = 0; i < SIZE; ++i) {
351 assertEquals(i, (int) q.element());
352 assertEquals(i, (int) q.poll());
353 }
354 try {
355 q.element();
356 shouldThrow();
357 } catch (NoSuchElementException success) {}
358 checkEmpty(q);
359 }
360
361 /**
362 * remove removes next element, or throws NoSuchElementException if empty
363 */
364 public void testRemove() throws InterruptedException {
365 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
366 for (int i = 0; i < SIZE; ++i) {
367 assertEquals(i, (int) q.remove());
368 }
369 try {
370 q.remove();
371 shouldThrow();
372 } catch (NoSuchElementException success) {}
373 checkEmpty(q);
374 }
375
376 /**
377 * remove(x) removes x and returns true if present
378 */
379 public void testRemoveElement() throws InterruptedException {
380 LinkedTransferQueue q = populatedQueue(SIZE);
381 for (int i = 1; i < SIZE; i+=2) {
382 assertTrue(q.contains(i));
383 assertTrue(q.remove(i));
384 assertFalse(q.contains(i));
385 assertTrue(q.contains(i-1));
386 }
387 for (int i = 0; i < SIZE; i+=2) {
388 assertTrue(q.contains(i));
389 assertTrue(q.remove(i));
390 assertFalse(q.contains(i));
391 assertFalse(q.remove(i+1));
392 assertFalse(q.contains(i+1));
393 }
394 checkEmpty(q);
395 }
396
397 /**
398 * An add following remove(x) succeeds
399 */
400 public void testRemoveElementAndAdd() throws InterruptedException {
401 LinkedTransferQueue q = new LinkedTransferQueue();
402 assertTrue(q.add(one));
403 assertTrue(q.add(two));
404 assertTrue(q.remove(one));
405 assertTrue(q.remove(two));
406 assertTrue(q.add(three));
407 assertSame(q.take(), three);
408 }
409
410 /**
411 * contains(x) reports true when elements added but not yet removed
412 */
413 public void testContains() {
414 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
415 for (int i = 0; i < SIZE; ++i) {
416 assertTrue(q.contains(i));
417 assertEquals(i, (int) q.poll());
418 assertFalse(q.contains(i));
419 }
420 }
421
422 /**
423 * clear removes all elements
424 */
425 public void testClear() throws InterruptedException {
426 LinkedTransferQueue q = populatedQueue(SIZE);
427 q.clear();
428 checkEmpty(q);
429 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
430 q.add(one);
431 assertFalse(q.isEmpty());
432 assertEquals(1, q.size());
433 assertTrue(q.contains(one));
434 q.clear();
435 checkEmpty(q);
436 }
437
438 /**
439 * containsAll(c) is true when c contains a subset of elements
440 */
441 public void testContainsAll() {
442 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
443 LinkedTransferQueue<Integer> p = new LinkedTransferQueue<Integer>();
444 for (int i = 0; i < SIZE; ++i) {
445 assertTrue(q.containsAll(p));
446 assertFalse(p.containsAll(q));
447 p.add(i);
448 }
449 assertTrue(p.containsAll(q));
450 }
451
452 /**
453 * retainAll(c) retains only those elements of c and reports true
454 * if changed
455 */
456 public void testRetainAll() {
457 LinkedTransferQueue q = populatedQueue(SIZE);
458 LinkedTransferQueue p = populatedQueue(SIZE);
459 for (int i = 0; i < SIZE; ++i) {
460 boolean changed = q.retainAll(p);
461 if (i == 0) {
462 assertFalse(changed);
463 } else {
464 assertTrue(changed);
465 }
466 assertTrue(q.containsAll(p));
467 assertEquals(SIZE - i, q.size());
468 p.remove();
469 }
470 }
471
472 /**
473 * removeAll(c) removes only those elements of c and reports true
474 * if changed
475 */
476 public void testRemoveAll() {
477 for (int i = 1; i < SIZE; ++i) {
478 LinkedTransferQueue q = populatedQueue(SIZE);
479 LinkedTransferQueue p = populatedQueue(i);
480 assertTrue(q.removeAll(p));
481 assertEquals(SIZE - i, q.size());
482 for (int j = 0; j < i; ++j) {
483 assertFalse(q.contains(p.remove()));
484 }
485 }
486 }
487
488 /**
489 * toArray() contains all elements in FIFO order
490 */
491 public void testToArray() {
492 LinkedTransferQueue q = populatedQueue(SIZE);
493 Object[] o = q.toArray();
494 for (int i = 0; i < o.length; i++) {
495 assertSame(o[i], q.poll());
496 }
497 }
498
499 /**
500 * toArray(a) contains all elements in FIFO order
501 */
502 public void testToArray2() {
503 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
504 Integer[] ints = new Integer[SIZE];
505 Integer[] array = q.toArray(ints);
506 assertSame(ints, array);
507 for (int i = 0; i < ints.length; i++) {
508 assertSame(ints[i], q.poll());
509 }
510 }
511
512 /**
513 * toArray(incompatible array type) throws ArrayStoreException
514 */
515 public void testToArray1_BadArg() {
516 LinkedTransferQueue q = populatedQueue(SIZE);
517 try {
518 q.toArray(new String[10]);
519 shouldThrow();
520 } catch (ArrayStoreException success) {}
521 }
522
523 /**
524 * iterator iterates through all elements
525 */
526 public void testIterator() throws InterruptedException {
527 LinkedTransferQueue q = populatedQueue(SIZE);
528 Iterator it = q.iterator();
529 int i = 0;
530 while (it.hasNext()) {
531 assertEquals(it.next(), i++);
532 }
533 assertEquals(i, SIZE);
534 }
535
536 /**
537 * iterator.remove() removes current element
538 */
539 public void testIteratorRemove() {
540 final LinkedTransferQueue q = new LinkedTransferQueue();
541 q.add(two);
542 q.add(one);
543 q.add(three);
544
545 Iterator it = q.iterator();
546 it.next();
547 it.remove();
548
549 it = q.iterator();
550 assertSame(it.next(), one);
551 assertSame(it.next(), three);
552 assertFalse(it.hasNext());
553 }
554
555 /**
556 * iterator ordering is FIFO
557 */
558 public void testIteratorOrdering() {
559 final LinkedTransferQueue<Integer> q
560 = new LinkedTransferQueue<Integer>();
561 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
562 q.add(one);
563 q.add(two);
564 q.add(three);
565 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
566 int k = 0;
567 for (Integer n : q) {
568 assertEquals(++k, (int) n);
569 }
570 assertEquals(3, k);
571 }
572
573 /**
574 * Modifications do not cause iterators to fail
575 */
576 public void testWeaklyConsistentIteration() {
577 final LinkedTransferQueue q = new LinkedTransferQueue();
578 q.add(one);
579 q.add(two);
580 q.add(three);
581 for (Iterator it = q.iterator(); it.hasNext();) {
582 q.remove();
583 it.next();
584 }
585 assertEquals(0, q.size());
586 }
587
588 /**
589 * toString contains toStrings of elements
590 */
591 public void testToString() {
592 LinkedTransferQueue q = populatedQueue(SIZE);
593 String s = q.toString();
594 for (int i = 0; i < SIZE; ++i) {
595 assertTrue(s.contains(String.valueOf(i)));
596 }
597 }
598
599 /**
600 * offer transfers elements across Executor tasks
601 */
602 public void testOfferInExecutor() {
603 final LinkedTransferQueue q = new LinkedTransferQueue();
604 final CheckedBarrier threadsStarted = new CheckedBarrier(2);
605 ExecutorService executor = Executors.newFixedThreadPool(2);
606
607 executor.execute(new CheckedRunnable() {
608 public void realRun() throws InterruptedException {
609 threadsStarted.await();
610 assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS));
611 }});
612
613 executor.execute(new CheckedRunnable() {
614 public void realRun() throws InterruptedException {
615 threadsStarted.await();
616 assertSame(one, q.take());
617 checkEmpty(q);
618 }});
619
620 joinPool(executor);
621 }
622
623 /**
624 * timed poll retrieves elements across Executor threads
625 */
626 public void testPollInExecutor() {
627 final LinkedTransferQueue q = new LinkedTransferQueue();
628 final CheckedBarrier threadsStarted = new CheckedBarrier(2);
629 ExecutorService executor = Executors.newFixedThreadPool(2);
630
631 executor.execute(new CheckedRunnable() {
632 public void realRun() throws InterruptedException {
633 assertNull(q.poll());
634 threadsStarted.await();
635 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
636 checkEmpty(q);
637 }});
638
639 executor.execute(new CheckedRunnable() {
640 public void realRun() throws InterruptedException {
641 threadsStarted.await();
642 q.put(one);
643 }});
644
645 joinPool(executor);
646 }
647
648 /**
649 * A deserialized serialized queue has same elements in same order
650 */
651 public void testSerialization() throws Exception {
652 LinkedTransferQueue q = populatedQueue(SIZE);
653
654 ByteArrayOutputStream bout = new ByteArrayOutputStream(10000);
655 ObjectOutputStream out
656 = new ObjectOutputStream(new BufferedOutputStream(bout));
657 out.writeObject(q);
658 out.close();
659
660 ByteArrayInputStream bin
661 = new ByteArrayInputStream(bout.toByteArray());
662 ObjectInputStream in
663 = new ObjectInputStream(new BufferedInputStream(bin));
664 LinkedTransferQueue r = (LinkedTransferQueue) in.readObject();
665
666 assertEquals(q.size(), r.size());
667 assertEquals(q.toString(), r.toString());
668 assertTrue(Arrays.equals(q.toArray(), r.toArray()));
669 while (!q.isEmpty()) {
670 assertEquals(q.remove(), r.remove());
671 }
672 }
673
674 /**
675 * drainTo(c) empties queue into another collection c
676 */
677 public void testDrainTo() {
678 LinkedTransferQueue q = populatedQueue(SIZE);
679 ArrayList l = new ArrayList();
680 q.drainTo(l);
681 assertEquals(q.size(), 0);
682 assertEquals(l.size(), SIZE);
683 for (int i = 0; i < SIZE; ++i) {
684 assertEquals(l.get(i), i);
685 }
686 q.add(zero);
687 q.add(one);
688 assertFalse(q.isEmpty());
689 assertTrue(q.contains(zero));
690 assertTrue(q.contains(one));
691 l.clear();
692 q.drainTo(l);
693 assertEquals(q.size(), 0);
694 assertEquals(l.size(), 2);
695 for (int i = 0; i < 2; ++i) {
696 assertEquals(l.get(i), i);
697 }
698 }
699
700 /**
701 * drainTo(c) empties full queue, unblocking a waiting put.
702 */
703 public void testDrainToWithActivePut() throws InterruptedException {
704 final LinkedTransferQueue q = populatedQueue(SIZE);
705 Thread t = newStartedThread(new CheckedRunnable() {
706 public void realRun() {
707 q.put(SIZE + 1);
708 }});
709 ArrayList l = new ArrayList();
710 q.drainTo(l);
711 assertTrue(l.size() >= SIZE);
712 for (int i = 0; i < SIZE; ++i) {
713 assertEquals(l.get(i), i);
714 }
715 awaitTermination(t, MEDIUM_DELAY_MS);
716 assertTrue(q.size() + l.size() >= SIZE);
717 }
718
719 /**
720 * drainTo(c, n) empties first min(n, size) elements of queue into c
721 */
722 public void testDrainToN() {
723 LinkedTransferQueue q = new LinkedTransferQueue();
724 for (int i = 0; i < SIZE + 2; ++i) {
725 for (int j = 0; j < SIZE; j++) {
726 assertTrue(q.offer(j));
727 }
728 ArrayList l = new ArrayList();
729 q.drainTo(l, i);
730 int k = (i < SIZE) ? i : SIZE;
731 assertEquals(l.size(), k);
732 assertEquals(q.size(), SIZE - k);
733 for (int j = 0; j < k; ++j) {
734 assertEquals(l.get(j), j);
735 }
736 while (q.poll() != null)
737 ;
738 }
739 }
740
741 /**
742 * timed poll() or take() increments the waiting consumer count;
743 * offer(e) decrements the waiting consumer count
744 */
745 public void testWaitingConsumer() throws InterruptedException {
746 final LinkedTransferQueue q = new LinkedTransferQueue();
747 assertEquals(q.getWaitingConsumerCount(), 0);
748 assertFalse(q.hasWaitingConsumer());
749 final CountDownLatch threadStarted = new CountDownLatch(1);
750
751 Thread t = newStartedThread(new CheckedRunnable() {
752 public void realRun() throws InterruptedException {
753 threadStarted.countDown();
754 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
755 assertEquals(q.getWaitingConsumerCount(), 0);
756 assertFalse(q.hasWaitingConsumer());
757 }});
758
759 threadStarted.await();
760 waitForThreadToEnterWaitState(t, SMALL_DELAY_MS);
761 assertEquals(q.getWaitingConsumerCount(), 1);
762 assertTrue(q.hasWaitingConsumer());
763
764 assertTrue(q.offer(one));
765 assertEquals(q.getWaitingConsumerCount(), 0);
766 assertFalse(q.hasWaitingConsumer());
767
768 awaitTermination(t, MEDIUM_DELAY_MS);
769 }
770
771 /**
772 * transfer(null) throws NullPointerException
773 */
774 public void testTransfer1() throws InterruptedException {
775 try {
776 LinkedTransferQueue q = new LinkedTransferQueue();
777 q.transfer(null);
778 shouldThrow();
779 } catch (NullPointerException success) {}
780 }
781
782 /**
783 * transfer waits until a poll occurs. The transfered element
784 * is returned by this associated poll.
785 */
786 public void testTransfer2() throws InterruptedException {
787 final LinkedTransferQueue<Integer> q
788 = new LinkedTransferQueue<Integer>();
789 final CountDownLatch threadStarted = new CountDownLatch(1);
790
791 Thread t = newStartedThread(new CheckedRunnable() {
792 public void realRun() throws InterruptedException {
793 threadStarted.countDown();
794 q.transfer(five);
795 checkEmpty(q);
796 }});
797
798 threadStarted.await();
799 waitForThreadToEnterWaitState(t, SMALL_DELAY_MS);
800 assertEquals(1, q.size());
801 assertSame(five, q.poll());
802 checkEmpty(q);
803 awaitTermination(t, MEDIUM_DELAY_MS);
804 }
805
806 /**
807 * transfer waits until a poll occurs, and then transfers in fifo order
808 */
809 public void testTransfer3() throws InterruptedException {
810 final LinkedTransferQueue<Integer> q
811 = new LinkedTransferQueue<Integer>();
812
813 Thread first = newStartedThread(new CheckedRunnable() {
814 public void realRun() throws InterruptedException {
815 q.transfer(four);
816 assertTrue(!q.contains(four));
817 assertEquals(1, q.size());
818 }});
819
820 Thread interruptedThread = newStartedThread(
821 new CheckedInterruptedRunnable() {
822 public void realRun() throws InterruptedException {
823 while (q.isEmpty())
824 Thread.yield();
825 q.transfer(five);
826 }});
827
828 while (q.size() < 2)
829 Thread.yield();
830 assertEquals(2, q.size());
831 assertSame(four, q.poll());
832 first.join();
833 assertEquals(1, q.size());
834 interruptedThread.interrupt();
835 interruptedThread.join();
836 checkEmpty(q);
837 }
838
839 /**
840 * transfer waits until a poll occurs, at which point the polling
841 * thread returns the element
842 */
843 public void testTransfer4() throws InterruptedException {
844 final LinkedTransferQueue q = new LinkedTransferQueue();
845
846 Thread t = newStartedThread(new CheckedRunnable() {
847 public void realRun() throws InterruptedException {
848 q.transfer(four);
849 assertFalse(q.contains(four));
850 assertSame(three, q.poll());
851 }});
852
853 while (q.isEmpty())
854 Thread.yield();
855 assertFalse(q.isEmpty());
856 assertEquals(1, q.size());
857 assertTrue(q.offer(three));
858 assertSame(four, q.poll());
859 awaitTermination(t, MEDIUM_DELAY_MS);
860 }
861
862 /**
863 * transfer waits until a take occurs. The transfered element
864 * is returned by this associated take.
865 */
866 public void testTransfer5() throws InterruptedException {
867 final LinkedTransferQueue<Integer> q
868 = new LinkedTransferQueue<Integer>();
869
870 Thread t = newStartedThread(new CheckedRunnable() {
871 public void realRun() throws InterruptedException {
872 q.transfer(four);
873 checkEmpty(q);
874 }});
875
876 while (q.isEmpty())
877 Thread.yield();
878 assertFalse(q.isEmpty());
879 assertEquals(1, q.size());
880 assertSame(four, q.take());
881 checkEmpty(q);
882 awaitTermination(t, MEDIUM_DELAY_MS);
883 }
884
885 /**
886 * tryTransfer(null) throws NullPointerException
887 */
888 public void testTryTransfer1() {
889 try {
890 final LinkedTransferQueue q = new LinkedTransferQueue();
891 q.tryTransfer(null);
892 shouldThrow();
893 } catch (NullPointerException success) {}
894 }
895
896 /**
897 * tryTransfer returns false and does not enqueue if there are no
898 * consumers waiting to poll or take.
899 */
900 public void testTryTransfer2() throws InterruptedException {
901 final LinkedTransferQueue q = new LinkedTransferQueue();
902 assertFalse(q.tryTransfer(new Object()));
903 assertFalse(q.hasWaitingConsumer());
904 checkEmpty(q);
905 }
906
907 /**
908 * If there is a consumer waiting in timed poll, tryTransfer
909 * returns true while successfully transfering object.
910 */
911 public void testTryTransfer3() throws InterruptedException {
912 final Object hotPotato = new Object();
913 final LinkedTransferQueue q = new LinkedTransferQueue();
914
915 Thread t = newStartedThread(new CheckedRunnable() {
916 public void realRun() {
917 while (! q.hasWaitingConsumer())
918 Thread.yield();
919 assertTrue(q.hasWaitingConsumer());
920 checkEmpty(q);
921 assertTrue(q.tryTransfer(hotPotato));
922 }});
923
924 assertSame(hotPotato, q.poll(MEDIUM_DELAY_MS, MILLISECONDS));
925 checkEmpty(q);
926 awaitTermination(t, MEDIUM_DELAY_MS);
927 }
928
929 /**
930 * If there is a consumer waiting in take, tryTransfer returns
931 * true while successfully transfering object.
932 */
933 public void testTryTransfer4() throws InterruptedException {
934 final Object hotPotato = new Object();
935 final LinkedTransferQueue q = new LinkedTransferQueue();
936
937 Thread t = newStartedThread(new CheckedRunnable() {
938 public void realRun() {
939 while (! q.hasWaitingConsumer())
940 Thread.yield();
941 assertTrue(q.hasWaitingConsumer());
942 checkEmpty(q);
943 assertTrue(q.tryTransfer(hotPotato));
944 }});
945
946 assertSame(q.take(), hotPotato);
947 checkEmpty(q);
948 awaitTermination(t, MEDIUM_DELAY_MS);
949 }
950
951 /**
952 * tryTransfer blocks interruptibly if no takers
953 */
954 public void testTryTransfer5() throws InterruptedException {
955 final LinkedTransferQueue q = new LinkedTransferQueue();
956 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
957 assertTrue(q.isEmpty());
958
959 Thread t = newStartedThread(new CheckedRunnable() {
960 public void realRun() throws InterruptedException {
961 Thread.currentThread().interrupt();
962 try {
963 q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS);
964 shouldThrow();
965 } catch (InterruptedException success) {}
966 assertFalse(Thread.interrupted());
967
968 pleaseInterrupt.countDown();
969 try {
970 q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS);
971 shouldThrow();
972 } catch (InterruptedException success) {}
973 assertFalse(Thread.interrupted());
974 }});
975
976 await(pleaseInterrupt);
977 assertThreadStaysAlive(t);
978 t.interrupt();
979 awaitTermination(t);
980 checkEmpty(q);
981 }
982
983 /**
984 * tryTransfer gives up after the timeout and returns false
985 */
986 public void testTryTransfer6() throws InterruptedException {
987 final LinkedTransferQueue q = new LinkedTransferQueue();
988
989 Thread t = newStartedThread(new CheckedRunnable() {
990 public void realRun() throws InterruptedException {
991 long t0 = System.nanoTime();
992 assertFalse(q.tryTransfer(new Object(),
993 timeoutMillis(), MILLISECONDS));
994 assertTrue(millisElapsedSince(t0) >= timeoutMillis());
995 checkEmpty(q);
996 }});
997
998 awaitTermination(t);
999 checkEmpty(q);
1000 }
1001
1002 /**
1003 * tryTransfer waits for any elements previously in to be removed
1004 * before transfering to a poll or take
1005 */
1006 public void testTryTransfer7() throws InterruptedException {
1007 final LinkedTransferQueue q = new LinkedTransferQueue();
1008 assertTrue(q.offer(four));
1009
1010 Thread t = newStartedThread(new CheckedRunnable() {
1011 public void realRun() throws InterruptedException {
1012 assertTrue(q.tryTransfer(five, MEDIUM_DELAY_MS, MILLISECONDS));
1013 checkEmpty(q);
1014 }});
1015
1016 while (q.size() != 2)
1017 Thread.yield();
1018 assertEquals(2, q.size());
1019 assertSame(four, q.poll());
1020 assertSame(five, q.poll());
1021 checkEmpty(q);
1022 awaitTermination(t, MEDIUM_DELAY_MS);
1023 }
1024
1025 /**
1026 * tryTransfer attempts to enqueue into the queue and fails
1027 * returning false not enqueueing and the successive poll is null
1028 */
1029 public void testTryTransfer8() throws InterruptedException {
1030 final LinkedTransferQueue q = new LinkedTransferQueue();
1031 assertTrue(q.offer(four));
1032 assertEquals(1, q.size());
1033 long t0 = System.nanoTime();
1034 assertFalse(q.tryTransfer(five, timeoutMillis(), MILLISECONDS));
1035 assertTrue(millisElapsedSince(t0) >= timeoutMillis());
1036 assertEquals(1, q.size());
1037 assertSame(four, q.poll());
1038 assertNull(q.poll());
1039 checkEmpty(q);
1040 }
1041
1042 private LinkedTransferQueue<Integer> populatedQueue(int n) {
1043 LinkedTransferQueue<Integer> q = new LinkedTransferQueue<Integer>();
1044 checkEmpty(q);
1045 for (int i = 0; i < n; i++) {
1046 assertEquals(i, q.size());
1047 assertTrue(q.offer(i));
1048 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
1049 }
1050 assertFalse(q.isEmpty());
1051 return q;
1052 }
1053 }