ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/LinkedTransferQueueTest.java
Revision: 1.67
Committed: Sun Oct 16 20:16:36 2016 UTC (7 years, 7 months ago) by jsr166
Branch: MAIN
Changes since 1.66: +7 -8 lines
Log Message:
extend CollectionImplementation mechanism to other collections

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include John Vint
6 */
7
8 import static java.util.concurrent.TimeUnit.MILLISECONDS;
9
10 import java.util.ArrayList;
11 import java.util.Arrays;
12 import java.util.Collection;
13 import java.util.Iterator;
14 import java.util.List;
15 import java.util.NoSuchElementException;
16 import java.util.Queue;
17 import java.util.concurrent.BlockingQueue;
18 import java.util.concurrent.CountDownLatch;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.LinkedTransferQueue;
22
23 import junit.framework.Test;
24
25 @SuppressWarnings({"unchecked", "rawtypes"})
26 public class LinkedTransferQueueTest extends JSR166TestCase {
27 public static class Generic extends BlockingQueueTest {
28 protected BlockingQueue emptyCollection() {
29 return new LinkedTransferQueue();
30 }
31 }
32
33 public static void main(String[] args) {
34 main(suite(), args);
35 }
36
37 public static Test suite() {
38 class Implementation implements CollectionImplementation {
39 public Class<?> klazz() { return LinkedTransferQueue.class; }
40 public Collection emptyCollection() { return new LinkedTransferQueue(); }
41 public Object makeElement(int i) { return i; }
42 public boolean isConcurrent() { return true; }
43 public boolean permitsNulls() { return false; }
44 }
45 return newTestSuite(LinkedTransferQueueTest.class,
46 new Generic().testSuite(),
47 CollectionTest.testSuite(new Implementation()));
48 }
49
50 /**
51 * Constructor builds new queue with size being zero and empty
52 * being true
53 */
54 public void testConstructor1() {
55 assertEquals(0, new LinkedTransferQueue().size());
56 assertTrue(new LinkedTransferQueue().isEmpty());
57 }
58
59 /**
60 * Initializing constructor with null collection throws
61 * NullPointerException
62 */
63 public void testConstructor2() {
64 try {
65 new LinkedTransferQueue(null);
66 shouldThrow();
67 } catch (NullPointerException success) {}
68 }
69
70 /**
71 * Initializing from Collection of null elements throws
72 * NullPointerException
73 */
74 public void testConstructor3() {
75 Collection<Integer> elements = Arrays.asList(new Integer[SIZE]);
76 try {
77 new LinkedTransferQueue(elements);
78 shouldThrow();
79 } catch (NullPointerException success) {}
80 }
81
82 /**
83 * Initializing constructor with a collection containing some null elements
84 * throws NullPointerException
85 */
86 public void testConstructor4() {
87 Integer[] ints = new Integer[SIZE];
88 for (int i = 0; i < SIZE - 1; ++i)
89 ints[i] = i;
90 Collection<Integer> elements = Arrays.asList(ints);
91 try {
92 new LinkedTransferQueue(elements);
93 shouldThrow();
94 } catch (NullPointerException success) {}
95 }
96
97 /**
98 * Queue contains all elements of the collection it is initialized by
99 */
100 public void testConstructor5() {
101 Integer[] ints = new Integer[SIZE];
102 for (int i = 0; i < SIZE; ++i) {
103 ints[i] = i;
104 }
105 List intList = Arrays.asList(ints);
106 LinkedTransferQueue q
107 = new LinkedTransferQueue(intList);
108 assertEquals(q.size(), intList.size());
109 assertEquals(q.toString(), intList.toString());
110 assertTrue(Arrays.equals(q.toArray(),
111 intList.toArray()));
112 assertTrue(Arrays.equals(q.toArray(new Object[0]),
113 intList.toArray(new Object[0])));
114 assertTrue(Arrays.equals(q.toArray(new Object[SIZE]),
115 intList.toArray(new Object[SIZE])));
116 for (int i = 0; i < SIZE; ++i) {
117 assertEquals(ints[i], q.poll());
118 }
119 }
120
121 /**
122 * remainingCapacity() always returns Integer.MAX_VALUE
123 */
124 public void testRemainingCapacity() {
125 BlockingQueue q = populatedQueue(SIZE);
126 for (int i = 0; i < SIZE; ++i) {
127 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
128 assertEquals(SIZE - i, q.size());
129 assertEquals(i, q.remove());
130 }
131 for (int i = 0; i < SIZE; ++i) {
132 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
133 assertEquals(i, q.size());
134 assertTrue(q.add(i));
135 }
136 }
137
138 /**
139 * addAll(this) throws IllegalArgumentException
140 */
141 public void testAddAllSelf() {
142 LinkedTransferQueue q = populatedQueue(SIZE);
143 try {
144 q.addAll(q);
145 shouldThrow();
146 } catch (IllegalArgumentException success) {}
147 }
148
149 /**
150 * addAll of a collection with any null elements throws
151 * NullPointerException after possibly adding some elements
152 */
153 public void testAddAll3() {
154 LinkedTransferQueue q = new LinkedTransferQueue();
155 Integer[] ints = new Integer[SIZE];
156 for (int i = 0; i < SIZE - 1; ++i)
157 ints[i] = i;
158 try {
159 q.addAll(Arrays.asList(ints));
160 shouldThrow();
161 } catch (NullPointerException success) {}
162 }
163
164 /**
165 * Queue contains all elements, in traversal order, of successful addAll
166 */
167 public void testAddAll5() {
168 Integer[] empty = new Integer[0];
169 Integer[] ints = new Integer[SIZE];
170 for (int i = 0; i < SIZE; ++i) {
171 ints[i] = i;
172 }
173 LinkedTransferQueue q = new LinkedTransferQueue();
174 assertFalse(q.addAll(Arrays.asList(empty)));
175 assertTrue(q.addAll(Arrays.asList(ints)));
176 for (int i = 0; i < SIZE; ++i) {
177 assertEquals(ints[i], q.poll());
178 }
179 }
180
181 /**
182 * all elements successfully put are contained
183 */
184 public void testPut() {
185 LinkedTransferQueue<Integer> q = new LinkedTransferQueue<Integer>();
186 for (int i = 0; i < SIZE; ++i) {
187 assertEquals(i, q.size());
188 q.put(i);
189 assertTrue(q.contains(i));
190 }
191 }
192
193 /**
194 * take retrieves elements in FIFO order
195 */
196 public void testTake() throws InterruptedException {
197 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
198 for (int i = 0; i < SIZE; ++i) {
199 assertEquals(i, (int) q.take());
200 }
201 }
202
203 /**
204 * take removes existing elements until empty, then blocks interruptibly
205 */
206 public void testBlockingTake() throws InterruptedException {
207 final BlockingQueue q = populatedQueue(SIZE);
208 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
209 Thread t = newStartedThread(new CheckedRunnable() {
210 public void realRun() throws InterruptedException {
211 for (int i = 0; i < SIZE; ++i) {
212 assertEquals(i, q.take());
213 }
214
215 Thread.currentThread().interrupt();
216 try {
217 q.take();
218 shouldThrow();
219 } catch (InterruptedException success) {}
220 assertFalse(Thread.interrupted());
221
222 pleaseInterrupt.countDown();
223 try {
224 q.take();
225 shouldThrow();
226 } catch (InterruptedException success) {}
227 assertFalse(Thread.interrupted());
228 }});
229
230 await(pleaseInterrupt);
231 assertThreadStaysAlive(t);
232 t.interrupt();
233 awaitTermination(t);
234 }
235
236 /**
237 * poll succeeds unless empty
238 */
239 public void testPoll() throws InterruptedException {
240 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
241 for (int i = 0; i < SIZE; ++i) {
242 assertEquals(i, (int) q.poll());
243 }
244 assertNull(q.poll());
245 checkEmpty(q);
246 }
247
248 /**
249 * timed poll with zero timeout succeeds when non-empty, else times out
250 */
251 public void testTimedPoll0() throws InterruptedException {
252 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
253 for (int i = 0; i < SIZE; ++i) {
254 assertEquals(i, (int) q.poll(0, MILLISECONDS));
255 }
256 assertNull(q.poll(0, MILLISECONDS));
257 checkEmpty(q);
258 }
259
260 /**
261 * timed poll with nonzero timeout succeeds when non-empty, else times out
262 */
263 public void testTimedPoll() throws InterruptedException {
264 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
265 long startTime = System.nanoTime();
266 for (int i = 0; i < SIZE; ++i)
267 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
268 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
269
270 startTime = System.nanoTime();
271 assertNull(q.poll(timeoutMillis(), MILLISECONDS));
272 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
273 checkEmpty(q);
274 }
275
276 /**
277 * Interrupted timed poll throws InterruptedException instead of
278 * returning timeout status
279 */
280 public void testInterruptedTimedPoll() throws InterruptedException {
281 final BlockingQueue<Integer> q = populatedQueue(SIZE);
282 final CountDownLatch aboutToWait = new CountDownLatch(1);
283 Thread t = newStartedThread(new CheckedRunnable() {
284 public void realRun() throws InterruptedException {
285 long startTime = System.nanoTime();
286 for (int i = 0; i < SIZE; ++i)
287 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
288 aboutToWait.countDown();
289 try {
290 q.poll(LONG_DELAY_MS, MILLISECONDS);
291 shouldThrow();
292 } catch (InterruptedException success) {}
293 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
294 }});
295
296 aboutToWait.await();
297 waitForThreadToEnterWaitState(t);
298 t.interrupt();
299 awaitTermination(t);
300 checkEmpty(q);
301 }
302
303 /**
304 * timed poll after thread interrupted throws InterruptedException
305 * instead of returning timeout status
306 */
307 public void testTimedPollAfterInterrupt() throws InterruptedException {
308 final BlockingQueue<Integer> q = populatedQueue(SIZE);
309 Thread t = newStartedThread(new CheckedRunnable() {
310 public void realRun() throws InterruptedException {
311 long startTime = System.nanoTime();
312 Thread.currentThread().interrupt();
313 for (int i = 0; i < SIZE; ++i)
314 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
315 try {
316 q.poll(LONG_DELAY_MS, MILLISECONDS);
317 shouldThrow();
318 } catch (InterruptedException success) {}
319 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
320 }});
321
322 awaitTermination(t);
323 checkEmpty(q);
324 }
325
326 /**
327 * peek returns next element, or null if empty
328 */
329 public void testPeek() throws InterruptedException {
330 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
331 for (int i = 0; i < SIZE; ++i) {
332 assertEquals(i, (int) q.peek());
333 assertEquals(i, (int) q.poll());
334 assertTrue(q.peek() == null ||
335 i != (int) q.peek());
336 }
337 assertNull(q.peek());
338 checkEmpty(q);
339 }
340
341 /**
342 * element returns next element, or throws NoSuchElementException if empty
343 */
344 public void testElement() throws InterruptedException {
345 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
346 for (int i = 0; i < SIZE; ++i) {
347 assertEquals(i, (int) q.element());
348 assertEquals(i, (int) q.poll());
349 }
350 try {
351 q.element();
352 shouldThrow();
353 } catch (NoSuchElementException success) {}
354 checkEmpty(q);
355 }
356
357 /**
358 * remove removes next element, or throws NoSuchElementException if empty
359 */
360 public void testRemove() throws InterruptedException {
361 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
362 for (int i = 0; i < SIZE; ++i) {
363 assertEquals(i, (int) q.remove());
364 }
365 try {
366 q.remove();
367 shouldThrow();
368 } catch (NoSuchElementException success) {}
369 checkEmpty(q);
370 }
371
372 /**
373 * An add following remove(x) succeeds
374 */
375 public void testRemoveElementAndAdd() throws InterruptedException {
376 LinkedTransferQueue q = new LinkedTransferQueue();
377 assertTrue(q.add(one));
378 assertTrue(q.add(two));
379 assertTrue(q.remove(one));
380 assertTrue(q.remove(two));
381 assertTrue(q.add(three));
382 assertSame(q.take(), three);
383 }
384
385 /**
386 * contains(x) reports true when elements added but not yet removed
387 */
388 public void testContains() {
389 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
390 for (int i = 0; i < SIZE; ++i) {
391 assertTrue(q.contains(i));
392 assertEquals(i, (int) q.poll());
393 assertFalse(q.contains(i));
394 }
395 }
396
397 /**
398 * clear removes all elements
399 */
400 public void testClear() throws InterruptedException {
401 LinkedTransferQueue q = populatedQueue(SIZE);
402 q.clear();
403 checkEmpty(q);
404 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
405 q.add(one);
406 assertFalse(q.isEmpty());
407 assertEquals(1, q.size());
408 assertTrue(q.contains(one));
409 q.clear();
410 checkEmpty(q);
411 }
412
413 /**
414 * containsAll(c) is true when c contains a subset of elements
415 */
416 public void testContainsAll() {
417 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
418 LinkedTransferQueue<Integer> p = new LinkedTransferQueue<Integer>();
419 for (int i = 0; i < SIZE; ++i) {
420 assertTrue(q.containsAll(p));
421 assertFalse(p.containsAll(q));
422 p.add(i);
423 }
424 assertTrue(p.containsAll(q));
425 }
426
427 /**
428 * retainAll(c) retains only those elements of c and reports true
429 * if changed
430 */
431 public void testRetainAll() {
432 LinkedTransferQueue q = populatedQueue(SIZE);
433 LinkedTransferQueue p = populatedQueue(SIZE);
434 for (int i = 0; i < SIZE; ++i) {
435 boolean changed = q.retainAll(p);
436 if (i == 0) {
437 assertFalse(changed);
438 } else {
439 assertTrue(changed);
440 }
441 assertTrue(q.containsAll(p));
442 assertEquals(SIZE - i, q.size());
443 p.remove();
444 }
445 }
446
447 /**
448 * removeAll(c) removes only those elements of c and reports true
449 * if changed
450 */
451 public void testRemoveAll() {
452 for (int i = 1; i < SIZE; ++i) {
453 LinkedTransferQueue q = populatedQueue(SIZE);
454 LinkedTransferQueue p = populatedQueue(i);
455 assertTrue(q.removeAll(p));
456 assertEquals(SIZE - i, q.size());
457 for (int j = 0; j < i; ++j) {
458 assertFalse(q.contains(p.remove()));
459 }
460 }
461 }
462
463 /**
464 * toArray() contains all elements in FIFO order
465 */
466 public void testToArray() {
467 LinkedTransferQueue q = populatedQueue(SIZE);
468 Object[] o = q.toArray();
469 for (int i = 0; i < o.length; i++) {
470 assertSame(o[i], q.poll());
471 }
472 }
473
474 /**
475 * toArray(a) contains all elements in FIFO order
476 */
477 public void testToArray2() {
478 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
479 Integer[] ints = new Integer[SIZE];
480 Integer[] array = q.toArray(ints);
481 assertSame(ints, array);
482 for (int i = 0; i < ints.length; i++) {
483 assertSame(ints[i], q.poll());
484 }
485 }
486
487 /**
488 * toArray(incompatible array type) throws ArrayStoreException
489 */
490 public void testToArray1_BadArg() {
491 LinkedTransferQueue q = populatedQueue(SIZE);
492 try {
493 q.toArray(new String[10]);
494 shouldThrow();
495 } catch (ArrayStoreException success) {}
496 }
497
498 /**
499 * iterator iterates through all elements
500 */
501 public void testIterator() throws InterruptedException {
502 LinkedTransferQueue q = populatedQueue(SIZE);
503 Iterator it = q.iterator();
504 int i;
505 for (i = 0; it.hasNext(); i++)
506 assertTrue(q.contains(it.next()));
507 assertEquals(i, SIZE);
508 assertIteratorExhausted(it);
509
510 it = q.iterator();
511 for (i = 0; it.hasNext(); i++)
512 assertEquals(it.next(), q.take());
513 assertEquals(i, SIZE);
514 assertIteratorExhausted(it);
515 }
516
517 /**
518 * iterator of empty collection has no elements
519 */
520 public void testEmptyIterator() {
521 assertIteratorExhausted(new LinkedTransferQueue().iterator());
522 }
523
524 /**
525 * iterator.remove() removes current element
526 */
527 public void testIteratorRemove() {
528 final LinkedTransferQueue q = new LinkedTransferQueue();
529 q.add(two);
530 q.add(one);
531 q.add(three);
532
533 Iterator it = q.iterator();
534 it.next();
535 it.remove();
536
537 it = q.iterator();
538 assertSame(it.next(), one);
539 assertSame(it.next(), three);
540 assertFalse(it.hasNext());
541 }
542
543 /**
544 * iterator ordering is FIFO
545 */
546 public void testIteratorOrdering() {
547 final LinkedTransferQueue<Integer> q
548 = new LinkedTransferQueue<Integer>();
549 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
550 q.add(one);
551 q.add(two);
552 q.add(three);
553 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
554 int k = 0;
555 for (Integer n : q) {
556 assertEquals(++k, (int) n);
557 }
558 assertEquals(3, k);
559 }
560
561 /**
562 * Modifications do not cause iterators to fail
563 */
564 public void testWeaklyConsistentIteration() {
565 final LinkedTransferQueue q = new LinkedTransferQueue();
566 q.add(one);
567 q.add(two);
568 q.add(three);
569 for (Iterator it = q.iterator(); it.hasNext();) {
570 q.remove();
571 it.next();
572 }
573 assertEquals(0, q.size());
574 }
575
576 /**
577 * toString contains toStrings of elements
578 */
579 public void testToString() {
580 LinkedTransferQueue q = populatedQueue(SIZE);
581 String s = q.toString();
582 for (int i = 0; i < SIZE; ++i) {
583 assertTrue(s.contains(String.valueOf(i)));
584 }
585 }
586
587 /**
588 * offer transfers elements across Executor tasks
589 */
590 public void testOfferInExecutor() {
591 final LinkedTransferQueue q = new LinkedTransferQueue();
592 final CheckedBarrier threadsStarted = new CheckedBarrier(2);
593 final ExecutorService executor = Executors.newFixedThreadPool(2);
594 try (PoolCleaner cleaner = cleaner(executor)) {
595
596 executor.execute(new CheckedRunnable() {
597 public void realRun() throws InterruptedException {
598 threadsStarted.await();
599 long startTime = System.nanoTime();
600 assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS));
601 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
602 }});
603
604 executor.execute(new CheckedRunnable() {
605 public void realRun() throws InterruptedException {
606 threadsStarted.await();
607 assertSame(one, q.take());
608 checkEmpty(q);
609 }});
610 }
611 }
612
613 /**
614 * timed poll retrieves elements across Executor threads
615 */
616 public void testPollInExecutor() {
617 final LinkedTransferQueue q = new LinkedTransferQueue();
618 final CheckedBarrier threadsStarted = new CheckedBarrier(2);
619 final ExecutorService executor = Executors.newFixedThreadPool(2);
620 try (PoolCleaner cleaner = cleaner(executor)) {
621
622 executor.execute(new CheckedRunnable() {
623 public void realRun() throws InterruptedException {
624 assertNull(q.poll());
625 threadsStarted.await();
626 long startTime = System.nanoTime();
627 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
628 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
629 checkEmpty(q);
630 }});
631
632 executor.execute(new CheckedRunnable() {
633 public void realRun() throws InterruptedException {
634 threadsStarted.await();
635 q.put(one);
636 }});
637 }
638 }
639
640 /**
641 * A deserialized serialized queue has same elements in same order
642 */
643 public void testSerialization() throws Exception {
644 Queue x = populatedQueue(SIZE);
645 Queue y = serialClone(x);
646
647 assertNotSame(y, x);
648 assertEquals(x.size(), y.size());
649 assertEquals(x.toString(), y.toString());
650 assertTrue(Arrays.equals(x.toArray(), y.toArray()));
651 while (!x.isEmpty()) {
652 assertFalse(y.isEmpty());
653 assertEquals(x.remove(), y.remove());
654 }
655 assertTrue(y.isEmpty());
656 }
657
658 /**
659 * drainTo(c) empties queue into another collection c
660 */
661 public void testDrainTo() {
662 LinkedTransferQueue q = populatedQueue(SIZE);
663 ArrayList l = new ArrayList();
664 q.drainTo(l);
665 assertEquals(0, q.size());
666 assertEquals(SIZE, l.size());
667 for (int i = 0; i < SIZE; ++i) {
668 assertEquals(i, l.get(i));
669 }
670 q.add(zero);
671 q.add(one);
672 assertFalse(q.isEmpty());
673 assertTrue(q.contains(zero));
674 assertTrue(q.contains(one));
675 l.clear();
676 q.drainTo(l);
677 assertEquals(0, q.size());
678 assertEquals(2, l.size());
679 for (int i = 0; i < 2; ++i) {
680 assertEquals(i, l.get(i));
681 }
682 }
683
684 /**
685 * drainTo(c) empties full queue, unblocking a waiting put.
686 */
687 public void testDrainToWithActivePut() throws InterruptedException {
688 final LinkedTransferQueue q = populatedQueue(SIZE);
689 Thread t = newStartedThread(new CheckedRunnable() {
690 public void realRun() {
691 q.put(SIZE + 1);
692 }});
693 ArrayList l = new ArrayList();
694 q.drainTo(l);
695 assertTrue(l.size() >= SIZE);
696 for (int i = 0; i < SIZE; ++i)
697 assertEquals(i, l.get(i));
698 awaitTermination(t);
699 assertTrue(q.size() + l.size() >= SIZE);
700 }
701
702 /**
703 * drainTo(c, n) empties first min(n, size) elements of queue into c
704 */
705 public void testDrainToN() {
706 LinkedTransferQueue q = new LinkedTransferQueue();
707 for (int i = 0; i < SIZE + 2; ++i) {
708 for (int j = 0; j < SIZE; j++) {
709 assertTrue(q.offer(j));
710 }
711 ArrayList l = new ArrayList();
712 q.drainTo(l, i);
713 int k = (i < SIZE) ? i : SIZE;
714 assertEquals(k, l.size());
715 assertEquals(SIZE - k, q.size());
716 for (int j = 0; j < k; ++j)
717 assertEquals(j, l.get(j));
718 do {} while (q.poll() != null);
719 }
720 }
721
722 /**
723 * timed poll() or take() increments the waiting consumer count;
724 * offer(e) decrements the waiting consumer count
725 */
726 public void testWaitingConsumer() throws InterruptedException {
727 final LinkedTransferQueue q = new LinkedTransferQueue();
728 assertEquals(0, q.getWaitingConsumerCount());
729 assertFalse(q.hasWaitingConsumer());
730 final CountDownLatch threadStarted = new CountDownLatch(1);
731
732 Thread t = newStartedThread(new CheckedRunnable() {
733 public void realRun() throws InterruptedException {
734 threadStarted.countDown();
735 long startTime = System.nanoTime();
736 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
737 assertEquals(0, q.getWaitingConsumerCount());
738 assertFalse(q.hasWaitingConsumer());
739 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
740 }});
741
742 threadStarted.await();
743 waitForThreadToEnterWaitState(t);
744 assertEquals(1, q.getWaitingConsumerCount());
745 assertTrue(q.hasWaitingConsumer());
746
747 assertTrue(q.offer(one));
748 assertEquals(0, q.getWaitingConsumerCount());
749 assertFalse(q.hasWaitingConsumer());
750
751 awaitTermination(t);
752 }
753
754 /**
755 * transfer(null) throws NullPointerException
756 */
757 public void testTransfer1() throws InterruptedException {
758 try {
759 LinkedTransferQueue q = new LinkedTransferQueue();
760 q.transfer(null);
761 shouldThrow();
762 } catch (NullPointerException success) {}
763 }
764
765 /**
766 * transfer waits until a poll occurs. The transfered element
767 * is returned by this associated poll.
768 */
769 public void testTransfer2() throws InterruptedException {
770 final LinkedTransferQueue<Integer> q
771 = new LinkedTransferQueue<Integer>();
772 final CountDownLatch threadStarted = new CountDownLatch(1);
773
774 Thread t = newStartedThread(new CheckedRunnable() {
775 public void realRun() throws InterruptedException {
776 threadStarted.countDown();
777 q.transfer(five);
778 checkEmpty(q);
779 }});
780
781 threadStarted.await();
782 waitForThreadToEnterWaitState(t);
783 assertEquals(1, q.size());
784 assertSame(five, q.poll());
785 checkEmpty(q);
786 awaitTermination(t);
787 }
788
789 /**
790 * transfer waits until a poll occurs, and then transfers in fifo order
791 */
792 public void testTransfer3() throws InterruptedException {
793 final LinkedTransferQueue<Integer> q
794 = new LinkedTransferQueue<Integer>();
795
796 Thread first = newStartedThread(new CheckedRunnable() {
797 public void realRun() throws InterruptedException {
798 q.transfer(four);
799 assertTrue(!q.contains(four));
800 assertEquals(1, q.size());
801 }});
802
803 Thread interruptedThread = newStartedThread(
804 new CheckedInterruptedRunnable() {
805 public void realRun() throws InterruptedException {
806 while (q.isEmpty())
807 Thread.yield();
808 q.transfer(five);
809 }});
810
811 while (q.size() < 2)
812 Thread.yield();
813 assertEquals(2, q.size());
814 assertSame(four, q.poll());
815 first.join();
816 assertEquals(1, q.size());
817 interruptedThread.interrupt();
818 interruptedThread.join();
819 checkEmpty(q);
820 }
821
822 /**
823 * transfer waits until a poll occurs, at which point the polling
824 * thread returns the element
825 */
826 public void testTransfer4() throws InterruptedException {
827 final LinkedTransferQueue q = new LinkedTransferQueue();
828
829 Thread t = newStartedThread(new CheckedRunnable() {
830 public void realRun() throws InterruptedException {
831 q.transfer(four);
832 assertFalse(q.contains(four));
833 assertSame(three, q.poll());
834 }});
835
836 while (q.isEmpty())
837 Thread.yield();
838 assertFalse(q.isEmpty());
839 assertEquals(1, q.size());
840 assertTrue(q.offer(three));
841 assertSame(four, q.poll());
842 awaitTermination(t);
843 }
844
845 /**
846 * transfer waits until a take occurs. The transfered element
847 * is returned by this associated take.
848 */
849 public void testTransfer5() throws InterruptedException {
850 final LinkedTransferQueue<Integer> q
851 = new LinkedTransferQueue<Integer>();
852
853 Thread t = newStartedThread(new CheckedRunnable() {
854 public void realRun() throws InterruptedException {
855 q.transfer(four);
856 checkEmpty(q);
857 }});
858
859 while (q.isEmpty())
860 Thread.yield();
861 assertFalse(q.isEmpty());
862 assertEquals(1, q.size());
863 assertSame(four, q.take());
864 checkEmpty(q);
865 awaitTermination(t);
866 }
867
868 /**
869 * tryTransfer(null) throws NullPointerException
870 */
871 public void testTryTransfer1() {
872 final LinkedTransferQueue q = new LinkedTransferQueue();
873 try {
874 q.tryTransfer(null);
875 shouldThrow();
876 } catch (NullPointerException success) {}
877 }
878
879 /**
880 * tryTransfer returns false and does not enqueue if there are no
881 * consumers waiting to poll or take.
882 */
883 public void testTryTransfer2() throws InterruptedException {
884 final LinkedTransferQueue q = new LinkedTransferQueue();
885 assertFalse(q.tryTransfer(new Object()));
886 assertFalse(q.hasWaitingConsumer());
887 checkEmpty(q);
888 }
889
890 /**
891 * If there is a consumer waiting in timed poll, tryTransfer
892 * returns true while successfully transfering object.
893 */
894 public void testTryTransfer3() throws InterruptedException {
895 final Object hotPotato = new Object();
896 final LinkedTransferQueue q = new LinkedTransferQueue();
897
898 Thread t = newStartedThread(new CheckedRunnable() {
899 public void realRun() {
900 while (! q.hasWaitingConsumer())
901 Thread.yield();
902 assertTrue(q.hasWaitingConsumer());
903 checkEmpty(q);
904 assertTrue(q.tryTransfer(hotPotato));
905 }});
906
907 long startTime = System.nanoTime();
908 assertSame(hotPotato, q.poll(LONG_DELAY_MS, MILLISECONDS));
909 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
910 checkEmpty(q);
911 awaitTermination(t);
912 }
913
914 /**
915 * If there is a consumer waiting in take, tryTransfer returns
916 * true while successfully transfering object.
917 */
918 public void testTryTransfer4() throws InterruptedException {
919 final Object hotPotato = new Object();
920 final LinkedTransferQueue q = new LinkedTransferQueue();
921
922 Thread t = newStartedThread(new CheckedRunnable() {
923 public void realRun() {
924 while (! q.hasWaitingConsumer())
925 Thread.yield();
926 assertTrue(q.hasWaitingConsumer());
927 checkEmpty(q);
928 assertTrue(q.tryTransfer(hotPotato));
929 }});
930
931 assertSame(q.take(), hotPotato);
932 checkEmpty(q);
933 awaitTermination(t);
934 }
935
936 /**
937 * tryTransfer blocks interruptibly if no takers
938 */
939 public void testTryTransfer5() throws InterruptedException {
940 final LinkedTransferQueue q = new LinkedTransferQueue();
941 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
942 assertTrue(q.isEmpty());
943
944 Thread t = newStartedThread(new CheckedRunnable() {
945 public void realRun() throws InterruptedException {
946 long startTime = System.nanoTime();
947 Thread.currentThread().interrupt();
948 try {
949 q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS);
950 shouldThrow();
951 } catch (InterruptedException success) {}
952 assertFalse(Thread.interrupted());
953
954 pleaseInterrupt.countDown();
955 try {
956 q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS);
957 shouldThrow();
958 } catch (InterruptedException success) {}
959 assertFalse(Thread.interrupted());
960 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
961 }});
962
963 await(pleaseInterrupt);
964 assertThreadStaysAlive(t);
965 t.interrupt();
966 awaitTermination(t);
967 checkEmpty(q);
968 }
969
970 /**
971 * tryTransfer gives up after the timeout and returns false
972 */
973 public void testTryTransfer6() throws InterruptedException {
974 final LinkedTransferQueue q = new LinkedTransferQueue();
975
976 Thread t = newStartedThread(new CheckedRunnable() {
977 public void realRun() throws InterruptedException {
978 long startTime = System.nanoTime();
979 assertFalse(q.tryTransfer(new Object(),
980 timeoutMillis(), MILLISECONDS));
981 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
982 checkEmpty(q);
983 }});
984
985 awaitTermination(t);
986 checkEmpty(q);
987 }
988
989 /**
990 * tryTransfer waits for any elements previously in to be removed
991 * before transfering to a poll or take
992 */
993 public void testTryTransfer7() throws InterruptedException {
994 final LinkedTransferQueue q = new LinkedTransferQueue();
995 assertTrue(q.offer(four));
996
997 Thread t = newStartedThread(new CheckedRunnable() {
998 public void realRun() throws InterruptedException {
999 long startTime = System.nanoTime();
1000 assertTrue(q.tryTransfer(five, LONG_DELAY_MS, MILLISECONDS));
1001 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1002 checkEmpty(q);
1003 }});
1004
1005 while (q.size() != 2)
1006 Thread.yield();
1007 assertEquals(2, q.size());
1008 assertSame(four, q.poll());
1009 assertSame(five, q.poll());
1010 checkEmpty(q);
1011 awaitTermination(t);
1012 }
1013
1014 /**
1015 * tryTransfer attempts to enqueue into the queue and fails
1016 * returning false not enqueueing and the successive poll is null
1017 */
1018 public void testTryTransfer8() throws InterruptedException {
1019 final LinkedTransferQueue q = new LinkedTransferQueue();
1020 assertTrue(q.offer(four));
1021 assertEquals(1, q.size());
1022 long startTime = System.nanoTime();
1023 assertFalse(q.tryTransfer(five, timeoutMillis(), MILLISECONDS));
1024 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
1025 assertEquals(1, q.size());
1026 assertSame(four, q.poll());
1027 assertNull(q.poll());
1028 checkEmpty(q);
1029 }
1030
1031 private LinkedTransferQueue<Integer> populatedQueue(int n) {
1032 LinkedTransferQueue<Integer> q = new LinkedTransferQueue<Integer>();
1033 checkEmpty(q);
1034 for (int i = 0; i < n; i++) {
1035 assertEquals(i, q.size());
1036 assertTrue(q.offer(i));
1037 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
1038 }
1039 assertFalse(q.isEmpty());
1040 return q;
1041 }
1042
1043 /**
1044 * remove(null), contains(null) always return false
1045 */
1046 public void testNeverContainsNull() {
1047 Collection<?>[] qs = {
1048 new LinkedTransferQueue<Object>(),
1049 populatedQueue(2),
1050 };
1051
1052 for (Collection<?> q : qs) {
1053 assertFalse(q.contains(null));
1054 assertFalse(q.remove(null));
1055 }
1056 }
1057 }