ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/OfferPollLoops.java
Revision: 1.10
Committed: Sat Dec 31 19:29:58 2016 UTC (7 years, 4 months ago) by jsr166
Branch: MAIN
Changes since 1.9: +14 -3 lines
Log Message:
organize imports

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4 jsr166 1.6 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.1 */
6    
7 jsr166 1.10 import java.util.Queue;
8     import java.util.Random;
9     import java.util.concurrent.ArrayBlockingQueue;
10     import java.util.concurrent.CyclicBarrier;
11     import java.util.concurrent.BlockingQueue;
12     import java.util.concurrent.ConcurrentLinkedDeque;
13     import java.util.concurrent.ConcurrentLinkedQueue;
14     import java.util.concurrent.ExecutorService;
15     import java.util.concurrent.Executors;
16     import java.util.concurrent.LinkedBlockingDeque;
17     import java.util.concurrent.LinkedBlockingQueue;
18     import java.util.concurrent.LinkedTransferQueue;
19     import java.util.concurrent.Phaser;
20     import java.util.concurrent.PriorityBlockingQueue;
21 dl 1.1
22     public class OfferPollLoops {
23     static final int NCPUS = Runtime.getRuntime().availableProcessors();
24     static final Random rng = new Random();
25     static final ExecutorService pool = Executors.newCachedThreadPool();
26     static boolean print = false;
27     static int producerSum;
28     static int consumerSum;
29     static synchronized void addProducerSum(int x) {
30     producerSum += x;
31     }
32    
33     static synchronized void addConsumerSum(int x) {
34     consumerSum += x;
35     }
36    
37     static synchronized void checkSum() {
38     if (producerSum != consumerSum)
39     throw new Error("CheckSum mismatch");
40     }
41    
// Number of distinct elements passed around -- must be a power of two
// so that POOL_MASK can turn any int into a valid pool index.
// Elements are reused from the pool to minimize allocation impact.
static final int POOL_SIZE = 1 << 8;
static final int POOL_MASK = POOL_SIZE - 1;
static final Integer[] intPool = new Integer[POOL_SIZE];
static {
    // Slot k holds the boxed value k.
    int slot = 0;
    while (slot < intPool.length) {
        intPool[slot] = Integer.valueOf(slot);
        slot++;
    }
}

// Number of successful offers per producer and polls per consumer
static final int ITERS = 1 << 20;

// Max lag between a producer and its consumer, to keep this from
// becoming a GC test rather than a queue test.
// Applied only per pair to lessen the impact on queue synchronization.
static final int LAG_MASK = (1 << 12) - 1;
59    
60     public static void main(String[] args) throws Exception {
61     int maxN = NCPUS * 3 / 2;
62    
63 jsr166 1.2 if (args.length > 0)
64 dl 1.1 maxN = Integer.parseInt(args[0]);
65    
66     warmup();
67     print = true;
68 jsr166 1.7 for (int k = 1, i = 1; i <= maxN;) {
69 dl 1.1 System.out.println("Pairs:" + i);
70     oneTest(i, ITERS);
71     if (i == k) {
72     k = i << 1;
73     i = i + (i >>> 1);
74 jsr166 1.2 }
75     else
76 dl 1.1 i = k;
77     }
78     pool.shutdown();
79     }
80    
81     static void warmup() throws Exception {
82     print = false;
83     System.out.print("Warmup ");
84     int it = 2000;
85     for (int j = 5; j > 0; --j) {
86 jsr166 1.2 oneTest(j, it);
87 dl 1.1 System.out.print(".");
88     it += 1000;
89     }
90     System.gc();
91     it = 20000;
92     for (int j = 5; j > 0; --j) {
93 jsr166 1.2 oneTest(j, it);
94 dl 1.1 System.out.print(".");
95     it += 10000;
96     }
97     System.gc();
98     System.out.println();
99     }
100    
101     static void oneTest(int n, int iters) throws Exception {
102     int fairIters = iters/16;
103    
104     Thread.sleep(100); // System.gc();
105     if (print)
106     System.out.print("LinkedTransferQueue ");
107     oneRun(new LinkedTransferQueue<Integer>(), n, iters);
108    
109     Thread.sleep(100); // System.gc();
110     if (print)
111     System.out.print("ConcurrentLinkedQueue ");
112     oneRun(new ConcurrentLinkedQueue<Integer>(), n, iters);
113    
114     Thread.sleep(100); // System.gc();
115     if (print)
116 dl 1.4 System.out.print("ConcurrentLinkedDeque ");
117     oneRun(new ConcurrentLinkedDeque<Integer>(), n, iters);
118    
119     Thread.sleep(100); // System.gc();
120     if (print)
121 dl 1.1 System.out.print("LinkedBlockingQueue ");
122     oneRun(new LinkedBlockingQueue<Integer>(), n, iters);
123    
124     Thread.sleep(100); // System.gc();
125     if (print)
126     System.out.print("LinkedBlockingQueue(cap)");
127     oneRun(new LinkedBlockingQueue<Integer>(POOL_SIZE), n, iters);
128    
129     Thread.sleep(100); // System.gc();
130     if (print)
131     System.out.print("LinkedBlockingDeque ");
132     oneRun(new LinkedBlockingDeque<Integer>(), n, iters);
133 jsr166 1.2
134 dl 1.1 Thread.sleep(100); // System.gc();
135     if (print)
136     System.out.print("ArrayBlockingQueue ");
137     oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE), n, iters);
138    
139     Thread.sleep(100); // System.gc();
140     if (print)
141     System.out.print("PriorityBlockingQueue ");
142     oneRun(new PriorityBlockingQueue<Integer>(), n, fairIters);
143    
144     Thread.sleep(100); // System.gc();
145     if (print)
146     System.out.print("ArrayBlockingQueue(fair)");
147     oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE, true), n, fairIters);
148     }
149 jsr166 1.2
150 jsr166 1.5 abstract static class Stage implements Runnable {
151 dl 1.1 final int iters;
152     final Queue<Integer> queue;
153     final CyclicBarrier barrier;
154     final Phaser lagPhaser;
155 jsr166 1.3 Stage(Queue<Integer> q, CyclicBarrier b, Phaser s, int iters) {
156 jsr166 1.2 queue = q;
157 dl 1.1 barrier = b;
158     lagPhaser = s;
159     this.iters = iters;
160     }
161     }
162    
163     static class Producer extends Stage {
164     Producer(Queue<Integer> q, CyclicBarrier b, Phaser s,
165     int iters) {
166     super(q, b, s, iters);
167     }
168    
169     public void run() {
170     try {
171     barrier.await();
172     int ps = 0;
173     int r = hashCode();
174     int i = 0;
175     for (;;) {
176     r = LoopHelpers.compute7(r);
177     Integer v = intPool[r & POOL_MASK];
178     int k = v.intValue();
179     if (queue.offer(v)) {
180     ps += k;
181     ++i;
182     if (i >= iters)
183     break;
184     if ((i & LAG_MASK) == LAG_MASK)
185     lagPhaser.arriveAndAwaitAdvance();
186 jsr166 1.2 }
187 dl 1.1 }
188     addProducerSum(ps);
189     barrier.await();
190     }
191 jsr166 1.2 catch (Exception ie) {
192     ie.printStackTrace();
193     return;
194 dl 1.1 }
195     }
196     }
197    
198     static class Consumer extends Stage {
199     Consumer(Queue<Integer> q, CyclicBarrier b, Phaser s,
200 jsr166 1.2 int iters) {
201 dl 1.1 super(q, b, s, iters);
202     }
203    
204     public void run() {
205     try {
206     barrier.await();
207     int cs = 0;
208     int i = 0;
209     for (;;) {
210     Integer v = queue.poll();
211     if (v != null) {
212     int k = v.intValue();
213     cs += k;
214     ++i;
215     if (i >= iters)
216     break;
217     if ((i & LAG_MASK) == LAG_MASK)
218     lagPhaser.arriveAndAwaitAdvance();
219     }
220     }
221     addConsumerSum(cs);
222     barrier.await();
223     }
224 jsr166 1.2 catch (Exception ie) {
225     ie.printStackTrace();
226     return;
227 dl 1.1 }
228     }
229    
230     }
231    
232     static void oneRun(Queue<Integer> q, int n, int iters) throws Exception {
233     LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
234     CyclicBarrier barrier = new CyclicBarrier(n * 2 + 1, timer);
235     for (int i = 0; i < n; ++i) {
236     Phaser s = new Phaser(2);
237     pool.execute(new Producer(q, barrier, s, iters));
238     pool.execute(new Consumer(q, barrier, s, iters));
239     }
240     barrier.await();
241     barrier.await();
242     long time = timer.getTime();
243     checkSum();
244     if (print)
245     System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * n)) + " ns per transfer");
246     }
247    
248     }