ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/OfferPollLoops.java
Revision: 1.11
Committed: Sat Dec 31 19:50:56 2016 UTC (7 years, 4 months ago) by jsr166
Branch: MAIN
Changes since 1.10: +0 -1 lines
Log Message:
remove unused Randoms

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 import java.util.Queue;
8 import java.util.Random;
9 import java.util.concurrent.ArrayBlockingQueue;
10 import java.util.concurrent.CyclicBarrier;
11 import java.util.concurrent.BlockingQueue;
12 import java.util.concurrent.ConcurrentLinkedDeque;
13 import java.util.concurrent.ConcurrentLinkedQueue;
14 import java.util.concurrent.ExecutorService;
15 import java.util.concurrent.Executors;
16 import java.util.concurrent.LinkedBlockingDeque;
17 import java.util.concurrent.LinkedBlockingQueue;
18 import java.util.concurrent.LinkedTransferQueue;
19 import java.util.concurrent.Phaser;
20 import java.util.concurrent.PriorityBlockingQueue;
21
22 public class OfferPollLoops {
23 static final int NCPUS = Runtime.getRuntime().availableProcessors();
24 static final ExecutorService pool = Executors.newCachedThreadPool();
25 static boolean print = false;
26 static int producerSum;
27 static int consumerSum;
28 static synchronized void addProducerSum(int x) {
29 producerSum += x;
30 }
31
32 static synchronized void addConsumerSum(int x) {
33 consumerSum += x;
34 }
35
36 static synchronized void checkSum() {
37 if (producerSum != consumerSum)
38 throw new Error("CheckSum mismatch");
39 }
40
41 // Number of elements passed around -- must be power of two
42 // Elements are reused from pool to minimize alloc impact
43 static final int POOL_SIZE = 1 << 8;
44 static final int POOL_MASK = POOL_SIZE-1;
45 static final Integer[] intPool = new Integer[POOL_SIZE];
46 static {
47 for (int i = 0; i < POOL_SIZE; ++i)
48 intPool[i] = Integer.valueOf(i);
49 }
50
51 // Number of puts by producers or takes by consumers
52 static final int ITERS = 1 << 20;
53
54 // max lag between a producer and consumer to avoid
55 // this becoming a GC test rather than queue test.
56 // Used only per-pair to lessen impact on queue sync
57 static final int LAG_MASK = (1 << 12) - 1;
58
59 public static void main(String[] args) throws Exception {
60 int maxN = NCPUS * 3 / 2;
61
62 if (args.length > 0)
63 maxN = Integer.parseInt(args[0]);
64
65 warmup();
66 print = true;
67 for (int k = 1, i = 1; i <= maxN;) {
68 System.out.println("Pairs:" + i);
69 oneTest(i, ITERS);
70 if (i == k) {
71 k = i << 1;
72 i = i + (i >>> 1);
73 }
74 else
75 i = k;
76 }
77 pool.shutdown();
78 }
79
80 static void warmup() throws Exception {
81 print = false;
82 System.out.print("Warmup ");
83 int it = 2000;
84 for (int j = 5; j > 0; --j) {
85 oneTest(j, it);
86 System.out.print(".");
87 it += 1000;
88 }
89 System.gc();
90 it = 20000;
91 for (int j = 5; j > 0; --j) {
92 oneTest(j, it);
93 System.out.print(".");
94 it += 10000;
95 }
96 System.gc();
97 System.out.println();
98 }
99
100 static void oneTest(int n, int iters) throws Exception {
101 int fairIters = iters/16;
102
103 Thread.sleep(100); // System.gc();
104 if (print)
105 System.out.print("LinkedTransferQueue ");
106 oneRun(new LinkedTransferQueue<Integer>(), n, iters);
107
108 Thread.sleep(100); // System.gc();
109 if (print)
110 System.out.print("ConcurrentLinkedQueue ");
111 oneRun(new ConcurrentLinkedQueue<Integer>(), n, iters);
112
113 Thread.sleep(100); // System.gc();
114 if (print)
115 System.out.print("ConcurrentLinkedDeque ");
116 oneRun(new ConcurrentLinkedDeque<Integer>(), n, iters);
117
118 Thread.sleep(100); // System.gc();
119 if (print)
120 System.out.print("LinkedBlockingQueue ");
121 oneRun(new LinkedBlockingQueue<Integer>(), n, iters);
122
123 Thread.sleep(100); // System.gc();
124 if (print)
125 System.out.print("LinkedBlockingQueue(cap)");
126 oneRun(new LinkedBlockingQueue<Integer>(POOL_SIZE), n, iters);
127
128 Thread.sleep(100); // System.gc();
129 if (print)
130 System.out.print("LinkedBlockingDeque ");
131 oneRun(new LinkedBlockingDeque<Integer>(), n, iters);
132
133 Thread.sleep(100); // System.gc();
134 if (print)
135 System.out.print("ArrayBlockingQueue ");
136 oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE), n, iters);
137
138 Thread.sleep(100); // System.gc();
139 if (print)
140 System.out.print("PriorityBlockingQueue ");
141 oneRun(new PriorityBlockingQueue<Integer>(), n, fairIters);
142
143 Thread.sleep(100); // System.gc();
144 if (print)
145 System.out.print("ArrayBlockingQueue(fair)");
146 oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE, true), n, fairIters);
147 }
148
149 abstract static class Stage implements Runnable {
150 final int iters;
151 final Queue<Integer> queue;
152 final CyclicBarrier barrier;
153 final Phaser lagPhaser;
154 Stage(Queue<Integer> q, CyclicBarrier b, Phaser s, int iters) {
155 queue = q;
156 barrier = b;
157 lagPhaser = s;
158 this.iters = iters;
159 }
160 }
161
162 static class Producer extends Stage {
163 Producer(Queue<Integer> q, CyclicBarrier b, Phaser s,
164 int iters) {
165 super(q, b, s, iters);
166 }
167
168 public void run() {
169 try {
170 barrier.await();
171 int ps = 0;
172 int r = hashCode();
173 int i = 0;
174 for (;;) {
175 r = LoopHelpers.compute7(r);
176 Integer v = intPool[r & POOL_MASK];
177 int k = v.intValue();
178 if (queue.offer(v)) {
179 ps += k;
180 ++i;
181 if (i >= iters)
182 break;
183 if ((i & LAG_MASK) == LAG_MASK)
184 lagPhaser.arriveAndAwaitAdvance();
185 }
186 }
187 addProducerSum(ps);
188 barrier.await();
189 }
190 catch (Exception ie) {
191 ie.printStackTrace();
192 return;
193 }
194 }
195 }
196
197 static class Consumer extends Stage {
198 Consumer(Queue<Integer> q, CyclicBarrier b, Phaser s,
199 int iters) {
200 super(q, b, s, iters);
201 }
202
203 public void run() {
204 try {
205 barrier.await();
206 int cs = 0;
207 int i = 0;
208 for (;;) {
209 Integer v = queue.poll();
210 if (v != null) {
211 int k = v.intValue();
212 cs += k;
213 ++i;
214 if (i >= iters)
215 break;
216 if ((i & LAG_MASK) == LAG_MASK)
217 lagPhaser.arriveAndAwaitAdvance();
218 }
219 }
220 addConsumerSum(cs);
221 barrier.await();
222 }
223 catch (Exception ie) {
224 ie.printStackTrace();
225 return;
226 }
227 }
228
229 }
230
231 static void oneRun(Queue<Integer> q, int n, int iters) throws Exception {
232 LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
233 CyclicBarrier barrier = new CyclicBarrier(n * 2 + 1, timer);
234 for (int i = 0; i < n; ++i) {
235 Phaser s = new Phaser(2);
236 pool.execute(new Producer(q, barrier, s, iters));
237 pool.execute(new Consumer(q, barrier, s, iters));
238 }
239 barrier.await();
240 barrier.await();
241 long time = timer.getTime();
242 checkSum();
243 if (print)
244 System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * n)) + " ns per transfer");
245 }
246
247 }