ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/OfferPollLoops.java
Revision: 1.3
Committed: Wed Sep 1 07:47:27 2010 UTC (13 years, 8 months ago) by jsr166
Branch: MAIN
Changes since 1.2: +1 -2 lines
Log Message:
whitespace

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5     */
6    
7     import java.util.*;
8     import java.util.concurrent.*;
9     //import jsr166y.*;
10    
11     public class OfferPollLoops {
12     static final int NCPUS = Runtime.getRuntime().availableProcessors();
13     static final Random rng = new Random();
14     static final ExecutorService pool = Executors.newCachedThreadPool();
15     static boolean print = false;
16     static int producerSum;
17     static int consumerSum;
18     static synchronized void addProducerSum(int x) {
19     producerSum += x;
20     }
21    
22     static synchronized void addConsumerSum(int x) {
23     consumerSum += x;
24     }
25    
26     static synchronized void checkSum() {
27     if (producerSum != consumerSum)
28     throw new Error("CheckSum mismatch");
29     }
30    
31     // Number of elements passed around -- must be power of two
32     // Elements are reused from pool to minimize alloc impact
33     static final int POOL_SIZE = 1 << 8;
34     static final int POOL_MASK = POOL_SIZE-1;
35     static final Integer[] intPool = new Integer[POOL_SIZE];
36     static {
37 jsr166 1.2 for (int i = 0; i < POOL_SIZE; ++i)
38 dl 1.1 intPool[i] = Integer.valueOf(i);
39     }
40    
41     // Number of puts by producers or takes by consumers
42     static final int ITERS = 1 << 20;
43    
44 jsr166 1.2 // max lag between a producer and consumer to avoid
45 dl 1.1 // this becoming a GC test rather than queue test.
46     // Used only per-pair to lessen impact on queue sync
47     static final int LAG_MASK = (1 << 12) - 1;
48    
49     public static void main(String[] args) throws Exception {
50     int maxN = NCPUS * 3 / 2;
51    
52 jsr166 1.2 if (args.length > 0)
53 dl 1.1 maxN = Integer.parseInt(args[0]);
54    
55     warmup();
56     print = true;
57     int k = 1;
58     for (int i = 1; i <= maxN;) {
59     System.out.println("Pairs:" + i);
60     oneTest(i, ITERS);
61     if (i == k) {
62     k = i << 1;
63     i = i + (i >>> 1);
64 jsr166 1.2 }
65     else
66 dl 1.1 i = k;
67     }
68     pool.shutdown();
69     }
70    
71     static void warmup() throws Exception {
72     print = false;
73     System.out.print("Warmup ");
74     int it = 2000;
75     for (int j = 5; j > 0; --j) {
76 jsr166 1.2 oneTest(j, it);
77 dl 1.1 System.out.print(".");
78     it += 1000;
79     }
80     System.gc();
81     it = 20000;
82     for (int j = 5; j > 0; --j) {
83 jsr166 1.2 oneTest(j, it);
84 dl 1.1 System.out.print(".");
85     it += 10000;
86     }
87     System.gc();
88     System.out.println();
89     }
90    
91     static void oneTest(int n, int iters) throws Exception {
92     int fairIters = iters/16;
93    
94     Thread.sleep(100); // System.gc();
95     if (print)
96     System.out.print("LinkedTransferQueue ");
97     oneRun(new LinkedTransferQueue<Integer>(), n, iters);
98    
99     Thread.sleep(100); // System.gc();
100     if (print)
101     System.out.print("ConcurrentLinkedQueue ");
102     oneRun(new ConcurrentLinkedQueue<Integer>(), n, iters);
103    
104     Thread.sleep(100); // System.gc();
105     if (print)
106     System.out.print("LinkedBlockingQueue ");
107     oneRun(new LinkedBlockingQueue<Integer>(), n, iters);
108    
109     Thread.sleep(100); // System.gc();
110     if (print)
111     System.out.print("LinkedBlockingQueue(cap)");
112     oneRun(new LinkedBlockingQueue<Integer>(POOL_SIZE), n, iters);
113    
114     Thread.sleep(100); // System.gc();
115     if (print)
116     System.out.print("LinkedBlockingDeque ");
117     oneRun(new LinkedBlockingDeque<Integer>(), n, iters);
118 jsr166 1.2
119 dl 1.1 Thread.sleep(100); // System.gc();
120     if (print)
121     System.out.print("ArrayBlockingQueue ");
122     oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE), n, iters);
123    
124    
125     Thread.sleep(100); // System.gc();
126     if (print)
127     System.out.print("PriorityBlockingQueue ");
128     oneRun(new PriorityBlockingQueue<Integer>(), n, fairIters);
129    
130     Thread.sleep(100); // System.gc();
131     if (print)
132     System.out.print("ArrayBlockingQueue(fair)");
133     oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE, true), n, fairIters);
134    
135     }
136 jsr166 1.2
137 dl 1.1 static abstract class Stage implements Runnable {
138     final int iters;
139     final Queue<Integer> queue;
140     final CyclicBarrier barrier;
141     final Phaser lagPhaser;
142 jsr166 1.3 Stage(Queue<Integer> q, CyclicBarrier b, Phaser s, int iters) {
143 jsr166 1.2 queue = q;
144 dl 1.1 barrier = b;
145     lagPhaser = s;
146     this.iters = iters;
147     }
148     }
149    
150     static class Producer extends Stage {
151     Producer(Queue<Integer> q, CyclicBarrier b, Phaser s,
152     int iters) {
153     super(q, b, s, iters);
154     }
155    
156     public void run() {
157     try {
158     barrier.await();
159     int ps = 0;
160     int r = hashCode();
161     int i = 0;
162     for (;;) {
163     r = LoopHelpers.compute7(r);
164     Integer v = intPool[r & POOL_MASK];
165     int k = v.intValue();
166     if (queue.offer(v)) {
167     ps += k;
168     ++i;
169     if (i >= iters)
170     break;
171     if ((i & LAG_MASK) == LAG_MASK)
172     lagPhaser.arriveAndAwaitAdvance();
173 jsr166 1.2 }
174 dl 1.1 }
175     addProducerSum(ps);
176     barrier.await();
177     }
178 jsr166 1.2 catch (Exception ie) {
179     ie.printStackTrace();
180     return;
181 dl 1.1 }
182     }
183     }
184    
185     static class Consumer extends Stage {
186     Consumer(Queue<Integer> q, CyclicBarrier b, Phaser s,
187 jsr166 1.2 int iters) {
188 dl 1.1 super(q, b, s, iters);
189     }
190    
191     public void run() {
192     try {
193     barrier.await();
194     int cs = 0;
195     int i = 0;
196     for (;;) {
197     Integer v = queue.poll();
198     if (v != null) {
199     int k = v.intValue();
200     cs += k;
201     ++i;
202     if (i >= iters)
203     break;
204     if ((i & LAG_MASK) == LAG_MASK)
205     lagPhaser.arriveAndAwaitAdvance();
206     }
207     }
208     addConsumerSum(cs);
209     barrier.await();
210     }
211 jsr166 1.2 catch (Exception ie) {
212     ie.printStackTrace();
213     return;
214 dl 1.1 }
215     }
216    
217     }
218    
219     static void oneRun(Queue<Integer> q, int n, int iters) throws Exception {
220     LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
221     CyclicBarrier barrier = new CyclicBarrier(n * 2 + 1, timer);
222     for (int i = 0; i < n; ++i) {
223     Phaser s = new Phaser(2);
224     pool.execute(new Producer(q, barrier, s, iters));
225     pool.execute(new Consumer(q, barrier, s, iters));
226     }
227     barrier.await();
228     barrier.await();
229     long time = timer.getTime();
230     checkSum();
231     if (print)
232     System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * n)) + " ns per transfer");
233     }
234    
235    
236     }