ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/MultipleProducersSingleConsumerLoops.java
Revision: 1.12
Committed: Sat Dec 31 19:40:49 2016 UTC (7 years, 4 months ago) by jsr166
Branch: MAIN
Changes since 1.11: +12 -3 lines
Log Message:
organize imports

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 import java.util.Random;
8 import java.util.concurrent.ArrayBlockingQueue;
9 import java.util.concurrent.BlockingQueue;
10 import java.util.concurrent.CyclicBarrier;
11 import java.util.concurrent.ExecutorService;
12 import java.util.concurrent.Executors;
13 import java.util.concurrent.LinkedBlockingDeque;
14 import java.util.concurrent.LinkedBlockingQueue;
15 import java.util.concurrent.LinkedTransferQueue;
16 import java.util.concurrent.Phaser;
17 import java.util.concurrent.PriorityBlockingQueue;
18 import java.util.concurrent.SynchronousQueue;
19
20 public class MultipleProducersSingleConsumerLoops {
21 static final int NCPUS = Runtime.getRuntime().availableProcessors();
22 static final Random rng = new Random();
23 static final ExecutorService pool = Executors.newCachedThreadPool();
24 static boolean print = false;
25 static int producerSum;
26 static int consumerSum;
27 static synchronized void addProducerSum(int x) {
28 producerSum += x;
29 }
30
31 static synchronized void addConsumerSum(int x) {
32 consumerSum += x;
33 }
34
35 static synchronized void checkSum() {
36 if (producerSum != consumerSum)
37 throw new Error("CheckSum mismatch");
38 }
39
40 // Number of elements passed around -- must be power of two
41 // Elements are reused from pool to minimize alloc impact
42 static final int POOL_SIZE = 1 << 8;
43 static final int POOL_MASK = POOL_SIZE-1;
44 static final Integer[] intPool = new Integer[POOL_SIZE];
45 static {
46 for (int i = 0; i < POOL_SIZE; ++i)
47 intPool[i] = Integer.valueOf(i);
48 }
49
50 // Number of puts by producers or takes by consumers
51 static final int ITERS = 1 << 20;
52
53 // max lag between a producer and consumer to avoid
54 // this becoming a GC test rather than queue test.
55 static final int LAG = (1 << 12);
56 static final int LAG_MASK = LAG - 1;
57
58 public static void main(String[] args) throws Exception {
59 int maxn = 12; // NCPUS * 3 / 2;
60
61 if (args.length > 0)
62 maxn = Integer.parseInt(args[0]);
63
64 warmup();
65 print = true;
66 for (int k = 1, i = 1; i <= maxn;) {
67 System.out.println("Producers:" + i);
68 oneTest(i, ITERS);
69 if (i == k) {
70 k = i << 1;
71 i = i + (i >>> 1);
72 }
73 else
74 i = k;
75 }
76 pool.shutdown();
77 }
78
79 static void warmup() throws Exception {
80 print = false;
81 System.out.print("Warmup ");
82 int it = 2000;
83 for (int j = 5; j > 0; --j) {
84 oneTest(j, it);
85 System.out.print(".");
86 it += 1000;
87 }
88 System.gc();
89 it = 20000;
90 for (int j = 5; j > 0; --j) {
91 oneTest(j, it);
92 System.out.print(".");
93 it += 10000;
94 }
95 System.gc();
96 System.out.println();
97 }
98
99 static void oneTest(int n, int iters) throws Exception {
100 int fairIters = iters/16;
101
102 Thread.sleep(100); // System.gc();
103 if (print)
104 System.out.print("LinkedTransferQueue ");
105 oneRun(new LinkedTransferQueue<Integer>(), n, iters);
106
107 Thread.sleep(100); // System.gc();
108 if (print)
109 System.out.print("LinkedBlockingQueue ");
110 oneRun(new LinkedBlockingQueue<Integer>(), n, iters);
111
112 Thread.sleep(100); // System.gc();
113 if (print)
114 System.out.print("LinkedBlockingQueue(cap)");
115 oneRun(new LinkedBlockingQueue<Integer>(POOL_SIZE), n, iters);
116
117 Thread.sleep(100); // System.gc();
118 if (print)
119 System.out.print("LinkedBlockingDeque ");
120 oneRun(new LinkedBlockingDeque<Integer>(), n, iters);
121
122 Thread.sleep(100); // System.gc();
123 if (print)
124 System.out.print("ArrayBlockingQueue ");
125 oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE), n, iters);
126
127 Thread.sleep(100); // System.gc();
128 if (print)
129 System.out.print("SynchronousQueue ");
130 oneRun(new SynchronousQueue<Integer>(), n, iters);
131
132 Thread.sleep(100); // System.gc();
133 if (print)
134 System.out.print("SynchronousQueue(fair) ");
135 oneRun(new SynchronousQueue<Integer>(true), n, iters);
136
137 Thread.sleep(100); // System.gc();
138 if (print)
139 System.out.print("LinkedTransferQueue(xfer)");
140 oneRun(new LTQasSQ<Integer>(), n, iters);
141
142 Thread.sleep(100); // System.gc();
143 if (print)
144 System.out.print("LinkedTransferQueue(half)");
145 oneRun(new HalfSyncLTQ<Integer>(), n, iters);
146
147 Thread.sleep(100); // System.gc();
148 if (print)
149 System.out.print("PriorityBlockingQueue ");
150 oneRun(new PriorityBlockingQueue<Integer>(), n, fairIters);
151
152 Thread.sleep(100); // System.gc();
153 if (print)
154 System.out.print("ArrayBlockingQueue(fair)");
155 oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE, true), n, fairIters);
156 }
157
158 abstract static class Stage implements Runnable {
159 final int iters;
160 final BlockingQueue<Integer> queue;
161 final CyclicBarrier barrier;
162 final Phaser lagPhaser;
163 final int lag;
164 Stage(BlockingQueue<Integer> q, CyclicBarrier b, Phaser s,
165 int iters, int lag) {
166 queue = q;
167 barrier = b;
168 lagPhaser = s;
169 this.iters = iters;
170 this.lag = lag;
171 }
172 }
173
174 static class Producer extends Stage {
175 Producer(BlockingQueue<Integer> q, CyclicBarrier b, Phaser s,
176 int iters, int lag) {
177 super(q, b, s, iters, lag);
178 }
179
180 public void run() {
181 try {
182 barrier.await();
183 int ps = 0;
184 int r = hashCode();
185 int j = 0;
186 for (int i = 0; i < iters; ++i) {
187 r = LoopHelpers.compute7(r);
188 Integer v = intPool[r & POOL_MASK];
189 int k = v.intValue();
190 queue.put(v);
191 ps += k;
192 if (++j == lag) {
193 j = 0;
194 lagPhaser.arriveAndAwaitAdvance();
195 }
196 }
197 addProducerSum(ps);
198 barrier.await();
199 }
200 catch (Exception ie) {
201 ie.printStackTrace();
202 return;
203 }
204 }
205 }
206
207 static class Consumer extends Stage {
208 Consumer(BlockingQueue<Integer> q, CyclicBarrier b, Phaser s,
209 int iters, int lag) {
210 super(q, b, s, iters, lag);
211 }
212
213 public void run() {
214 try {
215 barrier.await();
216 int cs = 0;
217 int j = 0;
218 for (int i = 0; i < iters; ++i) {
219 Integer v = queue.take();
220 int k = v.intValue();
221 cs += k;
222 if (++j == lag) {
223 j = 0;
224 lagPhaser.arriveAndAwaitAdvance();
225 }
226 }
227 addConsumerSum(cs);
228 barrier.await();
229 }
230 catch (Exception ie) {
231 ie.printStackTrace();
232 return;
233 }
234 }
235
236 }
237
238 static void oneRun(BlockingQueue<Integer> q, int n, int iters) throws Exception {
239
240 LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
241 CyclicBarrier barrier = new CyclicBarrier(n + 2, timer);
242 Phaser s = new Phaser(n + 1);
243 for (int i = 0; i < n; ++i) {
244 pool.execute(new Producer(q, barrier, s, iters, LAG));
245 }
246 pool.execute(new Consumer(q, barrier, s, iters * n, LAG * n));
247 barrier.await();
248 barrier.await();
249 long time = timer.getTime();
250 checkSum();
251 if (print)
252 System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * (n + 1))) + " ns per transfer");
253 }
254
255 static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
256 LTQasSQ() { super(); }
257 public void put(T x) {
258 try { super.transfer(x);
259 } catch (InterruptedException ex) { throw new Error(); }
260 }
261 }
262
263 static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> {
264 int calls;
265 HalfSyncLTQ() { super(); }
266 public void put(T x) {
267 if ((++calls & 1) == 0)
268 super.put(x);
269 else {
270 try { super.transfer(x);
271 } catch (InterruptedException ex) {
272 throw new Error();
273 }
274 }
275 }
276 }
277 }