ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/LinkedTransferQueueTest.java
Revision: 1.85
Committed: Fri Sep 6 22:47:02 2019 UTC (4 years, 8 months ago) by jsr166
Branch: MAIN
Changes since 1.84: +1 -4 lines
Log Message:
testTimedPollAfterInterrupt: rely on awaitTermination timeout handling

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include John Vint
6 */
7
8 import static java.util.concurrent.TimeUnit.MILLISECONDS;
9
10 import java.util.ArrayList;
11 import java.util.Arrays;
12 import java.util.Collection;
13 import java.util.Iterator;
14 import java.util.List;
15 import java.util.NoSuchElementException;
16 import java.util.Queue;
17 import java.util.concurrent.BlockingQueue;
18 import java.util.concurrent.Callable;
19 import java.util.concurrent.CountDownLatch;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.LinkedTransferQueue;
23
24 import junit.framework.Test;
25
26 @SuppressWarnings({"unchecked", "rawtypes"})
27 public class LinkedTransferQueueTest extends JSR166TestCase {
28 public static class Generic extends BlockingQueueTest {
29 protected BlockingQueue emptyCollection() {
30 return new LinkedTransferQueue();
31 }
32 }
33
34 public static void main(String[] args) {
35 main(suite(), args);
36 }
37
38 public static Test suite() {
39 class Implementation implements CollectionImplementation {
40 public Class<?> klazz() { return LinkedTransferQueue.class; }
41 public Collection emptyCollection() { return new LinkedTransferQueue(); }
42 public Object makeElement(int i) { return i; }
43 public boolean isConcurrent() { return true; }
44 public boolean permitsNulls() { return false; }
45 }
46 return newTestSuite(LinkedTransferQueueTest.class,
47 new Generic().testSuite(),
48 CollectionTest.testSuite(new Implementation()));
49 }
50
51 /**
52 * Constructor builds new queue with size being zero and empty
53 * being true
54 */
55 public void testConstructor1() {
56 assertEquals(0, new LinkedTransferQueue().size());
57 assertTrue(new LinkedTransferQueue().isEmpty());
58 }
59
60 /**
61 * Initializing constructor with null collection throws
62 * NullPointerException
63 */
64 public void testConstructor2() {
65 try {
66 new LinkedTransferQueue(null);
67 shouldThrow();
68 } catch (NullPointerException success) {}
69 }
70
71 /**
72 * Initializing from Collection of null elements throws
73 * NullPointerException
74 */
75 public void testConstructor3() {
76 Collection<Integer> elements = Arrays.asList(new Integer[SIZE]);
77 try {
78 new LinkedTransferQueue(elements);
79 shouldThrow();
80 } catch (NullPointerException success) {}
81 }
82
83 /**
84 * Initializing constructor with a collection containing some null elements
85 * throws NullPointerException
86 */
87 public void testConstructor4() {
88 Integer[] ints = new Integer[SIZE];
89 for (int i = 0; i < SIZE - 1; ++i)
90 ints[i] = i;
91 Collection<Integer> elements = Arrays.asList(ints);
92 try {
93 new LinkedTransferQueue(elements);
94 shouldThrow();
95 } catch (NullPointerException success) {}
96 }
97
98 /**
99 * Queue contains all elements of the collection it is initialized by
100 */
101 public void testConstructor5() {
102 Integer[] ints = new Integer[SIZE];
103 for (int i = 0; i < SIZE; ++i) {
104 ints[i] = i;
105 }
106 List intList = Arrays.asList(ints);
107 LinkedTransferQueue q
108 = new LinkedTransferQueue(intList);
109 assertEquals(q.size(), intList.size());
110 assertEquals(q.toString(), intList.toString());
111 assertTrue(Arrays.equals(q.toArray(),
112 intList.toArray()));
113 assertTrue(Arrays.equals(q.toArray(new Object[0]),
114 intList.toArray(new Object[0])));
115 assertTrue(Arrays.equals(q.toArray(new Object[SIZE]),
116 intList.toArray(new Object[SIZE])));
117 for (int i = 0; i < SIZE; ++i) {
118 assertEquals(ints[i], q.poll());
119 }
120 }
121
122 /**
123 * remainingCapacity() always returns Integer.MAX_VALUE
124 */
125 public void testRemainingCapacity() {
126 BlockingQueue q = populatedQueue(SIZE);
127 for (int i = 0; i < SIZE; ++i) {
128 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
129 assertEquals(SIZE - i, q.size());
130 assertEquals(i, q.remove());
131 }
132 for (int i = 0; i < SIZE; ++i) {
133 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
134 assertEquals(i, q.size());
135 assertTrue(q.add(i));
136 }
137 }
138
139 /**
140 * addAll(this) throws IllegalArgumentException
141 */
142 public void testAddAllSelf() {
143 LinkedTransferQueue q = populatedQueue(SIZE);
144 try {
145 q.addAll(q);
146 shouldThrow();
147 } catch (IllegalArgumentException success) {}
148 }
149
150 /**
151 * addAll of a collection with any null elements throws
152 * NullPointerException after possibly adding some elements
153 */
154 public void testAddAll3() {
155 LinkedTransferQueue q = new LinkedTransferQueue();
156 Integer[] ints = new Integer[SIZE];
157 for (int i = 0; i < SIZE - 1; ++i)
158 ints[i] = i;
159 try {
160 q.addAll(Arrays.asList(ints));
161 shouldThrow();
162 } catch (NullPointerException success) {}
163 }
164
165 /**
166 * Queue contains all elements, in traversal order, of successful addAll
167 */
168 public void testAddAll5() {
169 Integer[] empty = new Integer[0];
170 Integer[] ints = new Integer[SIZE];
171 for (int i = 0; i < SIZE; ++i) {
172 ints[i] = i;
173 }
174 LinkedTransferQueue q = new LinkedTransferQueue();
175 assertFalse(q.addAll(Arrays.asList(empty)));
176 assertTrue(q.addAll(Arrays.asList(ints)));
177 for (int i = 0; i < SIZE; ++i) {
178 assertEquals(ints[i], q.poll());
179 }
180 }
181
182 /**
183 * all elements successfully put are contained
184 */
185 public void testPut() {
186 LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
187 for (int i = 0; i < SIZE; ++i) {
188 assertEquals(i, q.size());
189 q.put(i);
190 assertTrue(q.contains(i));
191 }
192 }
193
194 /**
195 * take retrieves elements in FIFO order
196 */
197 public void testTake() throws InterruptedException {
198 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
199 for (int i = 0; i < SIZE; ++i) {
200 assertEquals(i, (int) q.take());
201 }
202 }
203
204 /**
205 * take removes existing elements until empty, then blocks interruptibly
206 */
207 public void testBlockingTake() throws InterruptedException {
208 final BlockingQueue q = populatedQueue(SIZE);
209 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
210 Thread t = newStartedThread(new CheckedRunnable() {
211 public void realRun() throws InterruptedException {
212 for (int i = 0; i < SIZE; i++) assertEquals(i, q.take());
213
214 Thread.currentThread().interrupt();
215 try {
216 q.take();
217 shouldThrow();
218 } catch (InterruptedException success) {}
219 assertFalse(Thread.interrupted());
220
221 pleaseInterrupt.countDown();
222 try {
223 q.take();
224 shouldThrow();
225 } catch (InterruptedException success) {}
226 assertFalse(Thread.interrupted());
227 }});
228
229 await(pleaseInterrupt);
230 if (randomBoolean()) assertThreadBlocks(t, Thread.State.WAITING);
231 t.interrupt();
232 awaitTermination(t);
233 }
234
235 /**
236 * poll succeeds unless empty
237 */
238 public void testPoll() throws InterruptedException {
239 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
240 for (int i = 0; i < SIZE; ++i) {
241 assertEquals(i, (int) q.poll());
242 }
243 assertNull(q.poll());
244 checkEmpty(q);
245 }
246
247 /**
248 * timed poll with zero timeout succeeds when non-empty, else times out
249 */
250 public void testTimedPoll0() throws InterruptedException {
251 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
252 for (int i = 0; i < SIZE; ++i) {
253 assertEquals(i, (int) q.poll(0, MILLISECONDS));
254 }
255 assertNull(q.poll(0, MILLISECONDS));
256 checkEmpty(q);
257 }
258
259 /**
260 * timed poll with nonzero timeout succeeds when non-empty, else times out
261 */
262 public void testTimedPoll() throws InterruptedException {
263 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
264 long startTime = System.nanoTime();
265 for (int i = 0; i < SIZE; ++i)
266 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
267 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
268
269 startTime = System.nanoTime();
270 assertNull(q.poll(timeoutMillis(), MILLISECONDS));
271 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
272 checkEmpty(q);
273 }
274
275 /**
276 * Interrupted timed poll throws InterruptedException instead of
277 * returning timeout status
278 */
279 public void testInterruptedTimedPoll() throws InterruptedException {
280 final BlockingQueue<Integer> q = populatedQueue(SIZE);
281 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
282 Thread t = newStartedThread(new CheckedRunnable() {
283 public void realRun() throws InterruptedException {
284 for (int i = 0; i < SIZE; i++)
285 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
286
287 Thread.currentThread().interrupt();
288 try {
289 q.poll(randomTimeout(), randomTimeUnit());
290 shouldThrow();
291 } catch (InterruptedException success) {}
292 assertFalse(Thread.interrupted());
293
294 pleaseInterrupt.countDown();
295 try {
296 q.poll(LONGER_DELAY_MS, MILLISECONDS);
297 shouldThrow();
298 } catch (InterruptedException success) {}
299 assertFalse(Thread.interrupted());
300 }});
301
302 await(pleaseInterrupt);
303 if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING);
304 t.interrupt();
305 awaitTermination(t);
306 checkEmpty(q);
307 }
308
309 /**
310 * timed poll after thread interrupted throws InterruptedException
311 * instead of returning timeout status
312 */
313 public void testTimedPollAfterInterrupt() throws InterruptedException {
314 final BlockingQueue<Integer> q = populatedQueue(SIZE);
315 Thread t = newStartedThread(new CheckedRunnable() {
316 public void realRun() throws InterruptedException {
317 Thread.currentThread().interrupt();
318 for (int i = 0; i < SIZE; ++i)
319 assertEquals(i, (int) q.poll(randomTimeout(), randomTimeUnit()));
320 try {
321 q.poll(randomTimeout(), randomTimeUnit());
322 shouldThrow();
323 } catch (InterruptedException success) {}
324 assertFalse(Thread.interrupted());
325 }});
326
327 awaitTermination(t);
328 checkEmpty(q);
329 }
330
331 /**
332 * peek returns next element, or null if empty
333 */
334 public void testPeek() throws InterruptedException {
335 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
336 for (int i = 0; i < SIZE; ++i) {
337 assertEquals(i, (int) q.peek());
338 assertEquals(i, (int) q.poll());
339 assertTrue(q.peek() == null ||
340 i != (int) q.peek());
341 }
342 assertNull(q.peek());
343 checkEmpty(q);
344 }
345
346 /**
347 * element returns next element, or throws NoSuchElementException if empty
348 */
349 public void testElement() throws InterruptedException {
350 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
351 for (int i = 0; i < SIZE; ++i) {
352 assertEquals(i, (int) q.element());
353 assertEquals(i, (int) q.poll());
354 }
355 try {
356 q.element();
357 shouldThrow();
358 } catch (NoSuchElementException success) {}
359 checkEmpty(q);
360 }
361
362 /**
363 * remove removes next element, or throws NoSuchElementException if empty
364 */
365 public void testRemove() throws InterruptedException {
366 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
367 for (int i = 0; i < SIZE; ++i) {
368 assertEquals(i, (int) q.remove());
369 }
370 try {
371 q.remove();
372 shouldThrow();
373 } catch (NoSuchElementException success) {}
374 checkEmpty(q);
375 }
376
377 /**
378 * An add following remove(x) succeeds
379 */
380 public void testRemoveElementAndAdd() throws InterruptedException {
381 LinkedTransferQueue q = new LinkedTransferQueue();
382 assertTrue(q.add(one));
383 assertTrue(q.add(two));
384 assertTrue(q.remove(one));
385 assertTrue(q.remove(two));
386 assertTrue(q.add(three));
387 assertSame(q.take(), three);
388 }
389
390 /**
391 * contains(x) reports true when elements added but not yet removed
392 */
393 public void testContains() {
394 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
395 for (int i = 0; i < SIZE; ++i) {
396 assertTrue(q.contains(i));
397 assertEquals(i, (int) q.poll());
398 assertFalse(q.contains(i));
399 }
400 }
401
402 /**
403 * clear removes all elements
404 */
405 public void testClear() throws InterruptedException {
406 LinkedTransferQueue q = populatedQueue(SIZE);
407 q.clear();
408 checkEmpty(q);
409 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
410 q.add(one);
411 assertFalse(q.isEmpty());
412 assertEquals(1, q.size());
413 assertTrue(q.contains(one));
414 q.clear();
415 checkEmpty(q);
416 }
417
418 /**
419 * containsAll(c) is true when c contains a subset of elements
420 */
421 public void testContainsAll() {
422 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
423 LinkedTransferQueue<Integer> p = new LinkedTransferQueue<>();
424 for (int i = 0; i < SIZE; ++i) {
425 assertTrue(q.containsAll(p));
426 assertFalse(p.containsAll(q));
427 p.add(i);
428 }
429 assertTrue(p.containsAll(q));
430 }
431
432 /**
433 * retainAll(c) retains only those elements of c and reports true
434 * if changed
435 */
436 public void testRetainAll() {
437 LinkedTransferQueue q = populatedQueue(SIZE);
438 LinkedTransferQueue p = populatedQueue(SIZE);
439 for (int i = 0; i < SIZE; ++i) {
440 boolean changed = q.retainAll(p);
441 if (i == 0) {
442 assertFalse(changed);
443 } else {
444 assertTrue(changed);
445 }
446 assertTrue(q.containsAll(p));
447 assertEquals(SIZE - i, q.size());
448 p.remove();
449 }
450 }
451
452 /**
453 * removeAll(c) removes only those elements of c and reports true
454 * if changed
455 */
456 public void testRemoveAll() {
457 for (int i = 1; i < SIZE; ++i) {
458 LinkedTransferQueue q = populatedQueue(SIZE);
459 LinkedTransferQueue p = populatedQueue(i);
460 assertTrue(q.removeAll(p));
461 assertEquals(SIZE - i, q.size());
462 for (int j = 0; j < i; ++j) {
463 assertFalse(q.contains(p.remove()));
464 }
465 }
466 }
467
468 /**
469 * toArray() contains all elements in FIFO order
470 */
471 public void testToArray() {
472 LinkedTransferQueue q = populatedQueue(SIZE);
473 Object[] a = q.toArray();
474 assertSame(Object[].class, a.getClass());
475 for (Object o : a)
476 assertSame(o, q.poll());
477 assertTrue(q.isEmpty());
478 }
479
480 /**
481 * toArray(a) contains all elements in FIFO order
482 */
483 public void testToArray2() {
484 LinkedTransferQueue<Integer> q = populatedQueue(SIZE);
485 Integer[] ints = new Integer[SIZE];
486 Integer[] array = q.toArray(ints);
487 assertSame(ints, array);
488 for (Integer o : ints)
489 assertSame(o, q.poll());
490 assertTrue(q.isEmpty());
491 }
492
493 /**
494 * toArray(incompatible array type) throws ArrayStoreException
495 */
496 public void testToArray1_BadArg() {
497 LinkedTransferQueue q = populatedQueue(SIZE);
498 try {
499 q.toArray(new String[10]);
500 shouldThrow();
501 } catch (ArrayStoreException success) {}
502 }
503
504 /**
505 * iterator iterates through all elements
506 */
507 public void testIterator() throws InterruptedException {
508 LinkedTransferQueue q = populatedQueue(SIZE);
509 Iterator it = q.iterator();
510 int i;
511 for (i = 0; it.hasNext(); i++)
512 assertTrue(q.contains(it.next()));
513 assertEquals(i, SIZE);
514 assertIteratorExhausted(it);
515
516 it = q.iterator();
517 for (i = 0; it.hasNext(); i++)
518 assertEquals(it.next(), q.take());
519 assertEquals(i, SIZE);
520 assertIteratorExhausted(it);
521 }
522
523 /**
524 * iterator of empty collection has no elements
525 */
526 public void testEmptyIterator() {
527 assertIteratorExhausted(new LinkedTransferQueue().iterator());
528 }
529
530 /**
531 * iterator.remove() removes current element
532 */
533 public void testIteratorRemove() {
534 final LinkedTransferQueue q = new LinkedTransferQueue();
535 q.add(two);
536 q.add(one);
537 q.add(three);
538
539 Iterator it = q.iterator();
540 it.next();
541 it.remove();
542
543 it = q.iterator();
544 assertSame(it.next(), one);
545 assertSame(it.next(), three);
546 assertFalse(it.hasNext());
547 }
548
549 /**
550 * iterator ordering is FIFO
551 */
552 public void testIteratorOrdering() {
553 final LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
554 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
555 q.add(one);
556 q.add(two);
557 q.add(three);
558 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
559 int k = 0;
560 for (Integer n : q) {
561 assertEquals(++k, (int) n);
562 }
563 assertEquals(3, k);
564 }
565
566 /**
567 * Modifications do not cause iterators to fail
568 */
569 public void testWeaklyConsistentIteration() {
570 final LinkedTransferQueue q = new LinkedTransferQueue();
571 q.add(one);
572 q.add(two);
573 q.add(three);
574 for (Iterator it = q.iterator(); it.hasNext();) {
575 q.remove();
576 it.next();
577 }
578 assertEquals(0, q.size());
579 }
580
581 /**
582 * toString contains toStrings of elements
583 */
584 public void testToString() {
585 LinkedTransferQueue q = populatedQueue(SIZE);
586 String s = q.toString();
587 for (int i = 0; i < SIZE; ++i) {
588 assertTrue(s.contains(String.valueOf(i)));
589 }
590 }
591
592 /**
593 * offer transfers elements across Executor tasks
594 */
595 public void testOfferInExecutor() {
596 final LinkedTransferQueue q = new LinkedTransferQueue();
597 final CheckedBarrier threadsStarted = new CheckedBarrier(2);
598 final ExecutorService executor = Executors.newFixedThreadPool(2);
599 try (PoolCleaner cleaner = cleaner(executor)) {
600
601 executor.execute(new CheckedRunnable() {
602 public void realRun() throws InterruptedException {
603 threadsStarted.await();
604 long startTime = System.nanoTime();
605 assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS));
606 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
607 }});
608
609 executor.execute(new CheckedRunnable() {
610 public void realRun() throws InterruptedException {
611 threadsStarted.await();
612 assertSame(one, q.take());
613 checkEmpty(q);
614 }});
615 }
616 }
617
618 /**
619 * timed poll retrieves elements across Executor threads
620 */
621 public void testPollInExecutor() {
622 final LinkedTransferQueue q = new LinkedTransferQueue();
623 final CheckedBarrier threadsStarted = new CheckedBarrier(2);
624 final ExecutorService executor = Executors.newFixedThreadPool(2);
625 try (PoolCleaner cleaner = cleaner(executor)) {
626
627 executor.execute(new CheckedRunnable() {
628 public void realRun() throws InterruptedException {
629 assertNull(q.poll());
630 threadsStarted.await();
631 long startTime = System.nanoTime();
632 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
633 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
634 checkEmpty(q);
635 }});
636
637 executor.execute(new CheckedRunnable() {
638 public void realRun() throws InterruptedException {
639 threadsStarted.await();
640 q.put(one);
641 }});
642 }
643 }
644
645 /**
646 * A deserialized/reserialized queue has same elements in same order
647 */
648 public void testSerialization() throws Exception {
649 Queue x = populatedQueue(SIZE);
650 Queue y = serialClone(x);
651
652 assertNotSame(y, x);
653 assertEquals(x.size(), y.size());
654 assertEquals(x.toString(), y.toString());
655 assertTrue(Arrays.equals(x.toArray(), y.toArray()));
656 while (!x.isEmpty()) {
657 assertFalse(y.isEmpty());
658 assertEquals(x.remove(), y.remove());
659 }
660 assertTrue(y.isEmpty());
661 }
662
663 /**
664 * drainTo(c) empties queue into another collection c
665 */
666 public void testDrainTo() {
667 LinkedTransferQueue q = populatedQueue(SIZE);
668 ArrayList l = new ArrayList();
669 q.drainTo(l);
670 assertEquals(0, q.size());
671 assertEquals(SIZE, l.size());
672 for (int i = 0; i < SIZE; ++i) {
673 assertEquals(i, l.get(i));
674 }
675 q.add(zero);
676 q.add(one);
677 assertFalse(q.isEmpty());
678 assertTrue(q.contains(zero));
679 assertTrue(q.contains(one));
680 l.clear();
681 q.drainTo(l);
682 assertEquals(0, q.size());
683 assertEquals(2, l.size());
684 for (int i = 0; i < 2; ++i) {
685 assertEquals(i, l.get(i));
686 }
687 }
688
689 /**
690 * drainTo(c) empties full queue, unblocking a waiting put.
691 */
692 public void testDrainToWithActivePut() throws InterruptedException {
693 final LinkedTransferQueue q = populatedQueue(SIZE);
694 Thread t = newStartedThread(new CheckedRunnable() {
695 public void realRun() {
696 q.put(SIZE + 1);
697 }});
698 ArrayList l = new ArrayList();
699 q.drainTo(l);
700 assertTrue(l.size() >= SIZE);
701 for (int i = 0; i < SIZE; ++i)
702 assertEquals(i, l.get(i));
703 awaitTermination(t);
704 assertTrue(q.size() + l.size() >= SIZE);
705 }
706
707 /**
708 * drainTo(c, n) empties first min(n, size) elements of queue into c
709 */
710 public void testDrainToN() {
711 LinkedTransferQueue q = new LinkedTransferQueue();
712 for (int i = 0; i < SIZE + 2; ++i) {
713 for (int j = 0; j < SIZE; j++) {
714 assertTrue(q.offer(j));
715 }
716 ArrayList l = new ArrayList();
717 q.drainTo(l, i);
718 int k = (i < SIZE) ? i : SIZE;
719 assertEquals(k, l.size());
720 assertEquals(SIZE - k, q.size());
721 for (int j = 0; j < k; ++j)
722 assertEquals(j, l.get(j));
723 do {} while (q.poll() != null);
724 }
725 }
726
727 /**
728 * timed poll() or take() increments the waiting consumer count;
729 * offer(e) decrements the waiting consumer count
730 */
731 public void testWaitingConsumer() throws InterruptedException {
732 final LinkedTransferQueue q = new LinkedTransferQueue();
733 assertEquals(0, q.getWaitingConsumerCount());
734 assertFalse(q.hasWaitingConsumer());
735 final CountDownLatch threadStarted = new CountDownLatch(1);
736
737 Thread t = newStartedThread(new CheckedRunnable() {
738 public void realRun() throws InterruptedException {
739 threadStarted.countDown();
740 long startTime = System.nanoTime();
741 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
742 assertEquals(0, q.getWaitingConsumerCount());
743 assertFalse(q.hasWaitingConsumer());
744 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
745 }});
746
747 threadStarted.await();
748 Callable<Boolean> oneConsumer
749 = new Callable<Boolean>() { public Boolean call() {
750 return q.hasWaitingConsumer()
751 && q.getWaitingConsumerCount() == 1; }};
752 waitForThreadToEnterWaitState(t, oneConsumer);
753
754 assertTrue(q.offer(one));
755 assertEquals(0, q.getWaitingConsumerCount());
756 assertFalse(q.hasWaitingConsumer());
757
758 awaitTermination(t);
759 }
760
761 /**
762 * transfer(null) throws NullPointerException
763 */
764 public void testTransfer1() throws InterruptedException {
765 try {
766 LinkedTransferQueue q = new LinkedTransferQueue();
767 q.transfer(null);
768 shouldThrow();
769 } catch (NullPointerException success) {}
770 }
771
772 /**
773 * transfer waits until a poll occurs. The transferred element
774 * is returned by the associated poll.
775 */
776 public void testTransfer2() throws InterruptedException {
777 final LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
778 final CountDownLatch threadStarted = new CountDownLatch(1);
779
780 Thread t = newStartedThread(new CheckedRunnable() {
781 public void realRun() throws InterruptedException {
782 threadStarted.countDown();
783 q.transfer(five);
784 checkEmpty(q);
785 }});
786
787 threadStarted.await();
788 Callable<Boolean> oneElement
789 = new Callable<Boolean>() { public Boolean call() {
790 return !q.isEmpty() && q.size() == 1; }};
791 waitForThreadToEnterWaitState(t, oneElement);
792
793 assertSame(five, q.poll());
794 checkEmpty(q);
795 awaitTermination(t);
796 }
797
798 /**
799 * transfer waits until a poll occurs, and then transfers in fifo order
800 */
801 public void testTransfer3() throws InterruptedException {
802 final LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
803
804 Thread first = newStartedThread(new CheckedRunnable() {
805 public void realRun() throws InterruptedException {
806 q.transfer(four);
807 assertFalse(q.contains(four));
808 assertEquals(1, q.size());
809 }});
810
811 Thread interruptedThread = newStartedThread(
812 new CheckedInterruptedRunnable() {
813 public void realRun() throws InterruptedException {
814 while (q.isEmpty())
815 Thread.yield();
816 q.transfer(five);
817 }});
818
819 while (q.size() < 2)
820 Thread.yield();
821 assertEquals(2, q.size());
822 assertSame(four, q.poll());
823 first.join();
824 assertEquals(1, q.size());
825 interruptedThread.interrupt();
826 interruptedThread.join();
827 checkEmpty(q);
828 }
829
830 /**
831 * transfer waits until a poll occurs, at which point the polling
832 * thread returns the element
833 */
834 public void testTransfer4() throws InterruptedException {
835 final LinkedTransferQueue q = new LinkedTransferQueue();
836
837 Thread t = newStartedThread(new CheckedRunnable() {
838 public void realRun() throws InterruptedException {
839 q.transfer(four);
840 assertFalse(q.contains(four));
841 assertSame(three, q.poll());
842 }});
843
844 while (q.isEmpty())
845 Thread.yield();
846 assertFalse(q.isEmpty());
847 assertEquals(1, q.size());
848 assertTrue(q.offer(three));
849 assertSame(four, q.poll());
850 awaitTermination(t);
851 }
852
853 /**
854 * transfer waits until a take occurs. The transferred element
855 * is returned by the associated take.
856 */
857 public void testTransfer5() throws InterruptedException {
858 final LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
859
860 Thread t = newStartedThread(new CheckedRunnable() {
861 public void realRun() throws InterruptedException {
862 q.transfer(four);
863 checkEmpty(q);
864 }});
865
866 while (q.isEmpty())
867 Thread.yield();
868 assertFalse(q.isEmpty());
869 assertEquals(1, q.size());
870 assertSame(four, q.take());
871 checkEmpty(q);
872 awaitTermination(t);
873 }
874
875 /**
876 * tryTransfer(null) throws NullPointerException
877 */
878 public void testTryTransfer1() {
879 final LinkedTransferQueue q = new LinkedTransferQueue();
880 try {
881 q.tryTransfer(null);
882 shouldThrow();
883 } catch (NullPointerException success) {}
884 }
885
886 /**
887 * tryTransfer returns false and does not enqueue if there are no
888 * consumers waiting to poll or take.
889 */
890 public void testTryTransfer2() throws InterruptedException {
891 final LinkedTransferQueue q = new LinkedTransferQueue();
892 assertFalse(q.tryTransfer(new Object()));
893 assertFalse(q.hasWaitingConsumer());
894 checkEmpty(q);
895 }
896
897 /**
898 * If there is a consumer waiting in timed poll, tryTransfer
899 * returns true while successfully transfering object.
900 */
901 public void testTryTransfer3() throws InterruptedException {
902 final Object hotPotato = new Object();
903 final LinkedTransferQueue q = new LinkedTransferQueue();
904
905 Thread t = newStartedThread(new CheckedRunnable() {
906 public void realRun() {
907 while (! q.hasWaitingConsumer())
908 Thread.yield();
909 assertTrue(q.hasWaitingConsumer());
910 checkEmpty(q);
911 assertTrue(q.tryTransfer(hotPotato));
912 }});
913
914 long startTime = System.nanoTime();
915 assertSame(hotPotato, q.poll(LONG_DELAY_MS, MILLISECONDS));
916 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
917 checkEmpty(q);
918 awaitTermination(t);
919 }
920
921 /**
922 * If there is a consumer waiting in take, tryTransfer returns
923 * true while successfully transfering object.
924 */
925 public void testTryTransfer4() throws InterruptedException {
926 final Object hotPotato = new Object();
927 final LinkedTransferQueue q = new LinkedTransferQueue();
928
929 Thread t = newStartedThread(new CheckedRunnable() {
930 public void realRun() {
931 while (! q.hasWaitingConsumer())
932 Thread.yield();
933 assertTrue(q.hasWaitingConsumer());
934 checkEmpty(q);
935 assertTrue(q.tryTransfer(hotPotato));
936 }});
937
938 assertSame(q.take(), hotPotato);
939 checkEmpty(q);
940 awaitTermination(t);
941 }
942
943 /**
944 * tryTransfer blocks interruptibly if no takers
945 */
946 public void testTryTransfer5() throws InterruptedException {
947 final LinkedTransferQueue q = new LinkedTransferQueue();
948 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
949 assertTrue(q.isEmpty());
950
951 Thread t = newStartedThread(new CheckedRunnable() {
952 public void realRun() throws InterruptedException {
953 Thread.currentThread().interrupt();
954 try {
955 q.tryTransfer(new Object(), randomTimeout(), randomTimeUnit());
956 shouldThrow();
957 } catch (InterruptedException success) {}
958 assertFalse(Thread.interrupted());
959
960 pleaseInterrupt.countDown();
961 try {
962 q.tryTransfer(new Object(), LONGER_DELAY_MS, MILLISECONDS);
963 shouldThrow();
964 } catch (InterruptedException success) {}
965 assertFalse(Thread.interrupted());
966 }});
967
968 await(pleaseInterrupt);
969 if (randomBoolean()) assertThreadBlocks(t, Thread.State.TIMED_WAITING);
970 t.interrupt();
971 awaitTermination(t);
972 checkEmpty(q);
973 }
974
975 /**
976 * tryTransfer gives up after the timeout and returns false
977 */
978 public void testTryTransfer6() throws InterruptedException {
979 final LinkedTransferQueue q = new LinkedTransferQueue();
980
981 Thread t = newStartedThread(new CheckedRunnable() {
982 public void realRun() throws InterruptedException {
983 long startTime = System.nanoTime();
984 assertFalse(q.tryTransfer(new Object(),
985 timeoutMillis(), MILLISECONDS));
986 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
987 checkEmpty(q);
988 }});
989
990 awaitTermination(t);
991 checkEmpty(q);
992 }
993
994 /**
995 * tryTransfer waits for any elements previously in to be removed
996 * before transfering to a poll or take
997 */
998 public void testTryTransfer7() throws InterruptedException {
999 final LinkedTransferQueue q = new LinkedTransferQueue();
1000 assertTrue(q.offer(four));
1001
1002 Thread t = newStartedThread(new CheckedRunnable() {
1003 public void realRun() throws InterruptedException {
1004 long startTime = System.nanoTime();
1005 assertTrue(q.tryTransfer(five, LONG_DELAY_MS, MILLISECONDS));
1006 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
1007 checkEmpty(q);
1008 }});
1009
1010 while (q.size() != 2)
1011 Thread.yield();
1012 assertEquals(2, q.size());
1013 assertSame(four, q.poll());
1014 assertSame(five, q.poll());
1015 checkEmpty(q);
1016 awaitTermination(t);
1017 }
1018
1019 /**
1020 * tryTransfer attempts to enqueue into the queue and fails
1021 * returning false not enqueueing and the successive poll is null
1022 */
1023 public void testTryTransfer8() throws InterruptedException {
1024 final LinkedTransferQueue q = new LinkedTransferQueue();
1025 assertTrue(q.offer(four));
1026 assertEquals(1, q.size());
1027 long startTime = System.nanoTime();
1028 assertFalse(q.tryTransfer(five, timeoutMillis(), MILLISECONDS));
1029 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
1030 assertEquals(1, q.size());
1031 assertSame(four, q.poll());
1032 assertNull(q.poll());
1033 checkEmpty(q);
1034 }
1035
1036 private LinkedTransferQueue<Integer> populatedQueue(int n) {
1037 LinkedTransferQueue<Integer> q = new LinkedTransferQueue<>();
1038 checkEmpty(q);
1039 for (int i = 0; i < n; i++) {
1040 assertEquals(i, q.size());
1041 assertTrue(q.offer(i));
1042 assertEquals(Integer.MAX_VALUE, q.remainingCapacity());
1043 }
1044 assertFalse(q.isEmpty());
1045 return q;
1046 }
1047
1048 /**
1049 * remove(null), contains(null) always return false
1050 */
1051 public void testNeverContainsNull() {
1052 Collection<?>[] qs = {
1053 new LinkedTransferQueue<Object>(),
1054 populatedQueue(2),
1055 };
1056
1057 for (Collection<?> q : qs) {
1058 assertFalse(q.contains(null));
1059 assertFalse(q.remove(null));
1060 }
1061 }
1062 }