ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/LinkedTransferQueueTest.java
Revision: 1.64
Committed: Tue Oct 6 00:03:55 2015 UTC (8 years, 7 months ago) by jsr166
Branch: MAIN
Changes since 1.63: +5 -7 lines
Log Message:
improve testInterruptedTimedPoll

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include John Vint
6 */
7
8 import static java.util.concurrent.TimeUnit.MILLISECONDS;
9
10 import java.util.ArrayList;
11 import java.util.Arrays;
12 import java.util.Collection;
13 import java.util.Iterator;
14 import java.util.List;
15 import java.util.NoSuchElementException;
16 import java.util.Queue;
17 import java.util.concurrent.BlockingQueue;
18 import java.util.concurrent.CountDownLatch;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.LinkedTransferQueue;
22
23 import junit.framework.Test;
24
25 @SuppressWarnings({"unchecked", "rawtypes"})
26 public class LinkedTransferQueueTest extends JSR166TestCase {
27 static class Implementation implements CollectionImplementation {
28 public Class<?> klazz() { return LinkedTransferQueue.class; }
29 public Collection emptyCollection() { return new LinkedTransferQueue(); }
30 public Object makeElement(int i) { return i; }
31 public boolean isConcurrent() { return true; }
32 public boolean permitsNulls() { return false; }
33 }
34
35 public static class Generic extends BlockingQueueTest {
36 protected BlockingQueue emptyCollection() {
37 return new LinkedTransferQueue();
38 }
39 }
40
41 public static void main(String[] args) {
42 main(suite(), args);
43 }
44
45 public static Test suite() {
46 return newTestSuite(LinkedTransferQueueTest.class,
47 new Generic().testSuite(),
48 CollectionTest.testSuite(new Implementation()));
49 }
50
51 /**
52 * Constructor builds new queue with size being zero and empty
53 * being true
54 */
55 public void testConstructor1() {
56 assertEquals(0, new LinkedTransferQueue().size());
57 assertTrue(new LinkedTransferQueue().isEmpty());
58 }
59
60 /**
61 * Initializing constructor with null collection throws
62 * NullPointerException
63 */
64 public void testConstructor2() {
65 try {
66 new LinkedTransferQueue(null);
67 shouldThrow();
68 } catch (NullPointerException success) {}
69 }
70
71 /**
72 * Initializing from Collection of null elements throws
73 * NullPointerException
74 */
75 public void testConstructor3() {
76 Collection<Integer> elements = Arrays.asList(new Integer[SIZE]);
77 try {
78 new LinkedTransferQueue(elements);
79 shouldThrow();
80 } catch (NullPointerException success) {}
81 }
82
83 /**
84 * Initializing constructor with a collection containing some null elements
85 * throws NullPointerException
86 */
87 public void testConstructor4() {
88 Integer[] ints = new Integer[SIZE];
89 for (int i = 0; i < SIZE - 1; ++i)
90 ints[i] = i;
91 Collection<Integer> elements = Arrays.asList(ints);
92 try {
93 new LinkedTransferQueue(elements);
94 shouldThrow();
95 } catch (NullPointerException success) {}
96 }
97
98 /**
99 * Queue contains all elements of the collection it is initialized by
100 */
101 public void testConstructor5() {
102 Integer[] ints = new Integer[SIZE];
103 for (int i = 0; i < SIZE; ++i) {
104 ints[i] = i;
105 }
106 List intList = Arrays.asList(ints);
107 LinkedTransferQueue q
108 = new LinkedTransferQueue(intList);
109 assertEquals(q.size(), intList.size());
110 assertEquals(q.toString(), intList.toString());
111 assertTrue(Arrays.equals(q.toArray(),
112 intList.toArray()));
113 assertTrue(Arrays.equals(q.toArray(new Object[0]),
114 intList.toArray(new Object[0])));
115 assertTrue(Arrays.equals(q.toArray(new Object[SIZE]),
116 intList.toArray(new Object[SIZE])));
117 for (int i = 0; i < SIZE; ++i) {
118 assertEquals(ints[i], q.poll());
119 }
120 }
121
122 /**
123 * remainingCapacity() always returns Integer.MAX_VALUE
124 */
125 public void testRemainingCapacity() {
126 BlockingQueue q = populatedQueue(SIZE);
127 for (int i = 0; i < SIZE; ++i) {
128 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
129 assertEquals(SIZE - i, q.size());
130 assertEquals(i, q.remove());
131 }
132 for (int i = 0; i < SIZE; ++i) {
133 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
134 assertEquals(i, q.size());
135 assertTrue(q.add(i));
136 }
137 }
138
139 /**
140 * addAll(this) throws IllegalArgumentException
141 */
142 public void testAddAllSelf() {
143 LinkedTransferQueue q = populatedQueue(SIZE);
144 try {
145 q.addAll(q);
146 shouldThrow();
147 } catch (IllegalArgumentException success) {}
148 }
149
150 /**
151 * addAll of a collection with any null elements throws
152 * NullPointerException after possibly adding some elements
153 */
154 public void testAddAll3() {
155 LinkedTransferQueue q = new LinkedTransferQueue();
156 Integer[] ints = new Integer[SIZE];
157 for (int i = 0; i < SIZE - 1; ++i)
158 ints[i] = i;
159 try {
160 q.addAll(Arrays.asList(ints));
161 shouldThrow();
162 } catch (NullPointerException success) {}
163 }
164
165 /**
166 * Queue contains all elements, in traversal order, of successful addAll
167 */
168 public void testAddAll5() {
169 Integer[] empty = new Integer[0];
170 Integer[] ints = new Integer[SIZE];
171 for (int i = 0; i < SIZE; ++i) {
172 ints[i] = i;
173 }
174 LinkedTransferQueue q = new LinkedTransferQueue();
175 assertFalse(q.addAll(Arrays.asList(empty)));
176 assertTrue(q.addAll(Arrays.asList(ints)));
177 for (int i = 0; i < SIZE; ++i) {
178 assertEquals(ints[i], q.poll());
179 }
180 }
181
182 /**
183 * all elements successfully put are contained
184 */
185 public void testPut() {
186 LinkedTransferQueue<Integer> q = new LinkedTransferQueue<Integer>();
187 for (int i = 0; i < SIZE; ++i) {
188 assertEquals(i, q.size());
189 q.put(i);
190 assertTrue(q.contains(i));
191 }
192 }
193
194 /**
195 * take retrieves elements in FIFO order
196 */
197 public void testTake() throws InterruptedException {
198 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
199 for (int i = 0; i < SIZE; ++i) {
200 assertEquals(i, (int) q.take());
201 }
202 }
203
204 /**
205 * take removes existing elements until empty, then blocks interruptibly
206 */
207 public void testBlockingTake() throws InterruptedException {
208 final BlockingQueue q = populatedQueue(SIZE);
209 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
210 Thread t = newStartedThread(new CheckedRunnable() {
211 public void realRun() throws InterruptedException {
212 for (int i = 0; i < SIZE; ++i) {
213 assertEquals(i, q.take());
214 }
215
216 Thread.currentThread().interrupt();
217 try {
218 q.take();
219 shouldThrow();
220 } catch (InterruptedException success) {}
221 assertFalse(Thread.interrupted());
222
223 pleaseInterrupt.countDown();
224 try {
225 q.take();
226 shouldThrow();
227 } catch (InterruptedException success) {}
228 assertFalse(Thread.interrupted());
229 }});
230
231 await(pleaseInterrupt);
232 assertThreadStaysAlive(t);
233 t.interrupt();
234 awaitTermination(t);
235 }
236
237 /**
238 * poll succeeds unless empty
239 */
240 public void testPoll() throws InterruptedException {
241 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
242 for (int i = 0; i < SIZE; ++i) {
243 assertEquals(i, (int) q.poll());
244 }
245 assertNull(q.poll());
246 checkEmpty(q);
247 }
248
249 /**
250 * timed poll with zero timeout succeeds when non-empty, else times out
251 */
252 public void testTimedPoll0() throws InterruptedException {
253 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
254 for (int i = 0; i < SIZE; ++i) {
255 assertEquals(i, (int) q.poll(0, MILLISECONDS));
256 }
257 assertNull(q.poll(0, MILLISECONDS));
258 checkEmpty(q);
259 }
260
261 /**
262 * timed poll with nonzero timeout succeeds when non-empty, else times out
263 */
264 public void testTimedPoll() throws InterruptedException {
265 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
266 for (int i = 0; i < SIZE; ++i) {
267 long startTime = System.nanoTime();
268 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
269 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
270 }
271 long startTime = System.nanoTime();
272 assertNull(q.poll(timeoutMillis(), MILLISECONDS));
273 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
274 checkEmpty(q);
275 }
276
277 /**
278 * Interrupted timed poll throws InterruptedException instead of
279 * returning timeout status
280 */
281 public void testInterruptedTimedPoll() throws InterruptedException {
282 final BlockingQueue<Integer> q = populatedQueue(SIZE);
283 final CountDownLatch aboutToWait = new CountDownLatch(1);
284 Thread t = newStartedThread(new CheckedRunnable() {
285 public void realRun() throws InterruptedException {
286 long startTime = System.nanoTime();
287 for (int i = 0; i < SIZE; ++i) {
288 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
289 }
290 aboutToWait.countDown();
291 try {
292 q.poll(LONG_DELAY_MS, MILLISECONDS);
293 shouldThrow();
294 } catch (InterruptedException success) {
295 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
296 }
297 }});
298
299 aboutToWait.await();
300 waitForThreadToEnterWaitState(t, LONG_DELAY_MS);
301 t.interrupt();
302 awaitTermination(t);
303 checkEmpty(q);
304 }
305
306 /**
307 * timed poll after thread interrupted throws InterruptedException
308 * instead of returning timeout status
309 */
310 public void testTimedPollAfterInterrupt() throws InterruptedException {
311 final BlockingQueue<Integer> q = populatedQueue(SIZE);
312 Thread t = newStartedThread(new CheckedRunnable() {
313 public void realRun() throws InterruptedException {
314 Thread.currentThread().interrupt();
315 for (int i = 0; i < SIZE; ++i) {
316 long t0 = System.nanoTime();
317 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
318 assertTrue(millisElapsedSince(t0) < SMALL_DELAY_MS);
319 }
320 try {
321 q.poll(MEDIUM_DELAY_MS, MILLISECONDS);
322 shouldThrow();
323 } catch (InterruptedException success) {}
324 }});
325
326 awaitTermination(t, MEDIUM_DELAY_MS);
327 checkEmpty(q);
328 }
329
330 /**
331 * peek returns next element, or null if empty
332 */
333 public void testPeek() throws InterruptedException {
334 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
335 for (int i = 0; i < SIZE; ++i) {
336 assertEquals(i, (int) q.peek());
337 assertEquals(i, (int) q.poll());
338 assertTrue(q.peek() == null ||
339 i != (int) q.peek());
340 }
341 assertNull(q.peek());
342 checkEmpty(q);
343 }
344
345 /**
346 * element returns next element, or throws NoSuchElementException if empty
347 */
348 public void testElement() throws InterruptedException {
349 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
350 for (int i = 0; i < SIZE; ++i) {
351 assertEquals(i, (int) q.element());
352 assertEquals(i, (int) q.poll());
353 }
354 try {
355 q.element();
356 shouldThrow();
357 } catch (NoSuchElementException success) {}
358 checkEmpty(q);
359 }
360
361 /**
362 * remove removes next element, or throws NoSuchElementException if empty
363 */
364 public void testRemove() throws InterruptedException {
365 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
366 for (int i = 0; i < SIZE; ++i) {
367 assertEquals(i, (int) q.remove());
368 }
369 try {
370 q.remove();
371 shouldThrow();
372 } catch (NoSuchElementException success) {}
373 checkEmpty(q);
374 }
375
376 /**
377 * An add following remove(x) succeeds
378 */
379 public void testRemoveElementAndAdd() throws InterruptedException {
380 LinkedTransferQueue q = new LinkedTransferQueue();
381 assertTrue(q.add(one));
382 assertTrue(q.add(two));
383 assertTrue(q.remove(one));
384 assertTrue(q.remove(two));
385 assertTrue(q.add(three));
386 assertSame(q.take(), three);
387 }
388
389 /**
390 * contains(x) reports true when elements added but not yet removed
391 */
392 public void testContains() {
393 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
394 for (int i = 0; i < SIZE; ++i) {
395 assertTrue(q.contains(i));
396 assertEquals(i, (int) q.poll());
397 assertFalse(q.contains(i));
398 }
399 }
400
401 /**
402 * clear removes all elements
403 */
404 public void testClear() throws InterruptedException {
405 LinkedTransferQueue q = populatedQueue(SIZE);
406 q.clear();
407 checkEmpty(q);
408 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
409 q.add(one);
410 assertFalse(q.isEmpty());
411 assertEquals(1, q.size());
412 assertTrue(q.contains(one));
413 q.clear();
414 checkEmpty(q);
415 }
416
417 /**
418 * containsAll(c) is true when c contains a subset of elements
419 */
420 public void testContainsAll() {
421 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
422 LinkedTransferQueue<Integer> p = new LinkedTransferQueue<Integer>();
423 for (int i = 0; i < SIZE; ++i) {
424 assertTrue(q.containsAll(p));
425 assertFalse(p.containsAll(q));
426 p.add(i);
427 }
428 assertTrue(p.containsAll(q));
429 }
430
431 /**
432 * retainAll(c) retains only those elements of c and reports true
433 * if changed
434 */
435 public void testRetainAll() {
436 LinkedTransferQueue q = populatedQueue(SIZE);
437 LinkedTransferQueue p = populatedQueue(SIZE);
438 for (int i = 0; i < SIZE; ++i) {
439 boolean changed = q.retainAll(p);
440 if (i == 0) {
441 assertFalse(changed);
442 } else {
443 assertTrue(changed);
444 }
445 assertTrue(q.containsAll(p));
446 assertEquals(SIZE - i, q.size());
447 p.remove();
448 }
449 }
450
451 /**
452 * removeAll(c) removes only those elements of c and reports true
453 * if changed
454 */
455 public void testRemoveAll() {
456 for (int i = 1; i < SIZE; ++i) {
457 LinkedTransferQueue q = populatedQueue(SIZE);
458 LinkedTransferQueue p = populatedQueue(i);
459 assertTrue(q.removeAll(p));
460 assertEquals(SIZE - i, q.size());
461 for (int j = 0; j < i; ++j) {
462 assertFalse(q.contains(p.remove()));
463 }
464 }
465 }
466
467 /**
468 * toArray() contains all elements in FIFO order
469 */
470 public void testToArray() {
471 LinkedTransferQueue q = populatedQueue(SIZE);
472 Object[] o = q.toArray();
473 for (int i = 0; i < o.length; i++) {
474 assertSame(o[i], q.poll());
475 }
476 }
477
478 /**
479 * toArray(a) contains all elements in FIFO order
480 */
481 public void testToArray2() {
482 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
483 Integer[] ints = new Integer[SIZE];
484 Integer[] array = q.toArray(ints);
485 assertSame(ints, array);
486 for (int i = 0; i < ints.length; i++) {
487 assertSame(ints[i], q.poll());
488 }
489 }
490
491 /**
492 * toArray(incompatible array type) throws ArrayStoreException
493 */
494 public void testToArray1_BadArg() {
495 LinkedTransferQueue q = populatedQueue(SIZE);
496 try {
497 q.toArray(new String[10]);
498 shouldThrow();
499 } catch (ArrayStoreException success) {}
500 }
501
502 /**
503 * iterator iterates through all elements
504 */
505 public void testIterator() throws InterruptedException {
506 LinkedTransferQueue q = populatedQueue(SIZE);
507 Iterator it = q.iterator();
508 int i;
509 for (i = 0; it.hasNext(); i++)
510 assertTrue(q.contains(it.next()));
511 assertEquals(i, SIZE);
512 assertIteratorExhausted(it);
513
514 it = q.iterator();
515 for (i = 0; it.hasNext(); i++)
516 assertEquals(it.next(), q.take());
517 assertEquals(i, SIZE);
518 assertIteratorExhausted(it);
519 }
520
521 /**
522 * iterator of empty collection has no elements
523 */
524 public void testEmptyIterator() {
525 assertIteratorExhausted(new LinkedTransferQueue().iterator());
526 }
527
528 /**
529 * iterator.remove() removes current element
530 */
531 public void testIteratorRemove() {
532 final LinkedTransferQueue q = new LinkedTransferQueue();
533 q.add(two);
534 q.add(one);
535 q.add(three);
536
537 Iterator it = q.iterator();
538 it.next();
539 it.remove();
540
541 it = q.iterator();
542 assertSame(it.next(), one);
543 assertSame(it.next(), three);
544 assertFalse(it.hasNext());
545 }
546
547 /**
548 * iterator ordering is FIFO
549 */
550 public void testIteratorOrdering() {
551 final LinkedTransferQueue<Integer> q
552 = new LinkedTransferQueue<Integer>();
553 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
554 q.add(one);
555 q.add(two);
556 q.add(three);
557 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
558 int k = 0;
559 for (Integer n : q) {
560 assertEquals(++k, (int) n);
561 }
562 assertEquals(3, k);
563 }
564
565 /**
566 * Modifications do not cause iterators to fail
567 */
568 public void testWeaklyConsistentIteration() {
569 final LinkedTransferQueue q = new LinkedTransferQueue();
570 q.add(one);
571 q.add(two);
572 q.add(three);
573 for (Iterator it = q.iterator(); it.hasNext();) {
574 q.remove();
575 it.next();
576 }
577 assertEquals(0, q.size());
578 }
579
580 /**
581 * toString contains toStrings of elements
582 */
583 public void testToString() {
584 LinkedTransferQueue q = populatedQueue(SIZE);
585 String s = q.toString();
586 for (int i = 0; i < SIZE; ++i) {
587 assertTrue(s.contains(String.valueOf(i)));
588 }
589 }
590
591 /**
592 * offer transfers elements across Executor tasks
593 */
594 public void testOfferInExecutor() {
595 final LinkedTransferQueue q = new LinkedTransferQueue();
596 final CheckedBarrier threadsStarted = new CheckedBarrier(2);
597 final ExecutorService executor = Executors.newFixedThreadPool(2);
598 try (PoolCleaner cleaner = cleaner(executor)) {
599
600 executor.execute(new CheckedRunnable() {
601 public void realRun() throws InterruptedException {
602 threadsStarted.await();
603 assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS));
604 }});
605
606 executor.execute(new CheckedRunnable() {
607 public void realRun() throws InterruptedException {
608 threadsStarted.await();
609 assertSame(one, q.take());
610 checkEmpty(q);
611 }});
612 }
613 }
614
615 /**
616 * timed poll retrieves elements across Executor threads
617 */
618 public void testPollInExecutor() {
619 final LinkedTransferQueue q = new LinkedTransferQueue();
620 final CheckedBarrier threadsStarted = new CheckedBarrier(2);
621 final ExecutorService executor = Executors.newFixedThreadPool(2);
622 try (PoolCleaner cleaner = cleaner(executor)) {
623
624 executor.execute(new CheckedRunnable() {
625 public void realRun() throws InterruptedException {
626 assertNull(q.poll());
627 threadsStarted.await();
628 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
629 checkEmpty(q);
630 }});
631
632 executor.execute(new CheckedRunnable() {
633 public void realRun() throws InterruptedException {
634 threadsStarted.await();
635 q.put(one);
636 }});
637 }
638 }
639
640 /**
641 * A deserialized serialized queue has same elements in same order
642 */
643 public void testSerialization() throws Exception {
644 Queue x = populatedQueue(SIZE);
645 Queue y = serialClone(x);
646
647 assertNotSame(y, x);
648 assertEquals(x.size(), y.size());
649 assertEquals(x.toString(), y.toString());
650 assertTrue(Arrays.equals(x.toArray(), y.toArray()));
651 while (!x.isEmpty()) {
652 assertFalse(y.isEmpty());
653 assertEquals(x.remove(), y.remove());
654 }
655 assertTrue(y.isEmpty());
656 }
657
658 /**
659 * drainTo(c) empties queue into another collection c
660 */
661 public void testDrainTo() {
662 LinkedTransferQueue q = populatedQueue(SIZE);
663 ArrayList l = new ArrayList();
664 q.drainTo(l);
665 assertEquals(0, q.size());
666 assertEquals(SIZE, l.size());
667 for (int i = 0; i < SIZE; ++i) {
668 assertEquals(i, l.get(i));
669 }
670 q.add(zero);
671 q.add(one);
672 assertFalse(q.isEmpty());
673 assertTrue(q.contains(zero));
674 assertTrue(q.contains(one));
675 l.clear();
676 q.drainTo(l);
677 assertEquals(0, q.size());
678 assertEquals(2, l.size());
679 for (int i = 0; i < 2; ++i) {
680 assertEquals(i, l.get(i));
681 }
682 }
683
684 /**
685 * drainTo(c) empties full queue, unblocking a waiting put.
686 */
687 public void testDrainToWithActivePut() throws InterruptedException {
688 final LinkedTransferQueue q = populatedQueue(SIZE);
689 Thread t = newStartedThread(new CheckedRunnable() {
690 public void realRun() {
691 q.put(SIZE + 1);
692 }});
693 ArrayList l = new ArrayList();
694 q.drainTo(l);
695 assertTrue(l.size() >= SIZE);
696 for (int i = 0; i < SIZE; ++i)
697 assertEquals(i, l.get(i));
698 awaitTermination(t, MEDIUM_DELAY_MS);
699 assertTrue(q.size() + l.size() >= SIZE);
700 }
701
702 /**
703 * drainTo(c, n) empties first min(n, size) elements of queue into c
704 */
705 public void testDrainToN() {
706 LinkedTransferQueue q = new LinkedTransferQueue();
707 for (int i = 0; i < SIZE + 2; ++i) {
708 for (int j = 0; j < SIZE; j++) {
709 assertTrue(q.offer(j));
710 }
711 ArrayList l = new ArrayList();
712 q.drainTo(l, i);
713 int k = (i < SIZE) ? i : SIZE;
714 assertEquals(k, l.size());
715 assertEquals(SIZE - k, q.size());
716 for (int j = 0; j < k; ++j)
717 assertEquals(j, l.get(j));
718 do {} while (q.poll() != null);
719 }
720 }
721
722 /**
723 * timed poll() or take() increments the waiting consumer count;
724 * offer(e) decrements the waiting consumer count
725 */
726 public void testWaitingConsumer() throws InterruptedException {
727 final LinkedTransferQueue q = new LinkedTransferQueue();
728 assertEquals(0, q.getWaitingConsumerCount());
729 assertFalse(q.hasWaitingConsumer());
730 final CountDownLatch threadStarted = new CountDownLatch(1);
731
732 Thread t = newStartedThread(new CheckedRunnable() {
733 public void realRun() throws InterruptedException {
734 threadStarted.countDown();
735 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
736 assertEquals(0, q.getWaitingConsumerCount());
737 assertFalse(q.hasWaitingConsumer());
738 }});
739
740 threadStarted.await();
741 waitForThreadToEnterWaitState(t, SMALL_DELAY_MS);
742 assertEquals(1, q.getWaitingConsumerCount());
743 assertTrue(q.hasWaitingConsumer());
744
745 assertTrue(q.offer(one));
746 assertEquals(0, q.getWaitingConsumerCount());
747 assertFalse(q.hasWaitingConsumer());
748
749 awaitTermination(t, MEDIUM_DELAY_MS);
750 }
751
752 /**
753 * transfer(null) throws NullPointerException
754 */
755 public void testTransfer1() throws InterruptedException {
756 try {
757 LinkedTransferQueue q = new LinkedTransferQueue();
758 q.transfer(null);
759 shouldThrow();
760 } catch (NullPointerException success) {}
761 }
762
763 /**
764 * transfer waits until a poll occurs. The transfered element
765 * is returned by this associated poll.
766 */
767 public void testTransfer2() throws InterruptedException {
768 final LinkedTransferQueue<Integer> q
769 = new LinkedTransferQueue<Integer>();
770 final CountDownLatch threadStarted = new CountDownLatch(1);
771
772 Thread t = newStartedThread(new CheckedRunnable() {
773 public void realRun() throws InterruptedException {
774 threadStarted.countDown();
775 q.transfer(five);
776 checkEmpty(q);
777 }});
778
779 threadStarted.await();
780 waitForThreadToEnterWaitState(t, SMALL_DELAY_MS);
781 assertEquals(1, q.size());
782 assertSame(five, q.poll());
783 checkEmpty(q);
784 awaitTermination(t, MEDIUM_DELAY_MS);
785 }
786
787 /**
788 * transfer waits until a poll occurs, and then transfers in fifo order
789 */
790 public void testTransfer3() throws InterruptedException {
791 final LinkedTransferQueue<Integer> q
792 = new LinkedTransferQueue<Integer>();
793
794 Thread first = newStartedThread(new CheckedRunnable() {
795 public void realRun() throws InterruptedException {
796 q.transfer(four);
797 assertTrue(!q.contains(four));
798 assertEquals(1, q.size());
799 }});
800
801 Thread interruptedThread = newStartedThread(
802 new CheckedInterruptedRunnable() {
803 public void realRun() throws InterruptedException {
804 while (q.isEmpty())
805 Thread.yield();
806 q.transfer(five);
807 }});
808
809 while (q.size() < 2)
810 Thread.yield();
811 assertEquals(2, q.size());
812 assertSame(four, q.poll());
813 first.join();
814 assertEquals(1, q.size());
815 interruptedThread.interrupt();
816 interruptedThread.join();
817 checkEmpty(q);
818 }
819
820 /**
821 * transfer waits until a poll occurs, at which point the polling
822 * thread returns the element
823 */
824 public void testTransfer4() throws InterruptedException {
825 final LinkedTransferQueue q = new LinkedTransferQueue();
826
827 Thread t = newStartedThread(new CheckedRunnable() {
828 public void realRun() throws InterruptedException {
829 q.transfer(four);
830 assertFalse(q.contains(four));
831 assertSame(three, q.poll());
832 }});
833
834 while (q.isEmpty())
835 Thread.yield();
836 assertFalse(q.isEmpty());
837 assertEquals(1, q.size());
838 assertTrue(q.offer(three));
839 assertSame(four, q.poll());
840 awaitTermination(t, MEDIUM_DELAY_MS);
841 }
842
843 /**
844 * transfer waits until a take occurs. The transfered element
845 * is returned by this associated take.
846 */
847 public void testTransfer5() throws InterruptedException {
848 final LinkedTransferQueue<Integer> q
849 = new LinkedTransferQueue<Integer>();
850
851 Thread t = newStartedThread(new CheckedRunnable() {
852 public void realRun() throws InterruptedException {
853 q.transfer(four);
854 checkEmpty(q);
855 }});
856
857 while (q.isEmpty())
858 Thread.yield();
859 assertFalse(q.isEmpty());
860 assertEquals(1, q.size());
861 assertSame(four, q.take());
862 checkEmpty(q);
863 awaitTermination(t, MEDIUM_DELAY_MS);
864 }
865
866 /**
867 * tryTransfer(null) throws NullPointerException
868 */
869 public void testTryTransfer1() {
870 final LinkedTransferQueue q = new LinkedTransferQueue();
871 try {
872 q.tryTransfer(null);
873 shouldThrow();
874 } catch (NullPointerException success) {}
875 }
876
877 /**
878 * tryTransfer returns false and does not enqueue if there are no
879 * consumers waiting to poll or take.
880 */
881 public void testTryTransfer2() throws InterruptedException {
882 final LinkedTransferQueue q = new LinkedTransferQueue();
883 assertFalse(q.tryTransfer(new Object()));
884 assertFalse(q.hasWaitingConsumer());
885 checkEmpty(q);
886 }
887
888 /**
889 * If there is a consumer waiting in timed poll, tryTransfer
890 * returns true while successfully transfering object.
891 */
892 public void testTryTransfer3() throws InterruptedException {
893 final Object hotPotato = new Object();
894 final LinkedTransferQueue q = new LinkedTransferQueue();
895
896 Thread t = newStartedThread(new CheckedRunnable() {
897 public void realRun() {
898 while (! q.hasWaitingConsumer())
899 Thread.yield();
900 assertTrue(q.hasWaitingConsumer());
901 checkEmpty(q);
902 assertTrue(q.tryTransfer(hotPotato));
903 }});
904
905 assertSame(hotPotato, q.poll(MEDIUM_DELAY_MS, MILLISECONDS));
906 checkEmpty(q);
907 awaitTermination(t, MEDIUM_DELAY_MS);
908 }
909
910 /**
911 * If there is a consumer waiting in take, tryTransfer returns
912 * true while successfully transfering object.
913 */
914 public void testTryTransfer4() throws InterruptedException {
915 final Object hotPotato = new Object();
916 final LinkedTransferQueue q = new LinkedTransferQueue();
917
918 Thread t = newStartedThread(new CheckedRunnable() {
919 public void realRun() {
920 while (! q.hasWaitingConsumer())
921 Thread.yield();
922 assertTrue(q.hasWaitingConsumer());
923 checkEmpty(q);
924 assertTrue(q.tryTransfer(hotPotato));
925 }});
926
927 assertSame(q.take(), hotPotato);
928 checkEmpty(q);
929 awaitTermination(t, MEDIUM_DELAY_MS);
930 }
931
932 /**
933 * tryTransfer blocks interruptibly if no takers
934 */
935 public void testTryTransfer5() throws InterruptedException {
936 final LinkedTransferQueue q = new LinkedTransferQueue();
937 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
938 assertTrue(q.isEmpty());
939
940 Thread t = newStartedThread(new CheckedRunnable() {
941 public void realRun() throws InterruptedException {
942 Thread.currentThread().interrupt();
943 try {
944 q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS);
945 shouldThrow();
946 } catch (InterruptedException success) {}
947 assertFalse(Thread.interrupted());
948
949 pleaseInterrupt.countDown();
950 try {
951 q.tryTransfer(new Object(), LONG_DELAY_MS, MILLISECONDS);
952 shouldThrow();
953 } catch (InterruptedException success) {}
954 assertFalse(Thread.interrupted());
955 }});
956
957 await(pleaseInterrupt);
958 assertThreadStaysAlive(t);
959 t.interrupt();
960 awaitTermination(t);
961 checkEmpty(q);
962 }
963
964 /**
965 * tryTransfer gives up after the timeout and returns false
966 */
967 public void testTryTransfer6() throws InterruptedException {
968 final LinkedTransferQueue q = new LinkedTransferQueue();
969
970 Thread t = newStartedThread(new CheckedRunnable() {
971 public void realRun() throws InterruptedException {
972 long t0 = System.nanoTime();
973 assertFalse(q.tryTransfer(new Object(),
974 timeoutMillis(), MILLISECONDS));
975 assertTrue(millisElapsedSince(t0) >= timeoutMillis());
976 checkEmpty(q);
977 }});
978
979 awaitTermination(t);
980 checkEmpty(q);
981 }
982
983 /**
984 * tryTransfer waits for any elements previously in to be removed
985 * before transfering to a poll or take
986 */
987 public void testTryTransfer7() throws InterruptedException {
988 final LinkedTransferQueue q = new LinkedTransferQueue();
989 assertTrue(q.offer(four));
990
991 Thread t = newStartedThread(new CheckedRunnable() {
992 public void realRun() throws InterruptedException {
993 assertTrue(q.tryTransfer(five, MEDIUM_DELAY_MS, MILLISECONDS));
994 checkEmpty(q);
995 }});
996
997 while (q.size() != 2)
998 Thread.yield();
999 assertEquals(2, q.size());
1000 assertSame(four, q.poll());
1001 assertSame(five, q.poll());
1002 checkEmpty(q);
1003 awaitTermination(t, MEDIUM_DELAY_MS);
1004 }
1005
1006 /**
1007 * tryTransfer attempts to enqueue into the queue and fails
1008 * returning false not enqueueing and the successive poll is null
1009 */
1010 public void testTryTransfer8() throws InterruptedException {
1011 final LinkedTransferQueue q = new LinkedTransferQueue();
1012 assertTrue(q.offer(four));
1013 assertEquals(1, q.size());
1014 long t0 = System.nanoTime();
1015 assertFalse(q.tryTransfer(five, timeoutMillis(), MILLISECONDS));
1016 assertTrue(millisElapsedSince(t0) >= timeoutMillis());
1017 assertEquals(1, q.size());
1018 assertSame(four, q.poll());
1019 assertNull(q.poll());
1020 checkEmpty(q);
1021 }
1022
1023 private LinkedTransferQueue<Integer> populatedQueue(int n) {
1024 LinkedTransferQueue<Integer> q = new LinkedTransferQueue<Integer>();
1025 checkEmpty(q);
1026 for (int i = 0; i < n; i++) {
1027 assertEquals(i, q.size());
1028 assertTrue(q.offer(i));
1029 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
1030 }
1031 assertFalse(q.isEmpty());
1032 return q;
1033 }
1034
1035 /**
1036 * remove(null), contains(null) always return false
1037 */
1038 public void testNeverContainsNull() {
1039 Collection<?>[] qs = {
1040 new LinkedTransferQueue<Object>(),
1041 populatedQueue(2),
1042 };
1043
1044 for (Collection<?> q : qs) {
1045 assertFalse(q.contains(null));
1046 assertFalse(q.remove(null));
1047 }
1048 }
1049 }