ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/OfferPollLoops.java
Revision: 1.12
Committed: Sat Dec 31 19:54:17 2016 UTC (7 years, 4 months ago) by jsr166
Branch: MAIN
Changes since 1.11: +0 -1 lines
Log Message:
remove unused Randoms

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 import java.util.Queue;
8 import java.util.concurrent.ArrayBlockingQueue;
9 import java.util.concurrent.CyclicBarrier;
10 import java.util.concurrent.BlockingQueue;
11 import java.util.concurrent.ConcurrentLinkedDeque;
12 import java.util.concurrent.ConcurrentLinkedQueue;
13 import java.util.concurrent.ExecutorService;
14 import java.util.concurrent.Executors;
15 import java.util.concurrent.LinkedBlockingDeque;
16 import java.util.concurrent.LinkedBlockingQueue;
17 import java.util.concurrent.LinkedTransferQueue;
18 import java.util.concurrent.Phaser;
19 import java.util.concurrent.PriorityBlockingQueue;
20
21 public class OfferPollLoops {
22 static final int NCPUS = Runtime.getRuntime().availableProcessors();
23 static final ExecutorService pool = Executors.newCachedThreadPool();
24 static boolean print = false;
25 static int producerSum;
26 static int consumerSum;
27 static synchronized void addProducerSum(int x) {
28 producerSum += x;
29 }
30
31 static synchronized void addConsumerSum(int x) {
32 consumerSum += x;
33 }
34
35 static synchronized void checkSum() {
36 if (producerSum != consumerSum)
37 throw new Error("CheckSum mismatch");
38 }
39
40 // Number of elements passed around -- must be power of two
41 // Elements are reused from pool to minimize alloc impact
42 static final int POOL_SIZE = 1 << 8;
43 static final int POOL_MASK = POOL_SIZE-1;
44 static final Integer[] intPool = new Integer[POOL_SIZE];
45 static {
46 for (int i = 0; i < POOL_SIZE; ++i)
47 intPool[i] = Integer.valueOf(i);
48 }
49
50 // Number of puts by producers or takes by consumers
51 static final int ITERS = 1 << 20;
52
53 // max lag between a producer and consumer to avoid
54 // this becoming a GC test rather than queue test.
55 // Used only per-pair to lessen impact on queue sync
56 static final int LAG_MASK = (1 << 12) - 1;
57
58 public static void main(String[] args) throws Exception {
59 int maxN = NCPUS * 3 / 2;
60
61 if (args.length > 0)
62 maxN = Integer.parseInt(args[0]);
63
64 warmup();
65 print = true;
66 for (int k = 1, i = 1; i <= maxN;) {
67 System.out.println("Pairs:" + i);
68 oneTest(i, ITERS);
69 if (i == k) {
70 k = i << 1;
71 i = i + (i >>> 1);
72 }
73 else
74 i = k;
75 }
76 pool.shutdown();
77 }
78
79 static void warmup() throws Exception {
80 print = false;
81 System.out.print("Warmup ");
82 int it = 2000;
83 for (int j = 5; j > 0; --j) {
84 oneTest(j, it);
85 System.out.print(".");
86 it += 1000;
87 }
88 System.gc();
89 it = 20000;
90 for (int j = 5; j > 0; --j) {
91 oneTest(j, it);
92 System.out.print(".");
93 it += 10000;
94 }
95 System.gc();
96 System.out.println();
97 }
98
99 static void oneTest(int n, int iters) throws Exception {
100 int fairIters = iters/16;
101
102 Thread.sleep(100); // System.gc();
103 if (print)
104 System.out.print("LinkedTransferQueue ");
105 oneRun(new LinkedTransferQueue<Integer>(), n, iters);
106
107 Thread.sleep(100); // System.gc();
108 if (print)
109 System.out.print("ConcurrentLinkedQueue ");
110 oneRun(new ConcurrentLinkedQueue<Integer>(), n, iters);
111
112 Thread.sleep(100); // System.gc();
113 if (print)
114 System.out.print("ConcurrentLinkedDeque ");
115 oneRun(new ConcurrentLinkedDeque<Integer>(), n, iters);
116
117 Thread.sleep(100); // System.gc();
118 if (print)
119 System.out.print("LinkedBlockingQueue ");
120 oneRun(new LinkedBlockingQueue<Integer>(), n, iters);
121
122 Thread.sleep(100); // System.gc();
123 if (print)
124 System.out.print("LinkedBlockingQueue(cap)");
125 oneRun(new LinkedBlockingQueue<Integer>(POOL_SIZE), n, iters);
126
127 Thread.sleep(100); // System.gc();
128 if (print)
129 System.out.print("LinkedBlockingDeque ");
130 oneRun(new LinkedBlockingDeque<Integer>(), n, iters);
131
132 Thread.sleep(100); // System.gc();
133 if (print)
134 System.out.print("ArrayBlockingQueue ");
135 oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE), n, iters);
136
137 Thread.sleep(100); // System.gc();
138 if (print)
139 System.out.print("PriorityBlockingQueue ");
140 oneRun(new PriorityBlockingQueue<Integer>(), n, fairIters);
141
142 Thread.sleep(100); // System.gc();
143 if (print)
144 System.out.print("ArrayBlockingQueue(fair)");
145 oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE, true), n, fairIters);
146 }
147
148 abstract static class Stage implements Runnable {
149 final int iters;
150 final Queue<Integer> queue;
151 final CyclicBarrier barrier;
152 final Phaser lagPhaser;
153 Stage(Queue<Integer> q, CyclicBarrier b, Phaser s, int iters) {
154 queue = q;
155 barrier = b;
156 lagPhaser = s;
157 this.iters = iters;
158 }
159 }
160
161 static class Producer extends Stage {
162 Producer(Queue<Integer> q, CyclicBarrier b, Phaser s,
163 int iters) {
164 super(q, b, s, iters);
165 }
166
167 public void run() {
168 try {
169 barrier.await();
170 int ps = 0;
171 int r = hashCode();
172 int i = 0;
173 for (;;) {
174 r = LoopHelpers.compute7(r);
175 Integer v = intPool[r & POOL_MASK];
176 int k = v.intValue();
177 if (queue.offer(v)) {
178 ps += k;
179 ++i;
180 if (i >= iters)
181 break;
182 if ((i & LAG_MASK) == LAG_MASK)
183 lagPhaser.arriveAndAwaitAdvance();
184 }
185 }
186 addProducerSum(ps);
187 barrier.await();
188 }
189 catch (Exception ie) {
190 ie.printStackTrace();
191 return;
192 }
193 }
194 }
195
196 static class Consumer extends Stage {
197 Consumer(Queue<Integer> q, CyclicBarrier b, Phaser s,
198 int iters) {
199 super(q, b, s, iters);
200 }
201
202 public void run() {
203 try {
204 barrier.await();
205 int cs = 0;
206 int i = 0;
207 for (;;) {
208 Integer v = queue.poll();
209 if (v != null) {
210 int k = v.intValue();
211 cs += k;
212 ++i;
213 if (i >= iters)
214 break;
215 if ((i & LAG_MASK) == LAG_MASK)
216 lagPhaser.arriveAndAwaitAdvance();
217 }
218 }
219 addConsumerSum(cs);
220 barrier.await();
221 }
222 catch (Exception ie) {
223 ie.printStackTrace();
224 return;
225 }
226 }
227
228 }
229
230 static void oneRun(Queue<Integer> q, int n, int iters) throws Exception {
231 LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
232 CyclicBarrier barrier = new CyclicBarrier(n * 2 + 1, timer);
233 for (int i = 0; i < n; ++i) {
234 Phaser s = new Phaser(2);
235 pool.execute(new Producer(q, barrier, s, iters));
236 pool.execute(new Consumer(q, barrier, s, iters));
237 }
238 barrier.await();
239 barrier.await();
240 long time = timer.getTime();
241 checkSum();
242 if (print)
243 System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * n)) + " ns per transfer");
244 }
245
246 }