ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/OfferPollLoops.java
Revision: 1.10
Committed: Sat Dec 31 19:29:58 2016 UTC (7 years, 4 months ago) by jsr166
Branch: MAIN
Changes since 1.9: +14 -3 lines
Log Message:
organize imports

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 import java.util.Queue;
8 import java.util.Random;
9 import java.util.concurrent.ArrayBlockingQueue;
10 import java.util.concurrent.CyclicBarrier;
11 import java.util.concurrent.BlockingQueue;
12 import java.util.concurrent.ConcurrentLinkedDeque;
13 import java.util.concurrent.ConcurrentLinkedQueue;
14 import java.util.concurrent.ExecutorService;
15 import java.util.concurrent.Executors;
16 import java.util.concurrent.LinkedBlockingDeque;
17 import java.util.concurrent.LinkedBlockingQueue;
18 import java.util.concurrent.LinkedTransferQueue;
19 import java.util.concurrent.Phaser;
20 import java.util.concurrent.PriorityBlockingQueue;
21
22 public class OfferPollLoops {
23 static final int NCPUS = Runtime.getRuntime().availableProcessors();
24 static final Random rng = new Random();
25 static final ExecutorService pool = Executors.newCachedThreadPool();
26 static boolean print = false;
27 static int producerSum;
28 static int consumerSum;
29 static synchronized void addProducerSum(int x) {
30 producerSum += x;
31 }
32
33 static synchronized void addConsumerSum(int x) {
34 consumerSum += x;
35 }
36
37 static synchronized void checkSum() {
38 if (producerSum != consumerSum)
39 throw new Error("CheckSum mismatch");
40 }
41
// Size of the pool of distinct elements handed around -- must stay a
// power of two so that POOL_MASK can pick a slot with a bitwise AND.
// Elements are taken from this pool rather than freshly allocated,
// to keep allocation from dominating the measurement.
static final int POOL_SIZE = 1 << 8;
static final int POOL_MASK = POOL_SIZE - 1;
static final Integer[] intPool = new Integer[POOL_SIZE];
static {
    // Slot i holds the Integer with value i.
    int slot = 0;
    while (slot < POOL_SIZE) {
        intPool[slot] = Integer.valueOf(slot);
        slot++;
    }
}

// How many successful puts each producer (and takes each consumer)
// performs in a full-size run.
static final int ITERS = 1 << 20;

// Bound on how far a producer and its consumer may drift apart, so the
// run does not turn into a GC test instead of a queue test.
// Applied per pair only, to lessen the impact on queue synchronization.
static final int LAG_MASK = (1 << 12) - 1;
59
60 public static void main(String[] args) throws Exception {
61 int maxN = NCPUS * 3 / 2;
62
63 if (args.length > 0)
64 maxN = Integer.parseInt(args[0]);
65
66 warmup();
67 print = true;
68 for (int k = 1, i = 1; i <= maxN;) {
69 System.out.println("Pairs:" + i);
70 oneTest(i, ITERS);
71 if (i == k) {
72 k = i << 1;
73 i = i + (i >>> 1);
74 }
75 else
76 i = k;
77 }
78 pool.shutdown();
79 }
80
81 static void warmup() throws Exception {
82 print = false;
83 System.out.print("Warmup ");
84 int it = 2000;
85 for (int j = 5; j > 0; --j) {
86 oneTest(j, it);
87 System.out.print(".");
88 it += 1000;
89 }
90 System.gc();
91 it = 20000;
92 for (int j = 5; j > 0; --j) {
93 oneTest(j, it);
94 System.out.print(".");
95 it += 10000;
96 }
97 System.gc();
98 System.out.println();
99 }
100
101 static void oneTest(int n, int iters) throws Exception {
102 int fairIters = iters/16;
103
104 Thread.sleep(100); // System.gc();
105 if (print)
106 System.out.print("LinkedTransferQueue ");
107 oneRun(new LinkedTransferQueue<Integer>(), n, iters);
108
109 Thread.sleep(100); // System.gc();
110 if (print)
111 System.out.print("ConcurrentLinkedQueue ");
112 oneRun(new ConcurrentLinkedQueue<Integer>(), n, iters);
113
114 Thread.sleep(100); // System.gc();
115 if (print)
116 System.out.print("ConcurrentLinkedDeque ");
117 oneRun(new ConcurrentLinkedDeque<Integer>(), n, iters);
118
119 Thread.sleep(100); // System.gc();
120 if (print)
121 System.out.print("LinkedBlockingQueue ");
122 oneRun(new LinkedBlockingQueue<Integer>(), n, iters);
123
124 Thread.sleep(100); // System.gc();
125 if (print)
126 System.out.print("LinkedBlockingQueue(cap)");
127 oneRun(new LinkedBlockingQueue<Integer>(POOL_SIZE), n, iters);
128
129 Thread.sleep(100); // System.gc();
130 if (print)
131 System.out.print("LinkedBlockingDeque ");
132 oneRun(new LinkedBlockingDeque<Integer>(), n, iters);
133
134 Thread.sleep(100); // System.gc();
135 if (print)
136 System.out.print("ArrayBlockingQueue ");
137 oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE), n, iters);
138
139 Thread.sleep(100); // System.gc();
140 if (print)
141 System.out.print("PriorityBlockingQueue ");
142 oneRun(new PriorityBlockingQueue<Integer>(), n, fairIters);
143
144 Thread.sleep(100); // System.gc();
145 if (print)
146 System.out.print("ArrayBlockingQueue(fair)");
147 oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE, true), n, fairIters);
148 }
149
150 abstract static class Stage implements Runnable {
151 final int iters;
152 final Queue<Integer> queue;
153 final CyclicBarrier barrier;
154 final Phaser lagPhaser;
155 Stage(Queue<Integer> q, CyclicBarrier b, Phaser s, int iters) {
156 queue = q;
157 barrier = b;
158 lagPhaser = s;
159 this.iters = iters;
160 }
161 }
162
163 static class Producer extends Stage {
164 Producer(Queue<Integer> q, CyclicBarrier b, Phaser s,
165 int iters) {
166 super(q, b, s, iters);
167 }
168
169 public void run() {
170 try {
171 barrier.await();
172 int ps = 0;
173 int r = hashCode();
174 int i = 0;
175 for (;;) {
176 r = LoopHelpers.compute7(r);
177 Integer v = intPool[r & POOL_MASK];
178 int k = v.intValue();
179 if (queue.offer(v)) {
180 ps += k;
181 ++i;
182 if (i >= iters)
183 break;
184 if ((i & LAG_MASK) == LAG_MASK)
185 lagPhaser.arriveAndAwaitAdvance();
186 }
187 }
188 addProducerSum(ps);
189 barrier.await();
190 }
191 catch (Exception ie) {
192 ie.printStackTrace();
193 return;
194 }
195 }
196 }
197
198 static class Consumer extends Stage {
199 Consumer(Queue<Integer> q, CyclicBarrier b, Phaser s,
200 int iters) {
201 super(q, b, s, iters);
202 }
203
204 public void run() {
205 try {
206 barrier.await();
207 int cs = 0;
208 int i = 0;
209 for (;;) {
210 Integer v = queue.poll();
211 if (v != null) {
212 int k = v.intValue();
213 cs += k;
214 ++i;
215 if (i >= iters)
216 break;
217 if ((i & LAG_MASK) == LAG_MASK)
218 lagPhaser.arriveAndAwaitAdvance();
219 }
220 }
221 addConsumerSum(cs);
222 barrier.await();
223 }
224 catch (Exception ie) {
225 ie.printStackTrace();
226 return;
227 }
228 }
229
230 }
231
232 static void oneRun(Queue<Integer> q, int n, int iters) throws Exception {
233 LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
234 CyclicBarrier barrier = new CyclicBarrier(n * 2 + 1, timer);
235 for (int i = 0; i < n; ++i) {
236 Phaser s = new Phaser(2);
237 pool.execute(new Producer(q, barrier, s, iters));
238 pool.execute(new Consumer(q, barrier, s, iters));
239 }
240 barrier.await();
241 barrier.await();
242 long time = timer.getTime();
243 checkSum();
244 if (print)
245 System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * n)) + " ns per transfer");
246 }
247
248 }