ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/OfferPollLoops.java
Revision: 1.1
Committed: Fri Oct 23 19:57:06 2009 UTC (14 years, 6 months ago) by dl
Branch: MAIN
Log Message:
Update misc tests for JDK7

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5     */
6    
7     import java.util.*;
8     import java.util.concurrent.*;
9     //import jsr166y.*;
10    
11     public class OfferPollLoops {
12     static final int NCPUS = Runtime.getRuntime().availableProcessors();
13     static final Random rng = new Random();
14     static final ExecutorService pool = Executors.newCachedThreadPool();
15     static boolean print = false;
16     static int producerSum;
17     static int consumerSum;
18     static synchronized void addProducerSum(int x) {
19     producerSum += x;
20     }
21    
22     static synchronized void addConsumerSum(int x) {
23     consumerSum += x;
24     }
25    
26     static synchronized void checkSum() {
27     if (producerSum != consumerSum)
28     throw new Error("CheckSum mismatch");
29     }
30    
// Size of the shared element pool. It has to be a power of two so
// that POOL_MASK can turn any int into a valid index with one AND.
static final int POOL_SIZE = 1 << 8;
static final int POOL_MASK = POOL_SIZE - 1;

// Pre-built elements that get passed through the queues. Reusing
// them keeps allocation from dominating the measurement.
static final Integer[] intPool = new Integer[POOL_SIZE];
static {
    int idx = 0;
    while (idx < intPool.length) {
        intPool[idx] = Integer.valueOf(idx);
        idx++;
    }
}

// How many successful offers each producer makes, and how many
// successful polls each consumer makes, in a full-length run.
static final int ITERS = 1 << 20;

// Bounds how far a producer may run ahead of its consumer, so the
// benchmark exercises the queue rather than the garbage collector.
// It is applied per producer/consumer pair only, to limit its
// influence on the queue's own synchronization.
static final int LAG_MASK = (1 << 12) - 1;
48    
49     public static void main(String[] args) throws Exception {
50     int maxN = NCPUS * 3 / 2;
51    
52     if (args.length > 0)
53     maxN = Integer.parseInt(args[0]);
54    
55     warmup();
56     print = true;
57     int k = 1;
58     for (int i = 1; i <= maxN;) {
59     System.out.println("Pairs:" + i);
60     oneTest(i, ITERS);
61     if (i == k) {
62     k = i << 1;
63     i = i + (i >>> 1);
64     }
65     else
66     i = k;
67     }
68     pool.shutdown();
69     }
70    
71     static void warmup() throws Exception {
72     print = false;
73     System.out.print("Warmup ");
74     int it = 2000;
75     for (int j = 5; j > 0; --j) {
76     oneTest(j, it);
77     System.out.print(".");
78     it += 1000;
79     }
80     System.gc();
81     it = 20000;
82     for (int j = 5; j > 0; --j) {
83     oneTest(j, it);
84     System.out.print(".");
85     it += 10000;
86     }
87     System.gc();
88     System.out.println();
89     }
90    
91     static void oneTest(int n, int iters) throws Exception {
92     int fairIters = iters/16;
93    
94     Thread.sleep(100); // System.gc();
95     if (print)
96     System.out.print("LinkedTransferQueue ");
97     oneRun(new LinkedTransferQueue<Integer>(), n, iters);
98    
99     Thread.sleep(100); // System.gc();
100     if (print)
101     System.out.print("ConcurrentLinkedQueue ");
102     oneRun(new ConcurrentLinkedQueue<Integer>(), n, iters);
103    
104     Thread.sleep(100); // System.gc();
105     if (print)
106     System.out.print("LinkedBlockingQueue ");
107     oneRun(new LinkedBlockingQueue<Integer>(), n, iters);
108    
109     Thread.sleep(100); // System.gc();
110     if (print)
111     System.out.print("LinkedBlockingQueue(cap)");
112     oneRun(new LinkedBlockingQueue<Integer>(POOL_SIZE), n, iters);
113    
114     Thread.sleep(100); // System.gc();
115     if (print)
116     System.out.print("LinkedBlockingDeque ");
117     oneRun(new LinkedBlockingDeque<Integer>(), n, iters);
118    
119     Thread.sleep(100); // System.gc();
120     if (print)
121     System.out.print("ArrayBlockingQueue ");
122     oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE), n, iters);
123    
124    
125     Thread.sleep(100); // System.gc();
126     if (print)
127     System.out.print("PriorityBlockingQueue ");
128     oneRun(new PriorityBlockingQueue<Integer>(), n, fairIters);
129    
130     Thread.sleep(100); // System.gc();
131     if (print)
132     System.out.print("ArrayBlockingQueue(fair)");
133     oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE, true), n, fairIters);
134    
135     }
136    
137     static abstract class Stage implements Runnable {
138     final int iters;
139     final Queue<Integer> queue;
140     final CyclicBarrier barrier;
141     final Phaser lagPhaser;
142     Stage (Queue<Integer> q, CyclicBarrier b, Phaser s,
143     int iters) {
144     queue = q;
145     barrier = b;
146     lagPhaser = s;
147     this.iters = iters;
148     }
149     }
150    
151     static class Producer extends Stage {
152     Producer(Queue<Integer> q, CyclicBarrier b, Phaser s,
153     int iters) {
154     super(q, b, s, iters);
155     }
156    
157     public void run() {
158     try {
159     barrier.await();
160     int ps = 0;
161     int r = hashCode();
162     int i = 0;
163     for (;;) {
164     r = LoopHelpers.compute7(r);
165     Integer v = intPool[r & POOL_MASK];
166     int k = v.intValue();
167     if (queue.offer(v)) {
168     ps += k;
169     ++i;
170     if (i >= iters)
171     break;
172     if ((i & LAG_MASK) == LAG_MASK)
173     lagPhaser.arriveAndAwaitAdvance();
174     }
175     }
176     addProducerSum(ps);
177     barrier.await();
178     }
179     catch (Exception ie) {
180     ie.printStackTrace();
181     return;
182     }
183     }
184     }
185    
186     static class Consumer extends Stage {
187     Consumer(Queue<Integer> q, CyclicBarrier b, Phaser s,
188     int iters) {
189     super(q, b, s, iters);
190     }
191    
192     public void run() {
193     try {
194     barrier.await();
195     int cs = 0;
196     int i = 0;
197     for (;;) {
198     Integer v = queue.poll();
199     if (v != null) {
200     int k = v.intValue();
201     cs += k;
202     ++i;
203     if (i >= iters)
204     break;
205     if ((i & LAG_MASK) == LAG_MASK)
206     lagPhaser.arriveAndAwaitAdvance();
207     }
208     }
209     addConsumerSum(cs);
210     barrier.await();
211     }
212     catch (Exception ie) {
213     ie.printStackTrace();
214     return;
215     }
216     }
217    
218     }
219    
220     static void oneRun(Queue<Integer> q, int n, int iters) throws Exception {
221     LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
222     CyclicBarrier barrier = new CyclicBarrier(n * 2 + 1, timer);
223     for (int i = 0; i < n; ++i) {
224     Phaser s = new Phaser(2);
225     pool.execute(new Producer(q, barrier, s, iters));
226     pool.execute(new Consumer(q, barrier, s, iters));
227     }
228     barrier.await();
229     barrier.await();
230     long time = timer.getTime();
231     checkSum();
232     if (print)
233     System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * n)) + " ns per transfer");
234     }
235    
236    
237     }
238