ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/tck/LinkedBlockingQueueTest.java
Revision: 1.47
Committed: Fri Jul 15 18:49:31 2011 UTC (12 years, 9 months ago) by jsr166
Branch: MAIN
Changes since 1.46: +1 -22 lines
Log Message:
Robust weak consistency for ArrayBlockingQueue iterators

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 * Other contributors include Andrew Wright, Jeffrey Hayes,
6 * Pat Fisher, Mike Judd.
7 */
8
9 import junit.framework.*;
10 import java.util.Arrays;
11 import java.util.ArrayList;
12 import java.util.Collection;
13 import java.util.Iterator;
14 import java.util.NoSuchElementException;
15 import java.util.Queue;
16 import java.util.concurrent.BlockingQueue;
17 import java.util.concurrent.CountDownLatch;
18 import java.util.concurrent.LinkedBlockingQueue;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.ExecutorService;
21 import static java.util.concurrent.TimeUnit.MILLISECONDS;
22
23 public class LinkedBlockingQueueTest extends JSR166TestCase {
24
25 public static class Unbounded extends BlockingQueueTest {
26 protected BlockingQueue emptyCollection() {
27 return new LinkedBlockingQueue();
28 }
29 }
30
31 public static class Bounded extends BlockingQueueTest {
32 protected BlockingQueue emptyCollection() {
33 return new LinkedBlockingQueue(SIZE);
34 }
35 }
36
37 public static void main(String[] args) {
38 junit.textui.TestRunner.run(suite());
39 }
40
41 public static Test suite() {
42 return newTestSuite(LinkedBlockingQueueTest.class,
43 new Unbounded().testSuite(),
44 new Bounded().testSuite());
45 }
46
47 /**
48 * Create a queue of given size containing consecutive
49 * Integers 0 ... n.
50 */
51 private LinkedBlockingQueue<Integer> populatedQueue(int n) {
52 LinkedBlockingQueue<Integer> q =
53 new LinkedBlockingQueue<Integer>(n);
54 assertTrue(q.isEmpty());
55 for (int i = 0; i < n; i++)
56 assertTrue(q.offer(new Integer(i)));
57 assertFalse(q.isEmpty());
58 assertEquals(0, q.remainingCapacity());
59 assertEquals(n, q.size());
60 return q;
61 }
62
63 /**
64 * A new queue has the indicated capacity, or Integer.MAX_VALUE if
65 * none given
66 */
67 public void testConstructor1() {
68 assertEquals(SIZE, new LinkedBlockingQueue(SIZE).remainingCapacity());
69 assertEquals(Integer.MAX_VALUE, new LinkedBlockingQueue().remainingCapacity());
70 }
71
72 /**
73 * Constructor throws IllegalArgumentException if capacity argument nonpositive
74 */
75 public void testConstructor2() {
76 try {
77 new LinkedBlockingQueue(0);
78 shouldThrow();
79 } catch (IllegalArgumentException success) {}
80 }
81
82 /**
83 * Initializing from null Collection throws NullPointerException
84 */
85 public void testConstructor3() {
86 try {
87 new LinkedBlockingQueue(null);
88 shouldThrow();
89 } catch (NullPointerException success) {}
90 }
91
92 /**
93 * Initializing from Collection of null elements throws NullPointerException
94 */
95 public void testConstructor4() {
96 Collection<Integer> elements = Arrays.asList(new Integer[SIZE]);
97 try {
98 new LinkedBlockingQueue(elements);
99 shouldThrow();
100 } catch (NullPointerException success) {}
101 }
102
103 /**
104 * Initializing from Collection with some null elements throws
105 * NullPointerException
106 */
107 public void testConstructor5() {
108 Integer[] ints = new Integer[SIZE];
109 for (int i = 0; i < SIZE-1; ++i)
110 ints[i] = new Integer(i);
111 Collection<Integer> elements = Arrays.asList(ints);
112 try {
113 new LinkedBlockingQueue(elements);
114 shouldThrow();
115 } catch (NullPointerException success) {}
116 }
117
118 /**
119 * Queue contains all elements of collection used to initialize
120 */
121 public void testConstructor6() {
122 Integer[] ints = new Integer[SIZE];
123 for (int i = 0; i < SIZE; ++i)
124 ints[i] = new Integer(i);
125 LinkedBlockingQueue q = new LinkedBlockingQueue(Arrays.asList(ints));
126 for (int i = 0; i < SIZE; ++i)
127 assertEquals(ints[i], q.poll());
128 }
129
130 /**
131 * Queue transitions from empty to full when elements added
132 */
133 public void testEmptyFull() {
134 LinkedBlockingQueue q = new LinkedBlockingQueue(2);
135 assertTrue(q.isEmpty());
136 assertEquals("should have room for 2", 2, q.remainingCapacity());
137 q.add(one);
138 assertFalse(q.isEmpty());
139 q.add(two);
140 assertFalse(q.isEmpty());
141 assertEquals(0, q.remainingCapacity());
142 assertFalse(q.offer(three));
143 }
144
145 /**
146 * remainingCapacity decreases on add, increases on remove
147 */
148 public void testRemainingCapacity() {
149 LinkedBlockingQueue q = populatedQueue(SIZE);
150 for (int i = 0; i < SIZE; ++i) {
151 assertEquals(i, q.remainingCapacity());
152 assertEquals(SIZE-i, q.size());
153 q.remove();
154 }
155 for (int i = 0; i < SIZE; ++i) {
156 assertEquals(SIZE-i, q.remainingCapacity());
157 assertEquals(i, q.size());
158 q.add(new Integer(i));
159 }
160 }
161
162 /**
163 * Offer succeeds if not full; fails if full
164 */
165 public void testOffer() {
166 LinkedBlockingQueue q = new LinkedBlockingQueue(1);
167 assertTrue(q.offer(zero));
168 assertFalse(q.offer(one));
169 }
170
171 /**
172 * add succeeds if not full; throws IllegalStateException if full
173 */
174 public void testAdd() {
175 LinkedBlockingQueue q = new LinkedBlockingQueue(SIZE);
176 for (int i = 0; i < SIZE; ++i)
177 assertTrue(q.add(new Integer(i)));
178 assertEquals(0, q.remainingCapacity());
179 try {
180 q.add(new Integer(SIZE));
181 shouldThrow();
182 } catch (IllegalStateException success) {}
183 }
184
185 /**
186 * addAll(this) throws IllegalArgumentException
187 */
188 public void testAddAllSelf() {
189 LinkedBlockingQueue q = populatedQueue(SIZE);
190 try {
191 q.addAll(q);
192 shouldThrow();
193 } catch (IllegalArgumentException success) {}
194 }
195
196 /**
197 * addAll of a collection with any null elements throws NPE after
198 * possibly adding some elements
199 */
200 public void testAddAll3() {
201 LinkedBlockingQueue q = new LinkedBlockingQueue(SIZE);
202 Integer[] ints = new Integer[SIZE];
203 for (int i = 0; i < SIZE-1; ++i)
204 ints[i] = new Integer(i);
205 Collection<Integer> elements = Arrays.asList(ints);
206 try {
207 q.addAll(elements);
208 shouldThrow();
209 } catch (NullPointerException success) {}
210 }
211
212 /**
213 * addAll throws IllegalStateException if not enough room
214 */
215 public void testAddAll4() {
216 LinkedBlockingQueue q = new LinkedBlockingQueue(SIZE - 1);
217 Integer[] ints = new Integer[SIZE];
218 for (int i = 0; i < SIZE; ++i)
219 ints[i] = new Integer(i);
220 Collection<Integer> elements = Arrays.asList(ints);
221 try {
222 q.addAll(elements);
223 shouldThrow();
224 } catch (IllegalStateException success) {}
225 }
226
227 /**
228 * Queue contains all elements, in traversal order, of successful addAll
229 */
230 public void testAddAll5() {
231 Integer[] empty = new Integer[0];
232 Integer[] ints = new Integer[SIZE];
233 for (int i = 0; i < SIZE; ++i)
234 ints[i] = new Integer(i);
235 LinkedBlockingQueue q = new LinkedBlockingQueue(SIZE);
236 assertFalse(q.addAll(Arrays.asList(empty)));
237 assertTrue(q.addAll(Arrays.asList(ints)));
238 for (int i = 0; i < SIZE; ++i)
239 assertEquals(ints[i], q.poll());
240 }
241
242 /**
243 * all elements successfully put are contained
244 */
245 public void testPut() throws InterruptedException {
246 LinkedBlockingQueue q = new LinkedBlockingQueue(SIZE);
247 for (int i = 0; i < SIZE; ++i) {
248 Integer I = new Integer(i);
249 q.put(I);
250 assertTrue(q.contains(I));
251 }
252 assertEquals(0, q.remainingCapacity());
253 }
254
255 /**
256 * put blocks interruptibly if full
257 */
258 public void testBlockingPut() throws InterruptedException {
259 final LinkedBlockingQueue q = new LinkedBlockingQueue(SIZE);
260 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
261 Thread t = newStartedThread(new CheckedRunnable() {
262 public void realRun() throws InterruptedException {
263 for (int i = 0; i < SIZE; ++i)
264 q.put(i);
265 assertEquals(SIZE, q.size());
266 assertEquals(0, q.remainingCapacity());
267
268 Thread.currentThread().interrupt();
269 try {
270 q.put(99);
271 shouldThrow();
272 } catch (InterruptedException success) {}
273 assertFalse(Thread.interrupted());
274
275 pleaseInterrupt.countDown();
276 try {
277 q.put(99);
278 shouldThrow();
279 } catch (InterruptedException success) {}
280 assertFalse(Thread.interrupted());
281 }});
282
283 await(pleaseInterrupt);
284 assertThreadStaysAlive(t);
285 t.interrupt();
286 awaitTermination(t);
287 assertEquals(SIZE, q.size());
288 assertEquals(0, q.remainingCapacity());
289 }
290
291 /**
292 * put blocks interruptibly waiting for take when full
293 */
294 public void testPutWithTake() throws InterruptedException {
295 final int capacity = 2;
296 final LinkedBlockingQueue q = new LinkedBlockingQueue(2);
297 final CountDownLatch pleaseTake = new CountDownLatch(1);
298 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
299 Thread t = newStartedThread(new CheckedRunnable() {
300 public void realRun() throws InterruptedException {
301 for (int i = 0; i < capacity; i++)
302 q.put(i);
303 pleaseTake.countDown();
304 q.put(86);
305
306 pleaseInterrupt.countDown();
307 try {
308 q.put(99);
309 shouldThrow();
310 } catch (InterruptedException success) {}
311 assertFalse(Thread.interrupted());
312 }});
313
314 await(pleaseTake);
315 assertEquals(q.remainingCapacity(), 0);
316 assertEquals(0, q.take());
317
318 await(pleaseInterrupt);
319 assertThreadStaysAlive(t);
320 t.interrupt();
321 awaitTermination(t);
322 assertEquals(q.remainingCapacity(), 0);
323 }
324
325 /**
326 * timed offer times out if full and elements not taken
327 */
328 public void testTimedOffer() {
329 final LinkedBlockingQueue q = new LinkedBlockingQueue(2);
330 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
331 Thread t = newStartedThread(new CheckedRunnable() {
332 public void realRun() throws InterruptedException {
333 q.put(new Object());
334 q.put(new Object());
335 long startTime = System.nanoTime();
336 assertFalse(q.offer(new Object(), timeoutMillis(), MILLISECONDS));
337 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
338 pleaseInterrupt.countDown();
339 try {
340 q.offer(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS);
341 shouldThrow();
342 } catch (InterruptedException success) {}
343 }});
344
345 await(pleaseInterrupt);
346 assertThreadStaysAlive(t);
347 t.interrupt();
348 awaitTermination(t);
349 }
350
351 /**
352 * take retrieves elements in FIFO order
353 */
354 public void testTake() throws InterruptedException {
355 LinkedBlockingQueue q = populatedQueue(SIZE);
356 for (int i = 0; i < SIZE; ++i) {
357 assertEquals(i, q.take());
358 }
359 }
360
361 /**
362 * Take removes existing elements until empty, then blocks interruptibly
363 */
364 public void testBlockingTake() throws InterruptedException {
365 final BlockingQueue q = populatedQueue(SIZE);
366 final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
367 Thread t = newStartedThread(new CheckedRunnable() {
368 public void realRun() throws InterruptedException {
369 for (int i = 0; i < SIZE; ++i) {
370 assertEquals(i, q.take());
371 }
372
373 Thread.currentThread().interrupt();
374 try {
375 q.take();
376 shouldThrow();
377 } catch (InterruptedException success) {}
378 assertFalse(Thread.interrupted());
379
380 pleaseInterrupt.countDown();
381 try {
382 q.take();
383 shouldThrow();
384 } catch (InterruptedException success) {}
385 assertFalse(Thread.interrupted());
386 }});
387
388 await(pleaseInterrupt);
389 assertThreadStaysAlive(t);
390 t.interrupt();
391 awaitTermination(t);
392 }
393
394 /**
395 * poll succeeds unless empty
396 */
397 public void testPoll() {
398 LinkedBlockingQueue q = populatedQueue(SIZE);
399 for (int i = 0; i < SIZE; ++i) {
400 assertEquals(i, q.poll());
401 }
402 assertNull(q.poll());
403 }
404
405 /**
406 * timed poll with zero timeout succeeds when non-empty, else times out
407 */
408 public void testTimedPoll0() throws InterruptedException {
409 LinkedBlockingQueue q = populatedQueue(SIZE);
410 for (int i = 0; i < SIZE; ++i) {
411 assertEquals(i, q.poll(0, MILLISECONDS));
412 }
413 assertNull(q.poll(0, MILLISECONDS));
414 }
415
416 /**
417 * timed poll with nonzero timeout succeeds when non-empty, else times out
418 */
419 public void testTimedPoll() throws InterruptedException {
420 LinkedBlockingQueue<Integer> q = populatedQueue(SIZE);
421 for (int i = 0; i < SIZE; ++i) {
422 long startTime = System.nanoTime();
423 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
424 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS);
425 }
426 long startTime = System.nanoTime();
427 assertNull(q.poll(timeoutMillis(), MILLISECONDS));
428 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
429 checkEmpty(q);
430 }
431
432 /**
433 * Interrupted timed poll throws InterruptedException instead of
434 * returning timeout status
435 */
436 public void testInterruptedTimedPoll() throws InterruptedException {
437 final BlockingQueue<Integer> q = populatedQueue(SIZE);
438 final CountDownLatch aboutToWait = new CountDownLatch(1);
439 Thread t = newStartedThread(new CheckedRunnable() {
440 public void realRun() throws InterruptedException {
441 for (int i = 0; i < SIZE; ++i) {
442 long t0 = System.nanoTime();
443 assertEquals(i, (int) q.poll(LONG_DELAY_MS, MILLISECONDS));
444 assertTrue(millisElapsedSince(t0) < SMALL_DELAY_MS);
445 }
446 long t0 = System.nanoTime();
447 aboutToWait.countDown();
448 try {
449 q.poll(MEDIUM_DELAY_MS, MILLISECONDS);
450 shouldThrow();
451 } catch (InterruptedException success) {
452 assertTrue(millisElapsedSince(t0) < MEDIUM_DELAY_MS);
453 }
454 }});
455
456 aboutToWait.await();
457 waitForThreadToEnterWaitState(t, SMALL_DELAY_MS);
458 t.interrupt();
459 awaitTermination(t, MEDIUM_DELAY_MS);
460 checkEmpty(q);
461 }
462
463 /**
464 * peek returns next element, or null if empty
465 */
466 public void testPeek() {
467 LinkedBlockingQueue q = populatedQueue(SIZE);
468 for (int i = 0; i < SIZE; ++i) {
469 assertEquals(i, q.peek());
470 assertEquals(i, q.poll());
471 assertTrue(q.peek() == null ||
472 !q.peek().equals(i));
473 }
474 assertNull(q.peek());
475 }
476
477 /**
478 * element returns next element, or throws NSEE if empty
479 */
480 public void testElement() {
481 LinkedBlockingQueue q = populatedQueue(SIZE);
482 for (int i = 0; i < SIZE; ++i) {
483 assertEquals(i, q.element());
484 assertEquals(i, q.poll());
485 }
486 try {
487 q.element();
488 shouldThrow();
489 } catch (NoSuchElementException success) {}
490 }
491
492 /**
493 * remove removes next element, or throws NSEE if empty
494 */
495 public void testRemove() {
496 LinkedBlockingQueue q = populatedQueue(SIZE);
497 for (int i = 0; i < SIZE; ++i) {
498 assertEquals(i, q.remove());
499 }
500 try {
501 q.remove();
502 shouldThrow();
503 } catch (NoSuchElementException success) {}
504 }
505
506 /**
507 * An add following remove(x) succeeds
508 */
509 public void testRemoveElementAndAdd() throws InterruptedException {
510 LinkedBlockingQueue q = new LinkedBlockingQueue();
511 assertTrue(q.add(new Integer(1)));
512 assertTrue(q.add(new Integer(2)));
513 assertTrue(q.remove(new Integer(1)));
514 assertTrue(q.remove(new Integer(2)));
515 assertTrue(q.add(new Integer(3)));
516 assertTrue(q.take() != null);
517 }
518
519 /**
520 * contains(x) reports true when elements added but not yet removed
521 */
522 public void testContains() {
523 LinkedBlockingQueue q = populatedQueue(SIZE);
524 for (int i = 0; i < SIZE; ++i) {
525 assertTrue(q.contains(new Integer(i)));
526 q.poll();
527 assertFalse(q.contains(new Integer(i)));
528 }
529 }
530
531 /**
532 * clear removes all elements
533 */
534 public void testClear() {
535 LinkedBlockingQueue q = populatedQueue(SIZE);
536 q.clear();
537 assertTrue(q.isEmpty());
538 assertEquals(0, q.size());
539 assertEquals(SIZE, q.remainingCapacity());
540 q.add(one);
541 assertFalse(q.isEmpty());
542 assertTrue(q.contains(one));
543 q.clear();
544 assertTrue(q.isEmpty());
545 }
546
547 /**
548 * containsAll(c) is true when c contains a subset of elements
549 */
550 public void testContainsAll() {
551 LinkedBlockingQueue q = populatedQueue(SIZE);
552 LinkedBlockingQueue p = new LinkedBlockingQueue(SIZE);
553 for (int i = 0; i < SIZE; ++i) {
554 assertTrue(q.containsAll(p));
555 assertFalse(p.containsAll(q));
556 p.add(new Integer(i));
557 }
558 assertTrue(p.containsAll(q));
559 }
560
561 /**
562 * retainAll(c) retains only those elements of c and reports true if changed
563 */
564 public void testRetainAll() {
565 LinkedBlockingQueue q = populatedQueue(SIZE);
566 LinkedBlockingQueue p = populatedQueue(SIZE);
567 for (int i = 0; i < SIZE; ++i) {
568 boolean changed = q.retainAll(p);
569 if (i == 0)
570 assertFalse(changed);
571 else
572 assertTrue(changed);
573
574 assertTrue(q.containsAll(p));
575 assertEquals(SIZE-i, q.size());
576 p.remove();
577 }
578 }
579
580 /**
581 * removeAll(c) removes only those elements of c and reports true if changed
582 */
583 public void testRemoveAll() {
584 for (int i = 1; i < SIZE; ++i) {
585 LinkedBlockingQueue q = populatedQueue(SIZE);
586 LinkedBlockingQueue p = populatedQueue(i);
587 assertTrue(q.removeAll(p));
588 assertEquals(SIZE-i, q.size());
589 for (int j = 0; j < i; ++j) {
590 Integer I = (Integer)(p.remove());
591 assertFalse(q.contains(I));
592 }
593 }
594 }
595
596 /**
597 * toArray contains all elements in FIFO order
598 */
599 public void testToArray() {
600 LinkedBlockingQueue q = populatedQueue(SIZE);
601 Object[] o = q.toArray();
602 for (int i = 0; i < o.length; i++)
603 assertSame(o[i], q.poll());
604 }
605
606 /**
607 * toArray(a) contains all elements in FIFO order
608 */
609 public void testToArray2() throws InterruptedException {
610 LinkedBlockingQueue<Integer> q = populatedQueue(SIZE);
611 Integer[] ints = new Integer[SIZE];
612 Integer[] array = q.toArray(ints);
613 assertSame(ints, array);
614 for (int i = 0; i < ints.length; i++)
615 assertSame(ints[i], q.poll());
616 }
617
618 /**
619 * toArray(incompatible array type) throws ArrayStoreException
620 */
621 public void testToArray1_BadArg() {
622 LinkedBlockingQueue q = populatedQueue(SIZE);
623 try {
624 q.toArray(new String[10]);
625 shouldThrow();
626 } catch (ArrayStoreException success) {}
627 }
628
629 /**
630 * iterator iterates through all elements
631 */
632 public void testIterator() throws InterruptedException {
633 LinkedBlockingQueue q = populatedQueue(SIZE);
634 Iterator it = q.iterator();
635 while (it.hasNext()) {
636 assertEquals(it.next(), q.take());
637 }
638 }
639
640 /**
641 * iterator.remove removes current element
642 */
643 public void testIteratorRemove() {
644 final LinkedBlockingQueue q = new LinkedBlockingQueue(3);
645 q.add(two);
646 q.add(one);
647 q.add(three);
648
649 Iterator it = q.iterator();
650 it.next();
651 it.remove();
652
653 it = q.iterator();
654 assertSame(it.next(), one);
655 assertSame(it.next(), three);
656 assertFalse(it.hasNext());
657 }
658
659 /**
660 * iterator ordering is FIFO
661 */
662 public void testIteratorOrdering() {
663 final LinkedBlockingQueue q = new LinkedBlockingQueue(3);
664 q.add(one);
665 q.add(two);
666 q.add(three);
667 assertEquals(0, q.remainingCapacity());
668 int k = 0;
669 for (Iterator it = q.iterator(); it.hasNext();) {
670 assertEquals(++k, it.next());
671 }
672 assertEquals(3, k);
673 }
674
675 /**
676 * Modifications do not cause iterators to fail
677 */
678 public void testWeaklyConsistentIteration() {
679 final LinkedBlockingQueue q = new LinkedBlockingQueue(3);
680 q.add(one);
681 q.add(two);
682 q.add(three);
683 for (Iterator it = q.iterator(); it.hasNext();) {
684 q.remove();
685 it.next();
686 }
687 assertEquals(0, q.size());
688 }
689
690 /**
691 * toString contains toStrings of elements
692 */
693 public void testToString() {
694 LinkedBlockingQueue q = populatedQueue(SIZE);
695 String s = q.toString();
696 for (int i = 0; i < SIZE; ++i) {
697 assertTrue(s.contains(String.valueOf(i)));
698 }
699 }
700
701 /**
702 * offer transfers elements across Executor tasks
703 */
704 public void testOfferInExecutor() {
705 final LinkedBlockingQueue q = new LinkedBlockingQueue(2);
706 q.add(one);
707 q.add(two);
708 ExecutorService executor = Executors.newFixedThreadPool(2);
709 final CheckedBarrier threadsStarted = new CheckedBarrier(2);
710 executor.execute(new CheckedRunnable() {
711 public void realRun() throws InterruptedException {
712 assertFalse(q.offer(three));
713 threadsStarted.await();
714 assertTrue(q.offer(three, LONG_DELAY_MS, MILLISECONDS));
715 assertEquals(0, q.remainingCapacity());
716 }});
717
718 executor.execute(new CheckedRunnable() {
719 public void realRun() throws InterruptedException {
720 threadsStarted.await();
721 assertSame(one, q.take());
722 }});
723
724 joinPool(executor);
725 }
726
727 /**
728 * timed poll retrieves elements across Executor threads
729 */
730 public void testPollInExecutor() {
731 final LinkedBlockingQueue q = new LinkedBlockingQueue(2);
732 final CheckedBarrier threadsStarted = new CheckedBarrier(2);
733 ExecutorService executor = Executors.newFixedThreadPool(2);
734 executor.execute(new CheckedRunnable() {
735 public void realRun() throws InterruptedException {
736 assertNull(q.poll());
737 threadsStarted.await();
738 assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
739 checkEmpty(q);
740 }});
741
742 executor.execute(new CheckedRunnable() {
743 public void realRun() throws InterruptedException {
744 threadsStarted.await();
745 q.put(one);
746 }});
747
748 joinPool(executor);
749 }
750
751 /**
752 * A deserialized serialized queue has same elements in same order
753 */
754 public void testSerialization() throws Exception {
755 Queue x = populatedQueue(SIZE);
756 Queue y = serialClone(x);
757
758 assertTrue(x != y);
759 assertEquals(x.size(), y.size());
760 assertEquals(x.toString(), y.toString());
761 assertTrue(Arrays.equals(x.toArray(), y.toArray()));
762 while (!x.isEmpty()) {
763 assertFalse(y.isEmpty());
764 assertEquals(x.remove(), y.remove());
765 }
766 assertTrue(y.isEmpty());
767 }
768
769 /**
770 * drainTo(c) empties queue into another collection c
771 */
772 public void testDrainTo() {
773 LinkedBlockingQueue q = populatedQueue(SIZE);
774 ArrayList l = new ArrayList();
775 q.drainTo(l);
776 assertEquals(q.size(), 0);
777 assertEquals(l.size(), SIZE);
778 for (int i = 0; i < SIZE; ++i)
779 assertEquals(l.get(i), new Integer(i));
780 q.add(zero);
781 q.add(one);
782 assertFalse(q.isEmpty());
783 assertTrue(q.contains(zero));
784 assertTrue(q.contains(one));
785 l.clear();
786 q.drainTo(l);
787 assertEquals(q.size(), 0);
788 assertEquals(l.size(), 2);
789 for (int i = 0; i < 2; ++i)
790 assertEquals(l.get(i), new Integer(i));
791 }
792
793 /**
794 * drainTo empties full queue, unblocking a waiting put.
795 */
796 public void testDrainToWithActivePut() throws InterruptedException {
797 final LinkedBlockingQueue q = populatedQueue(SIZE);
798 Thread t = new Thread(new CheckedRunnable() {
799 public void realRun() throws InterruptedException {
800 q.put(new Integer(SIZE+1));
801 }});
802
803 t.start();
804 ArrayList l = new ArrayList();
805 q.drainTo(l);
806 assertTrue(l.size() >= SIZE);
807 for (int i = 0; i < SIZE; ++i)
808 assertEquals(l.get(i), new Integer(i));
809 t.join();
810 assertTrue(q.size() + l.size() >= SIZE);
811 }
812
813 /**
814 * drainTo(c, n) empties first min(n, size) elements of queue into c
815 */
816 public void testDrainToN() {
817 LinkedBlockingQueue q = new LinkedBlockingQueue();
818 for (int i = 0; i < SIZE + 2; ++i) {
819 for (int j = 0; j < SIZE; j++)
820 assertTrue(q.offer(new Integer(j)));
821 ArrayList l = new ArrayList();
822 q.drainTo(l, i);
823 int k = (i < SIZE) ? i : SIZE;
824 assertEquals(l.size(), k);
825 assertEquals(q.size(), SIZE-k);
826 for (int j = 0; j < k; ++j)
827 assertEquals(l.get(j), new Integer(j));
828 while (q.poll() != null) ;
829 }
830 }
831
832 }