ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/MultipleProducersSingleConsumerLoops.java
Revision: 1.13
Committed: Sat Dec 31 19:50:56 2016 UTC (7 years, 4 months ago) by jsr166
Branch: MAIN
Changes since 1.12: +0 -1 lines
Log Message:
remove unused Randoms

File Contents

# User Rev Content
/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.Phaser;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;

20     public class MultipleProducersSingleConsumerLoops {
21 dl 1.4 static final int NCPUS = Runtime.getRuntime().availableProcessors();
22 dl 1.1 static final ExecutorService pool = Executors.newCachedThreadPool();
23     static boolean print = false;
24     static int producerSum;
25     static int consumerSum;
26     static synchronized void addProducerSum(int x) {
27     producerSum += x;
28     }
29    
30     static synchronized void addConsumerSum(int x) {
31     consumerSum += x;
32     }
33    
34     static synchronized void checkSum() {
35     if (producerSum != consumerSum)
36     throw new Error("CheckSum mismatch");
37     }
38    
39 dl 1.4 // Number of elements passed around -- must be power of two
40     // Elements are reused from pool to minimize alloc impact
41     static final int POOL_SIZE = 1 << 8;
42     static final int POOL_MASK = POOL_SIZE-1;
43     static final Integer[] intPool = new Integer[POOL_SIZE];
44     static {
45 jsr166 1.5 for (int i = 0; i < POOL_SIZE; ++i)
46 dl 1.4 intPool[i] = Integer.valueOf(i);
47     }
48    
49     // Number of puts by producers or takes by consumers
50     static final int ITERS = 1 << 20;
51    
52 jsr166 1.5 // max lag between a producer and consumer to avoid
53 dl 1.4 // this becoming a GC test rather than queue test.
54     static final int LAG = (1 << 12);
55     static final int LAG_MASK = LAG - 1;
56    
57 dl 1.1 public static void main(String[] args) throws Exception {
58 dl 1.4 int maxn = 12; // NCPUS * 3 / 2;
59 dl 1.1
60 jsr166 1.5 if (args.length > 0)
61 dl 1.4 maxn = Integer.parseInt(args[0]);
62 dl 1.1
63 dl 1.4 warmup();
64 dl 1.1 print = true;
65 jsr166 1.9 for (int k = 1, i = 1; i <= maxn;) {
66 dl 1.1 System.out.println("Producers:" + i);
67 dl 1.4 oneTest(i, ITERS);
68     if (i == k) {
69     k = i << 1;
70     i = i + (i >>> 1);
71 jsr166 1.5 }
72     else
73 dl 1.4 i = k;
74 dl 1.1 }
75     pool.shutdown();
76 dl 1.4 }
77    
78     static void warmup() throws Exception {
79     print = false;
80     System.out.print("Warmup ");
81     int it = 2000;
82     for (int j = 5; j > 0; --j) {
83 jsr166 1.5 oneTest(j, it);
84 dl 1.4 System.out.print(".");
85     it += 1000;
86     }
87     System.gc();
88     it = 20000;
89     for (int j = 5; j > 0; --j) {
90 jsr166 1.5 oneTest(j, it);
91 dl 1.4 System.out.print(".");
92     it += 10000;
93     }
94     System.gc();
95     System.out.println();
96     }
97 dl 1.1
98 dl 1.4 static void oneTest(int n, int iters) throws Exception {
99     int fairIters = iters/16;
100    
101     Thread.sleep(100); // System.gc();
102 dl 1.1 if (print)
103 dl 1.4 System.out.print("LinkedTransferQueue ");
104     oneRun(new LinkedTransferQueue<Integer>(), n, iters);
105 dl 1.1
106 dl 1.4 Thread.sleep(100); // System.gc();
107 dl 1.1 if (print)
108     System.out.print("LinkedBlockingQueue ");
109 dl 1.4 oneRun(new LinkedBlockingQueue<Integer>(), n, iters);
110 dl 1.1
111 dl 1.4 Thread.sleep(100); // System.gc();
112     if (print)
113     System.out.print("LinkedBlockingQueue(cap)");
114     oneRun(new LinkedBlockingQueue<Integer>(POOL_SIZE), n, iters);
115    
116     Thread.sleep(100); // System.gc();
117     if (print)
118     System.out.print("LinkedBlockingDeque ");
119     oneRun(new LinkedBlockingDeque<Integer>(), n, iters);
120 jsr166 1.5
121 dl 1.4 Thread.sleep(100); // System.gc();
122     if (print)
123     System.out.print("ArrayBlockingQueue ");
124     oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE), n, iters);
125 dl 1.1
126 dl 1.4 Thread.sleep(100); // System.gc();
127 dl 1.1 if (print)
128     System.out.print("SynchronousQueue ");
129 dl 1.4 oneRun(new SynchronousQueue<Integer>(), n, iters);
130 dl 1.1
131 dl 1.4 Thread.sleep(100); // System.gc();
132 dl 1.1 if (print)
133 dl 1.2 System.out.print("SynchronousQueue(fair) ");
134 dl 1.4 oneRun(new SynchronousQueue<Integer>(true), n, iters);
135    
136     Thread.sleep(100); // System.gc();
137     if (print)
138     System.out.print("LinkedTransferQueue(xfer)");
139     oneRun(new LTQasSQ<Integer>(), n, iters);
140    
141     Thread.sleep(100); // System.gc();
142     if (print)
143     System.out.print("LinkedTransferQueue(half)");
144     oneRun(new HalfSyncLTQ<Integer>(), n, iters);
145 jsr166 1.5
146 dl 1.4 Thread.sleep(100); // System.gc();
147     if (print)
148     System.out.print("PriorityBlockingQueue ");
149     oneRun(new PriorityBlockingQueue<Integer>(), n, fairIters);
150 dl 1.2
151 dl 1.4 Thread.sleep(100); // System.gc();
152 dl 1.2 if (print)
153 dl 1.1 System.out.print("ArrayBlockingQueue(fair)");
154 dl 1.4 oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE, true), n, fairIters);
155 dl 1.1 }
156 jsr166 1.5
157 jsr166 1.7 abstract static class Stage implements Runnable {
158 dl 1.1 final int iters;
159     final BlockingQueue<Integer> queue;
160     final CyclicBarrier barrier;
161 dl 1.4 final Phaser lagPhaser;
162     final int lag;
163 jsr166 1.6 Stage(BlockingQueue<Integer> q, CyclicBarrier b, Phaser s,
164     int iters, int lag) {
165 jsr166 1.5 queue = q;
166 dl 1.1 barrier = b;
167 dl 1.4 lagPhaser = s;
168 dl 1.1 this.iters = iters;
169 dl 1.4 this.lag = lag;
170 dl 1.1 }
171     }
172    
173     static class Producer extends Stage {
174 dl 1.4 Producer(BlockingQueue<Integer> q, CyclicBarrier b, Phaser s,
175     int iters, int lag) {
176     super(q, b, s, iters, lag);
177 dl 1.1 }
178    
179     public void run() {
180     try {
181     barrier.await();
182 dl 1.4 int ps = 0;
183     int r = hashCode();
184     int j = 0;
185 dl 1.1 for (int i = 0; i < iters; ++i) {
186 dl 1.4 r = LoopHelpers.compute7(r);
187     Integer v = intPool[r & POOL_MASK];
188     int k = v.intValue();
189     queue.put(v);
190     ps += k;
191     if (++j == lag) {
192     j = 0;
193     lagPhaser.arriveAndAwaitAdvance();
194     }
195 dl 1.1 }
196 dl 1.4 addProducerSum(ps);
197 dl 1.1 barrier.await();
198     }
199 jsr166 1.5 catch (Exception ie) {
200     ie.printStackTrace();
201     return;
202 dl 1.1 }
203     }
204     }
205    
206     static class Consumer extends Stage {
207 dl 1.4 Consumer(BlockingQueue<Integer> q, CyclicBarrier b, Phaser s,
208 jsr166 1.5 int iters, int lag) {
209 dl 1.4 super(q, b, s, iters, lag);
210 dl 1.1 }
211    
212     public void run() {
213     try {
214     barrier.await();
215 dl 1.4 int cs = 0;
216     int j = 0;
217 dl 1.1 for (int i = 0; i < iters; ++i) {
218 dl 1.4 Integer v = queue.take();
219     int k = v.intValue();
220     cs += k;
221     if (++j == lag) {
222     j = 0;
223     lagPhaser.arriveAndAwaitAdvance();
224     }
225 dl 1.1 }
226 dl 1.4 addConsumerSum(cs);
227 dl 1.1 barrier.await();
228     }
229 jsr166 1.5 catch (Exception ie) {
230     ie.printStackTrace();
231     return;
232 dl 1.1 }
233     }
234    
235     }
236    
237 dl 1.4 static void oneRun(BlockingQueue<Integer> q, int n, int iters) throws Exception {
238    
239 dl 1.1 LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
240 dl 1.4 CyclicBarrier barrier = new CyclicBarrier(n + 2, timer);
241     Phaser s = new Phaser(n + 1);
242     for (int i = 0; i < n; ++i) {
243     pool.execute(new Producer(q, barrier, s, iters, LAG));
244 dl 1.1 }
245 dl 1.4 pool.execute(new Consumer(q, barrier, s, iters * n, LAG * n));
246 dl 1.1 barrier.await();
247     barrier.await();
248     long time = timer.getTime();
249     checkSum();
250     if (print)
251 dl 1.4 System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * (n + 1))) + " ns per transfer");
252     }
253 jsr166 1.5
254 dl 1.4 static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
255     LTQasSQ() { super(); }
256     public void put(T x) {
257 jsr166 1.5 try { super.transfer(x);
258 dl 1.4 } catch (InterruptedException ex) { throw new Error(); }
259     }
260 dl 1.1 }
261    
262 dl 1.4 static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> {
263     int calls;
264     HalfSyncLTQ() { super(); }
265     public void put(T x) {
266     if ((++calls & 1) == 0)
267     super.put(x);
268     else {
269 jsr166 1.5 try { super.transfer(x);
270     } catch (InterruptedException ex) {
271     throw new Error();
272 dl 1.4 }
273     }
274     }
275     }
276 dl 1.1 }