ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/jtreg/util/concurrent/BlockingQueue/SingleProducerMultipleConsumerLoops.java
Revision: 1.18
Committed: Sat Feb 27 21:15:57 2016 UTC (8 years, 3 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.17: +5 -2 lines
Log Message:
improve jtreg timeout handling, especially -timeout:

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 /*
8 * @test
9 * @bug 4486658
10 * @summary check ordering for blocking queues with 1 producer and multiple consumers
11 * @library /lib/testlibrary/
12 */
13
14 import static java.util.concurrent.TimeUnit.MILLISECONDS;
15 import static java.util.concurrent.TimeUnit.NANOSECONDS;
16
17 import java.util.concurrent.ArrayBlockingQueue;
18 import java.util.concurrent.BlockingQueue;
19 import java.util.concurrent.CyclicBarrier;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.LinkedBlockingDeque;
23 import java.util.concurrent.LinkedBlockingQueue;
24 import java.util.concurrent.LinkedTransferQueue;
25 import java.util.concurrent.PriorityBlockingQueue;
26 import java.util.concurrent.SynchronousQueue;
27 import jdk.testlibrary.Utils;
28
29 public class SingleProducerMultipleConsumerLoops {
30 static final long LONG_DELAY_MS = Utils.adjustTimeout(10_000);
31 static ExecutorService pool;
32
33 public static void main(String[] args) throws Exception {
34 final int maxConsumers = (args.length > 0)
35 ? Integer.parseInt(args[0])
36 : 5;
37
38 pool = Executors.newCachedThreadPool();
39 for (int i = 1; i <= maxConsumers; i += (i+1) >>> 1) {
40 // Adjust iterations to limit typical single runs to <= 10 ms;
41 // Notably, fair queues get fewer iters.
42 // Unbounded queues can legitimately OOME if iterations
43 // high enough, but we have a sufficiently low limit here.
44 run(new ArrayBlockingQueue<Integer>(100), i, 1000);
45 run(new LinkedBlockingQueue<Integer>(100), i, 1000);
46 run(new LinkedBlockingDeque<Integer>(100), i, 1000);
47 run(new LinkedTransferQueue<Integer>(), i, 700);
48 run(new PriorityBlockingQueue<Integer>(), i, 1000);
49 run(new SynchronousQueue<Integer>(), i, 300);
50 run(new SynchronousQueue<Integer>(true), i, 200);
51 run(new ArrayBlockingQueue<Integer>(100, true), i, 100);
52 }
53 pool.shutdown();
54 if (! pool.awaitTermination(LONG_DELAY_MS, MILLISECONDS))
55 throw new Error();
56 pool = null;
57 }
58
59 static void run(BlockingQueue<Integer> queue, int consumers, int iters) throws Exception {
60 new SingleProducerMultipleConsumerLoops(queue, consumers, iters).run();
61 }
62
63 final BlockingQueue<Integer> queue;
64 final int consumers;
65 final int iters;
66 final LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
67 final CyclicBarrier barrier;
68 Throwable fail;
69
70 SingleProducerMultipleConsumerLoops(BlockingQueue<Integer> queue, int consumers, int iters) {
71 this.queue = queue;
72 this.consumers = consumers;
73 this.iters = iters;
74 this.barrier = new CyclicBarrier(consumers + 2, timer);
75 }
76
77 void run() throws Exception {
78 pool.execute(new Producer());
79 for (int i = 0; i < consumers; i++) {
80 pool.execute(new Consumer());
81 }
82 barrier.await();
83 barrier.await();
84 System.out.printf("%s, consumers=%d: %d ms%n",
85 queue.getClass().getSimpleName(), consumers,
86 NANOSECONDS.toMillis(timer.getTime()));
87 if (fail != null) throw new AssertionError(fail);
88 }
89
90 abstract class CheckedRunnable implements Runnable {
91 abstract void realRun() throws Throwable;
92 public final void run() {
93 try {
94 realRun();
95 } catch (Throwable t) {
96 fail = t;
97 t.printStackTrace();
98 throw new AssertionError(t);
99 }
100 }
101 }
102
103 class Producer extends CheckedRunnable {
104 volatile int result;
105 void realRun() throws Throwable {
106 barrier.await();
107 for (int i = 0; i < iters * consumers; i++) {
108 queue.put(new Integer(i));
109 }
110 barrier.await();
111 result = 432;
112 }
113 }
114
115 class Consumer extends CheckedRunnable {
116 volatile int result;
117 void realRun() throws Throwable {
118 barrier.await();
119 int l = 0;
120 int s = 0;
121 int last = -1;
122 for (int i = 0; i < iters; i++) {
123 Integer item = queue.take();
124 int v = item.intValue();
125 if (v < last)
126 throw new Error("Out-of-Order transfer");
127 last = v;
128 l = LoopHelpers.compute1(v);
129 s += l;
130 }
131 barrier.await();
132 result = s;
133 }
134 }
135 }