ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/LinkedTransferQueueTest.java
Revision: 1.87
Committed: Wed Jan 27 01:57:24 2021 UTC (3 years, 3 months ago) by jsr166
Branch: MAIN
Changes since 1.86: +24 -25 lines
Log Message:
use diamond <> pervasively

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include John Vint
6 */
7
8 import static java.util.concurrent.TimeUnit.MILLISECONDS;
9
10 import java.util.ArrayList;
11 import java.util.Arrays;
12 import java.util.Collection;
13 import java.util.Iterator;
14 import java.util.List;
15 import java.util.NoSuchElementException;
16 import java.util.Queue;
17 import java.util.concurrent.BlockingQueue;
18 import java.util.concurrent.Callable;
19 import java.util.concurrent.CountDownLatch;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.LinkedTransferQueue;
23
24 import junit.framework.Test;
25
26 @SuppressWarnings({"unchecked", "rawtypes"})
27 public class LinkedTransferQueueTest extends JSR166TestCase {
28 public static class Generic extends BlockingQueueTest {
29 protected BlockingQueue emptyCollection() {
30 return new LinkedTransferQueue();
31 }
32 }
33
34 public static void main(String[] args) {
35 main(suite(), args);
36 }
37
38 public static Test suite() {
39 class Implementation implements CollectionImplementation {
40 public Class<?> klazz() { return LinkedTransferQueue.class; }
41 public Collection emptyCollection() { return new LinkedTransferQueue(); }
42 public Object makeElement(int i) { return JSR166TestCase.itemFor(i); }
43 public boolean isConcurrent() { return true; }
44 public boolean permitsNulls() { return false; }
45 }
46 return newTestSuite(LinkedTransferQueueTest.class,
47 new Generic().testSuite(),
48 CollectionTest.testSuite(new Implementation()));
49 }
50
51 /**
52 * Constructor builds new queue with size being zero and empty
53 * being true
54 */
55 public void testConstructor1() {
56 mustEqual(0, new LinkedTransferQueue().size());
57 assertTrue(new LinkedTransferQueue().isEmpty());
58 }
59
60 /**
61 * Initializing constructor with null collection throws
62 * NullPointerException
63 */
64 public void testConstructor2() {
65 try {
66 new LinkedTransferQueue(null);
67 shouldThrow();
68 } catch (NullPointerException success) {}
69 }
70
71 /**
72 * Initializing from Collection of null elements throws
73 * NullPointerException
74 */
75 public void testConstructor3() {
76 Collection<Item> elements = Arrays.asList(new Item[SIZE]);
77 try {
78 new LinkedTransferQueue(elements);
79 shouldThrow();
80 } catch (NullPointerException success) {}
81 }
82
83 /**
84 * Initializing constructor with a collection containing some null elements
85 * throws NullPointerException
86 */
87 public void testConstructor4() {
88 Item[] items = new Item[2];
89 items[0] = zero;
90 Collection<Item> elements = Arrays.asList(items);
91 try {
92 new LinkedTransferQueue(elements);
93 shouldThrow();
94 } catch (NullPointerException success) {}
95 }
96
97 /**
98 * Queue contains all elements of the collection it is initialized by
99 */
100 public void testConstructor5() {
101 Item[] items = defaultItems;
102 List<Item> intList = Arrays.asList(items);
103 LinkedTransferQueue<Item> q = new LinkedTransferQueue<>(intList);
104 mustEqual(q.size(), intList.size());
105 mustEqual(q.toString(), intList.toString());
106 assertTrue(Arrays.equals(q.toArray(),
107 intList.toArray()));
108 assertTrue(Arrays.equals(q.toArray(new Object[0]),
109 intList.toArray(new Object[0])));
110 assertTrue(Arrays.equals(q.toArray(new Object[SIZE]),
111 intList.toArray(new Object[SIZE])));
112 for (int i = 0; i < SIZE; ++i) {
113 mustEqual(items[i], q.poll());
114 }
115 }
116
117 /**
118 * remainingCapacity() always returns Integer.MAX_VALUE
119 */
120 public void testRemainingCapacity() {
121 BlockingQueue<Item> q = populatedQueue(SIZE);
122 for (int i = 0; i < SIZE; ++i) {
123 mustEqual(Integer.MAX_VALUE, q.remainingCapacity());
124 mustEqual(SIZE - i, q.size());
125 mustEqual(i, q.remove());
126 }
127 for (int i = 0; i < SIZE; ++i) {
128 mustEqual(Integer.MAX_VALUE, q.remainingCapacity());
129 mustEqual(i, q.size());
130 mustAdd(q, i);
131 }
132 }
133
134 /**
135 * addAll(this) throws IllegalArgumentException
136 */
137 public void testAddAllSelf() {
138 LinkedTransferQueue<Item> q = populatedQueue(SIZE);
139 try {
140 q.addAll(q);
141 shouldThrow();
142 } catch (IllegalArgumentException success) {}
143 }
144
145 /**
146 * addAll of a collection with any null elements throws
147 * NullPointerException after possibly adding some elements
148 */
149 public void testAddAll3() {
150 LinkedTransferQueue<Item> q = new LinkedTransferQueue<>();
151 Item[] items = new Item[2]; items[0] = zero;
152 try {
153 q.addAll(Arrays.asList(items));
154 shouldThrow();
155 } catch (NullPointerException success) {}
156 }
157
158 /**
159 * Queue contains all elements, in traversal order, of successful addAll
160 */
161 public void testAddAll5() {
162 Item[] empty = new Item[0];
163 Item[] items = defaultItems;
164 LinkedTransferQueue q = new LinkedTransferQueue();
165 assertFalse(q.addAll(Arrays.asList(empty)));
166 assertTrue(q.addAll(Arrays.asList(items)));
167 for (int i = 0; i < SIZE; ++i) {
168 mustEqual(items[i], q.poll());
169 }
170 }
171
172 /**
173 * all elements successfully put are contained
174 */
175 public void testPut() {
176 LinkedTransferQueue<Item> q = new LinkedTransferQueue<>();
177 Item[] items = defaultItems;
178 for (int i = 0; i < SIZE; ++i) {
179 mustEqual(i, q.size());
180 q.put(items[i]);
181 mustContain(q, items[i]);
182 }
183 }
184
185 /**
186 * take retrieves elements in FIFO order
187 */
188 public void testTake() throws InterruptedException {
189 LinkedTransferQueue<Item> q = populatedQueue(SIZE);
190 for (int i = 0; i < SIZE; ++i) {
191 mustEqual(i, q.take());
192 }
193 }
194
195 /**
196 * take removes existing elements until empty, then blocks interruptibly
197 */
198 public void testBlockingTake() throws InterruptedException {
199 final BlockingQueue<Item> q = populatedQueue(SIZE);
200 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
201 Thread t = newStartedThread(new CheckedRunnable() {
202 public void realRun() throws InterruptedException {
203 for (int i = 0; i < SIZE; i++) mustEqual(i, q.take());
204
205 Thread.currentThread().interrupt();
206 try {
207 q.take();
208 shouldThrow();
209 } catch (InterruptedException success) {}
210 assertFalse(Thread.interrupted());
211
212 pleaseInterrupt.countDown();
213 try {
214 q.take();
215 shouldThrow();
216 } catch (InterruptedException success) {}
217 assertFalse(Thread.interrupted());
218 }});
219
220 await(pleaseInterrupt);
221 if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING);
222 t.interrupt();
223 awaitTermination(t);
224 }
225
226 /**
227 * poll succeeds unless empty
228 */
229 public void testPoll() throws InterruptedException {
230 LinkedTransferQueue<Item> q = populatedQueue(SIZE);
231 for (int i = 0; i < SIZE; ++i) {
232 mustEqual(i, q.poll());
233 }
234 assertNull(q.poll());
235 checkEmpty(q);
236 }
237
238 /**
239 * timed poll with zero timeout succeeds when non-empty, else times out
240 */
241 public void testTimedPoll0() throws InterruptedException {
242 LinkedTransferQueue<Item> q = populatedQueue(SIZE);
243 for (int i = 0; i < SIZE; ++i) {
244 mustEqual(i, q.poll(0, MILLISECONDS));
245 }
246 assertNull(q.poll(0, MILLISECONDS));
247 checkEmpty(q);
248 }
249
250 /**
251 * timed poll with nonzero timeout succeeds when non-empty, else times out
252 */
253 public void testTimedPoll() throws InterruptedException {
254 LinkedTransferQueue<Item> q = populatedQueue(SIZE);
255 long startTime = System.nanoTime();
256 for (int i = 0; i < SIZE; ++i)
257 mustEqual(i, q.poll(LONG_DELAY_MS, MILLISECONDS));
258 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
259
260 startTime = System.nanoTime();
261 assertNull(q.poll(timeoutMillis(), MILLISECONDS));
262 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
263 checkEmpty(q);
264 }
265
266 /**
267 * Interrupted timed poll throws InterruptedException instead of
268 * returning timeout status
269 */
270 public void testInterruptedTimedPoll() throws InterruptedException {
271 final BlockingQueue<Item> q = populatedQueue(SIZE);
272 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
273 Thread t = newStartedThread(new CheckedRunnable() {
274 public void realRun() throws InterruptedException {
275 for (int i = 0; i < SIZE; i++)
276 mustEqual(i, q.poll(LONG_DELAY_MS, MILLISECONDS));
277
278 Thread.currentThread().interrupt();
279 try {
280 q.poll(randomTimeout(), randomTimeUnit());
281 shouldThrow();
282 } catch (InterruptedException success) {}
283 assertFalse(Thread.interrupted());
284
285 pleaseInterrupt.countDown();
286 try {
287 q.poll(LONGER_DELAY_MS, MILLISECONDS);
288 shouldThrow();
289 } catch (InterruptedException success) {}
290 assertFalse(Thread.interrupted());
291 }});
292
293 await(pleaseInterrupt);
294 if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING);
295 t.interrupt();
296 awaitTermination(t);
297 checkEmpty(q);
298 }
299
300 /**
301 * timed poll after thread interrupted throws InterruptedException
302 * instead of returning timeout status
303 */
304 public void testTimedPollAfterInterrupt() throws InterruptedException {
305 final BlockingQueue<Item> q = populatedQueue(SIZE);
306 Thread t = newStartedThread(new CheckedRunnable() {
307 public void realRun() throws InterruptedException {
308 Thread.currentThread().interrupt();
309 for (int i = 0; i < SIZE; ++i)
310 mustEqual(i, q.poll(randomTimeout(), randomTimeUnit()));
311 try {
312 q.poll(randomTimeout(), randomTimeUnit());
313 shouldThrow();
314 } catch (InterruptedException success) {}
315 assertFalse(Thread.interrupted());
316 }});
317
318 awaitTermination(t);
319 checkEmpty(q);
320 }
321
322 /**
323 * peek returns next element, or null if empty
324 */
325 public void testPeek() throws InterruptedException {
326 LinkedTransferQueue<Item> q = populatedQueue(SIZE);
327 for (int i = 0; i < SIZE; ++i) {
328 mustEqual(i, q.peek());
329 mustEqual(i, q.poll());
330 assertTrue(q.peek() == null ||
331 i != q.peek().value);
332 }
333 assertNull(q.peek());
334 checkEmpty(q);
335 }
336
337 /**
338 * element returns next element, or throws NoSuchElementException if empty
339 */
340 public void testElement() throws InterruptedException {
341 LinkedTransferQueue<Item> q = populatedQueue(SIZE);
342 for (int i = 0; i < SIZE; ++i) {
343 mustEqual(i, q.element());
344 mustEqual(i, q.poll());
345 }
346 try {
347 q.element();
348 shouldThrow();
349 } catch (NoSuchElementException success) {}
350 checkEmpty(q);
351 }
352
353 /**
354 * remove removes next element, or throws NoSuchElementException if empty
355 */
356 public void testRemove() throws InterruptedException {
357 LinkedTransferQueue<Item> q = populatedQueue(SIZE);
358 for (int i = 0; i < SIZE; ++i) {
359 mustEqual(i, q.remove());
360 }
361 try {
362 q.remove();
363 shouldThrow();
364 } catch (NoSuchElementException success) {}
365 checkEmpty(q);
366 }
367
368 /**
369 * An add following remove(x) succeeds
370 */
371 public void testRemoveElementAndAdd() throws InterruptedException {
372 LinkedTransferQueue<Item> q = new LinkedTransferQueue<>();
373 mustAdd(q, one);
374 mustAdd(q, two);
375 mustRemove(q, one);
376 mustRemove(q, two);
377 mustAdd(q, three);
378 mustEqual(q.take(), three);
379 }
380
381 /**
382 * contains(x) reports true when elements added but not yet removed
383 */
384 public void testContains() {
385 LinkedTransferQueue<Item> q = populatedQueue(SIZE);
386 for (int i = 0; i < SIZE; ++i) {
387 mustContain(q, i);
388 mustEqual(i, q.poll());
389 mustNotContain(q, i);
390 }
391 }
392
393 /**
394 * clear removes all elements
395 */
396 public void testClear() throws InterruptedException {
397 LinkedTransferQueue<Item> q = populatedQueue(SIZE);
398 q.clear();
399 checkEmpty(q);
400 mustEqual(Integer.MAX_VALUE, q.remainingCapacity());
401 q.add(one);
402 assertFalse(q.isEmpty());
403 mustEqual(1, q.size());
404 mustContain(q, one);
405 q.clear();
406 checkEmpty(q);
407 }
408
409 /**
410 * containsAll(c) is true when c contains a subset of elements
411 */
412 public void testContainsAll() {
413 LinkedTransferQueue<Item> q = populatedQueue(SIZE);
414 LinkedTransferQueue<Item> p = new LinkedTransferQueue<>();
415 for (int i = 0; i < SIZE; ++i) {
416 assertTrue(q.containsAll(p));
417 assertFalse(p.containsAll(q));
418 mustAdd(p, i);
419 }
420 assertTrue(p.containsAll(q));
421 }
422
423 /**
424 * retainAll(c) retains only those elements of c and reports true
425 * if changed
426 */
427 public void testRetainAll() {
428 LinkedTransferQueue<Item> q = populatedQueue(SIZE);
429 LinkedTransferQueue<Item> p = populatedQueue(SIZE);
430 for (int i = 0; i < SIZE; ++i) {
431 boolean changed = q.retainAll(p);
432 if (i == 0) {
433 assertFalse(changed);
434 } else {
435 assertTrue(changed);
436 }
437 assertTrue(q.containsAll(p));
438 mustEqual(SIZE - i, q.size());
439 p.remove();
440 }
441 }
442
443 /**
444 * removeAll(c) removes only those elements of c and reports true
445 * if changed
446 */
447 public void testRemoveAll() {
448 for (int i = 1; i < SIZE; ++i) {
449 LinkedTransferQueue<Item> q = populatedQueue(SIZE);
450 LinkedTransferQueue<Item> p = populatedQueue(i);
451 assertTrue(q.removeAll(p));
452 mustEqual(SIZE - i, q.size());
453 for (int j = 0; j < i; ++j) {
454 mustNotContain(q, p.remove());
455 }
456 }
457 }
458
459 /**
460 * toArray() contains all elements in FIFO order
461 */
462 public void testToArray() {
463 LinkedTransferQueue<Item> q = populatedQueue(SIZE);
464 Object[] a = q.toArray();
465 assertSame(Object[].class, a.getClass());
466 for (Object o : a)
467 assertSame(o, q.poll());
468 assertTrue(q.isEmpty());
469 }
470
471 /**
472 * toArray(a) contains all elements in FIFO order
473 */
474 public void testToArray2() {
475 LinkedTransferQueue<Item> q = populatedQueue(SIZE);
476 Item[] items = new Item[SIZE];
477 Item[] array = q.toArray(items);
478 assertSame(items, array);
479 for (Item o : items)
480 assertSame(o, q.poll());
481 assertTrue(q.isEmpty());
482 }
483
484 /**
485 * toArray(incompatible array type) throws ArrayStoreException
486 */
487 public void testToArray1_BadArg() {
488 LinkedTransferQueue<Item> q = populatedQueue(SIZE);
489 try {
490 q.toArray(new String[10]);
491 shouldThrow();
492 } catch (ArrayStoreException success) {}
493 }
494
495 /**
496 * iterator iterates through all elements
497 */
498 public void testIterator() throws InterruptedException {
499 LinkedTransferQueue<Item> q = populatedQueue(SIZE);
500 Iterator<? extends Item> it = q.iterator();
501 int i;
502 for (i = 0; it.hasNext(); i++)
503 mustContain(q, it.next());
504 mustEqual(i, SIZE);
505 assertIteratorExhausted(it);
506
507 it = q.iterator();
508 for (i = 0; it.hasNext(); i++)
509 mustEqual(it.next(), q.take());
510 mustEqual(i, SIZE);
511 assertIteratorExhausted(it);
512 }
513
514 /**
515 * iterator of empty collection has no elements
516 */
517 public void testEmptyIterator() {
518 assertIteratorExhausted(new LinkedTransferQueue().iterator());
519 }
520
521 /**
522 * iterator.remove() removes current element
523 */
524 public void testIteratorRemove() {
525 final LinkedTransferQueue<Item> q = new LinkedTransferQueue<>();
526 q.add(two);
527 q.add(one);
528 q.add(three);
529
530 Iterator<? extends Item> it = q.iterator();
531 it.next();
532 it.remove();
533
534 it = q.iterator();
535 assertSame(it.next(), one);
536 assertSame(it.next(), three);
537 assertFalse(it.hasNext());
538 }
539
540 /**
541 * iterator ordering is FIFO
542 */
543 public void testIteratorOrdering() {
544 final LinkedTransferQueue<Item> q = new LinkedTransferQueue<>();
545 mustEqual(Integer.MAX_VALUE, q.remainingCapacity());
546 q.add(one);
547 q.add(two);
548 q.add(three);
549 mustEqual(Integer.MAX_VALUE, q.remainingCapacity());
550 int k = 0;
551 for (Item n : q) {
552 mustEqual(++k, n);
553 }
554 mustEqual(3, k);
555 }
556
557 /**
558 * Modifications do not cause iterators to fail
559 */
560 public void testWeaklyConsistentIteration() {
561 final LinkedTransferQueue<Item> q = new LinkedTransferQueue<>();
562 q.add(one);
563 q.add(two);
564 q.add(three);
565 for (Iterator<? extends Item> it = q.iterator(); it.hasNext();) {
566 q.remove();
567 it.next();
568 }
569 mustEqual(0, q.size());
570 }
571
572 /**
573 * toString contains toStrings of elements
574 */
575 public void testToString() {
576 LinkedTransferQueue<Item> q = populatedQueue(SIZE);
577 String s = q.toString();
578 for (int i = 0; i < SIZE; ++i) {
579 assertTrue(s.contains(String.valueOf(i)));
580 }
581 }
582
583 /**
584 * offer transfers elements across Executor tasks
585 */
586 public void testOfferInExecutor() {
587 final LinkedTransferQueue<Item> q = new LinkedTransferQueue<>();
588 final CheckedBarrier threadsStarted = new CheckedBarrier(2);
589 final ExecutorService executor = Executors.newFixedThreadPool(2);
590 try (PoolCleaner cleaner = cleaner(executor)) {
591
592 executor.execute(new CheckedRunnable() {
593 public void realRun() throws InterruptedException {
594 threadsStarted.await();
595 long startTime = System.nanoTime();
596 assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS));
597 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
598 }});
599
600 executor.execute(new CheckedRunnable() {
601 public void realRun() throws InterruptedException {
602 threadsStarted.await();
603 assertSame(one, q.take());
604 checkEmpty(q);
605 }});
606 }
607 }
608
609 /**
610 * timed poll retrieves elements across Executor threads
611 */
612 public void testPollInExecutor() {
613 final LinkedTransferQueue<Item> q = new LinkedTransferQueue<>();
614 final CheckedBarrier threadsStarted = new CheckedBarrier(2);
615 final ExecutorService executor = Executors.newFixedThreadPool(2);
616 try (PoolCleaner cleaner = cleaner(executor)) {
617
618 executor.execute(new CheckedRunnable() {
619 public void realRun() throws InterruptedException {
620 assertNull(q.poll());
621 threadsStarted.await();
622 long startTime = System.nanoTime();
623 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
624 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
625 checkEmpty(q);
626 }});
627
628 executor.execute(new CheckedRunnable() {
629 public void realRun() throws InterruptedException {
630 threadsStarted.await();
631 q.put(one);
632 }});
633 }
634 }
635
636 /**
637 * A deserialized/reserialized queue has same elements in same order
638 */
639 public void testSerialization() throws Exception {
640 Queue<Item> x = populatedQueue(SIZE);
641 Queue<Item> y = serialClone(x);
642
643 assertNotSame(y, x);
644 mustEqual(x.size(), y.size());
645 mustEqual(x.toString(), y.toString());
646 assertTrue(Arrays.equals(x.toArray(), y.toArray()));
647 while (!x.isEmpty()) {
648 assertFalse(y.isEmpty());
649 mustEqual(x.remove(), y.remove());
650 }
651 assertTrue(y.isEmpty());
652 }
653
654 /**
655 * drainTo(c) empties queue into another collection c
656 */
657 public void testDrainTo() {
658 LinkedTransferQueue<Item> q = populatedQueue(SIZE);
659 ArrayList l = new ArrayList();
660 q.drainTo(l);
661 mustEqual(0, q.size());
662 mustEqual(SIZE, l.size());
663 for (int i = 0; i < SIZE; ++i) {
664 mustEqual(i, l.get(i));
665 }
666 q.add(zero);
667 q.add(one);
668 assertFalse(q.isEmpty());
669 mustContain(q, zero);
670 mustContain(q, one);
671 l.clear();
672 q.drainTo(l);
673 mustEqual(0, q.size());
674 mustEqual(2, l.size());
675 for (int i = 0; i < 2; ++i) {
676 mustEqual(i, l.get(i));
677 }
678 }
679
680 /**
681 * drainTo(c) empties full queue, unblocking a waiting put.
682 */
683 public void testDrainToWithActivePut() throws InterruptedException {
684 final LinkedTransferQueue<Item> q = populatedQueue(SIZE);
685 Thread t = newStartedThread(new CheckedRunnable() {
686 public void realRun() {
687 q.put(new Item(SIZE + 1));
688 }});
689 ArrayList l = new ArrayList();
690 q.drainTo(l);
691 assertTrue(l.size() >= SIZE);
692 for (int i = 0; i < SIZE; ++i)
693 mustEqual(i, l.get(i));
694 awaitTermination(t);
695 assertTrue(q.size() + l.size() >= SIZE);
696 }
697
698 /**
699 * drainTo(c, n) empties first min(n, size) elements of queue into c
700 */
701 public void testDrainToN() {
702 LinkedTransferQueue<Item> q = new LinkedTransferQueue<>();
703 for (int i = 0; i < SIZE + 2; ++i) {
704 for (int j = 0; j < SIZE; j++) {
705 mustOffer(q, j);
706 }
707 ArrayList l = new ArrayList();
708 q.drainTo(l, i);
709 int k = (i < SIZE) ? i : SIZE;
710 mustEqual(k, l.size());
711 mustEqual(SIZE - k, q.size());
712 for (int j = 0; j < k; ++j)
713 mustEqual(j, l.get(j));
714 do {} while (q.poll() != null);
715 }
716 }
717
718 /**
719 * timed poll() or take() increments the waiting consumer count;
720 * offer(e) decrements the waiting consumer count
721 */
722 public void testWaitingConsumer() throws InterruptedException {
723 final LinkedTransferQueue<Item> q = new LinkedTransferQueue<>();
724 mustEqual(0, q.getWaitingConsumerCount());
725 assertFalse(q.hasWaitingConsumer());
726 final CountDownLatch threadStarted = new CountDownLatch(1);
727
728 Thread t = newStartedThread(new CheckedRunnable() {
729 public void realRun() throws InterruptedException {
730 threadStarted.countDown();
731 long startTime = System.nanoTime();
732 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
733 mustEqual(0, q.getWaitingConsumerCount());
734 assertFalse(q.hasWaitingConsumer());
735 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
736 }});
737
738 threadStarted.await();
739 Callable<Boolean> oneConsumer = new Callable<>() {
740 public Boolean call() {
741 return q.hasWaitingConsumer()
742 && q.getWaitingConsumerCount() == 1; }};
743 waitForThreadToEnterWaitState(t, oneConsumer);
744
745 assertTrue(q.offer(one));
746 mustEqual(0, q.getWaitingConsumerCount());
747 assertFalse(q.hasWaitingConsumer());
748
749 awaitTermination(t);
750 }
751
752 /**
753 * transfer(null) throws NullPointerException
754 */
755 public void testTransfer1() throws InterruptedException {
756 try {
757 LinkedTransferQueue<Item> q = new LinkedTransferQueue<>();
758 q.transfer(null);
759 shouldThrow();
760 } catch (NullPointerException success) {}
761 }
762
763 /**
764 * transfer waits until a poll occurs. The transferred element
765 * is returned by the associated poll.
766 */
767 public void testTransfer2() throws InterruptedException {
768 final LinkedTransferQueue<Item> q = new LinkedTransferQueue<>();
769 final CountDownLatch threadStarted = new CountDownLatch(1);
770
771 Thread t = newStartedThread(new CheckedRunnable() {
772 public void realRun() throws InterruptedException {
773 threadStarted.countDown();
774 q.transfer(five);
775 checkEmpty(q);
776 }});
777
778 threadStarted.await();
779 Callable<Boolean> oneElement = new Callable<>() {
780 public Boolean call() {
781 return !q.isEmpty() && q.size() == 1; }};
782 waitForThreadToEnterWaitState(t, oneElement);
783
784 assertSame(five, q.poll());
785 checkEmpty(q);
786 awaitTermination(t);
787 }
788
789 /**
790 * transfer waits until a poll occurs, and then transfers in fifo order
791 */
792 public void testTransfer3() throws InterruptedException {
793 final LinkedTransferQueue<Item> q = new LinkedTransferQueue<>();
794
795 Thread first = newStartedThread(new CheckedRunnable() {
796 public void realRun() throws InterruptedException {
797 q.transfer(four);
798 mustNotContain(q, four);
799 mustEqual(1, q.size());
800 }});
801
802 Thread interruptedThread = newStartedThread(
803 new CheckedInterruptedRunnable() {
804 public void realRun() throws InterruptedException {
805 while (q.isEmpty())
806 Thread.yield();
807 q.transfer(five);
808 }});
809
810 while (q.size() < 2)
811 Thread.yield();
812 mustEqual(2, q.size());
813 assertSame(four, q.poll());
814 first.join();
815 mustEqual(1, q.size());
816 interruptedThread.interrupt();
817 interruptedThread.join();
818 checkEmpty(q);
819 }
820
821 /**
822 * transfer waits until a poll occurs, at which point the polling
823 * thread returns the element
824 */
825 public void testTransfer4() throws InterruptedException {
826 final LinkedTransferQueue<Item> q = new LinkedTransferQueue<>();
827
828 Thread t = newStartedThread(new CheckedRunnable() {
829 public void realRun() throws InterruptedException {
830 q.transfer(four);
831 mustNotContain(q, four);
832 assertSame(three, q.poll());
833 }});
834
835 while (q.isEmpty())
836 Thread.yield();
837 assertFalse(q.isEmpty());
838 mustEqual(1, q.size());
839 assertTrue(q.offer(three));
840 assertSame(four, q.poll());
841 awaitTermination(t);
842 }
843
844 /**
845 * transfer waits until a take occurs. The transferred element
846 * is returned by the associated take.
847 */
848 public void testTransfer5() throws InterruptedException {
849 final LinkedTransferQueue<Item> q = new LinkedTransferQueue<>();
850
851 Thread t = newStartedThread(new CheckedRunnable() {
852 public void realRun() throws InterruptedException {
853 q.transfer(four);
854 checkEmpty(q);
855 }});
856
857 while (q.isEmpty())
858 Thread.yield();
859 assertFalse(q.isEmpty());
860 mustEqual(1, q.size());
861 assertSame(four, q.take());
862 checkEmpty(q);
863 awaitTermination(t);
864 }
865
866 /**
867 * tryTransfer(null) throws NullPointerException
868 */
869 public void testTryTransfer1() {
870 final LinkedTransferQueue<Item> q = new LinkedTransferQueue<>();
871 try {
872 q.tryTransfer(null);
873 shouldThrow();
874 } catch (NullPointerException success) {}
875 }
876
877 /**
878 * tryTransfer returns false and does not enqueue if there are no
879 * consumers waiting to poll or take.
880 */
881 public void testTryTransfer2() throws InterruptedException {
882 final LinkedTransferQueue<Object> q = new LinkedTransferQueue<>();
883 assertFalse(q.tryTransfer(new Object()));
884 assertFalse(q.hasWaitingConsumer());
885 checkEmpty(q);
886 }
887
888 /**
889 * If there is a consumer waiting in timed poll, tryTransfer
890 * returns true while successfully transfering object.
891 */
892 public void testTryTransfer3() throws InterruptedException {
893 final Object hotPotato = new Object();
894 final LinkedTransferQueue<Object> q = new LinkedTransferQueue<>();
895
896 Thread t = newStartedThread(new CheckedRunnable() {
897 public void realRun() {
898 while (! q.hasWaitingConsumer())
899 Thread.yield();
900 assertTrue(q.hasWaitingConsumer());
901 checkEmpty(q);
902 assertTrue(q.tryTransfer(hotPotato));
903 }});
904
905 long startTime = System.nanoTime();
906 assertSame(hotPotato, q.poll(LONG_DELAY_MS, MILLISECONDS));
907 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
908 checkEmpty(q);
909 awaitTermination(t);
910 }
911
912 /**
913 * If there is a consumer waiting in take, tryTransfer returns
914 * true while successfully transfering object.
915 */
916 public void testTryTransfer4() throws InterruptedException {
917 final Object hotPotato = new Object();
918 final LinkedTransferQueue<Object> q = new LinkedTransferQueue<>();
919
920 Thread t = newStartedThread(new CheckedRunnable() {
921 public void realRun() {
922 while (! q.hasWaitingConsumer())
923 Thread.yield();
924 assertTrue(q.hasWaitingConsumer());
925 checkEmpty(q);
926 assertTrue(q.tryTransfer(hotPotato));
927 }});
928
929 assertSame(q.take(), hotPotato);
930 checkEmpty(q);
931 awaitTermination(t);
932 }
933
934 /**
935 * tryTransfer blocks interruptibly if no takers
936 */
937 public void testTryTransfer5() throws InterruptedException {
938 final LinkedTransferQueue<Object> q = new LinkedTransferQueue<>();
939 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
940 assertTrue(q.isEmpty());
941
942 Thread t = newStartedThread(new CheckedRunnable() {
943 public void realRun() throws InterruptedException {
944 Thread.currentThread().interrupt();
945 try {
946 q.tryTransfer(new Object(), randomTimeout(), randomTimeUnit());
947 shouldThrow();
948 } catch (InterruptedException success) {}
949 assertFalse(Thread.interrupted());
950
951 pleaseInterrupt.countDown();
952 try {
953 q.tryTransfer(new Object(), LONGER_DELAY_MS, MILLISECONDS);
954 shouldThrow();
955 } catch (InterruptedException success) {}
956 assertFalse(Thread.interrupted());
957 }});
958
959 await(pleaseInterrupt);
960 if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING);
961 t.interrupt();
962 awaitTermination(t);
963 checkEmpty(q);
964 }
965
966 /**
967 * tryTransfer gives up after the timeout and returns false
968 */
969 public void testTryTransfer6() throws InterruptedException {
970 final LinkedTransferQueue<Object> q = new LinkedTransferQueue<>();
971
972 Thread t = newStartedThread(new CheckedRunnable() {
973 public void realRun() throws InterruptedException {
974 long startTime = System.nanoTime();
975 assertFalse(q.tryTransfer(new Object(),
976 timeoutMillis(), MILLISECONDS));
977 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
978 checkEmpty(q);
979 }});
980
981 awaitTermination(t);
982 checkEmpty(q);
983 }
984
985 /**
986 * tryTransfer waits for any elements previously in to be removed
987 * before transfering to a poll or take
988 */
989 public void testTryTransfer7() throws InterruptedException {
990 final LinkedTransferQueue<Item> q = new LinkedTransferQueue<>();
991 assertTrue(q.offer(four));
992
993 Thread t = newStartedThread(new CheckedRunnable() {
994 public void realRun() throws InterruptedException {
995 long startTime = System.nanoTime();
996 assertTrue(q.tryTransfer(five, LONG_DELAY_MS, MILLISECONDS));
997 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
998 checkEmpty(q);
999 }});
1000
1001 while (q.size() != 2)
1002 Thread.yield();
1003 mustEqual(2, q.size());
1004 assertSame(four, q.poll());
1005 assertSame(five, q.poll());
1006 checkEmpty(q);
1007 awaitTermination(t);
1008 }
1009
1010 /**
1011 * tryTransfer attempts to enqueue into the queue and fails
1012 * returning false not enqueueing and the successive poll is null
1013 */
1014 public void testTryTransfer8() throws InterruptedException {
1015 final LinkedTransferQueue<Item> q = new LinkedTransferQueue<>();
1016 assertTrue(q.offer(four));
1017 mustEqual(1, q.size());
1018 long startTime = System.nanoTime();
1019 assertFalse(q.tryTransfer(five, timeoutMillis(), MILLISECONDS));
1020 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
1021 mustEqual(1, q.size());
1022 assertSame(four, q.poll());
1023 assertNull(q.poll());
1024 checkEmpty(q);
1025 }
1026
1027 private LinkedTransferQueue<Item> populatedQueue(int n) {
1028 LinkedTransferQueue<Item> q = new LinkedTransferQueue<>();
1029 checkEmpty(q);
1030 for (int i = 0; i < n; i++) {
1031 mustEqual(i, q.size());
1032 mustOffer(q, i);
1033 mustEqual(Integer.MAX_VALUE, q.remainingCapacity());
1034 }
1035 assertFalse(q.isEmpty());
1036 return q;
1037 }
1038
1039 /**
1040 * remove(null), contains(null) always return false
1041 */
1042 public void testNeverContainsNull() {
1043 Collection<?>[] qs = {
1044 new LinkedTransferQueue<>(),
1045 populatedQueue(2),
1046 };
1047
1048 for (Collection<?> q : qs) {
1049 assertFalse(q.contains(null));
1050 assertFalse(q.remove(null));
1051 }
1052 }
1053 }