ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/OfferPollLoops.java
Revision: 1.5
Committed: Mon Sep 27 19:15:15 2010 UTC (13 years, 7 months ago) by jsr166
Branch: MAIN
Changes since 1.4: +1 -1 lines
Log Message:
use blessed declaration modifier order

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5     */
6    
7     import java.util.*;
8     import java.util.concurrent.*;
9     //import jsr166y.*;
10    
11     public class OfferPollLoops {
// Processor count reported by the runtime; main() derives its default
// maximum number of producer/consumer pairs from it.
static final int NCPUS = Runtime.getRuntime().availableProcessors();
// Shared random number generator.
static final Random rng = new Random();
// Cached thread pool on which every Producer and Consumer task is executed.
static final ExecutorService pool = Executors.newCachedThreadPool();
// When true, per-queue labels and timings are written to System.out.
static boolean print;
// Totals of the values offered by producers and polled by consumers.
// Only touched from the static synchronized accessor methods.
static int producerSum, consumerSum;
18     static synchronized void addProducerSum(int x) {
19     producerSum += x;
20     }
21    
22     static synchronized void addConsumerSum(int x) {
23     consumerSum += x;
24     }
25    
26     static synchronized void checkSum() {
27     if (producerSum != consumerSum)
28     throw new Error("CheckSum mismatch");
29     }
30    
// Number of distinct elements passed around. Must be a power of two so
// that POOL_MASK can reduce an arbitrary int to a valid pool index.
// Elements are reused from the pool to minimize allocation impact.
static final int POOL_SIZE = 1 << 8;
static final int POOL_MASK = POOL_SIZE - 1;
static final Integer[] intPool = new Integer[POOL_SIZE];
static {
    // Slot i holds the boxed value i.
    int slot = 0;
    while (slot < POOL_SIZE) {
        intPool[slot] = Integer.valueOf(slot);
        slot++;
    }
}

// Number of successful offers by each producer, and of successful polls
// by each consumer, in a full-length run.
static final int ITERS = 1 << 20;

// Maximum lag between a producer and its consumer, so that this does not
// become a GC test rather than a queue test. Each pair meets on its own
// Phaser whenever (count & LAG_MASK) == LAG_MASK; keeping this per-pair
// lessens the impact on queue synchronization.
static final int LAG_MASK = (1 << 12) - 1;
48    
49     public static void main(String[] args) throws Exception {
50     int maxN = NCPUS * 3 / 2;
51    
52 jsr166 1.2 if (args.length > 0)
53 dl 1.1 maxN = Integer.parseInt(args[0]);
54    
55     warmup();
56     print = true;
57     int k = 1;
58     for (int i = 1; i <= maxN;) {
59     System.out.println("Pairs:" + i);
60     oneTest(i, ITERS);
61     if (i == k) {
62     k = i << 1;
63     i = i + (i >>> 1);
64 jsr166 1.2 }
65     else
66 dl 1.1 i = k;
67     }
68     pool.shutdown();
69     }
70    
71     static void warmup() throws Exception {
72     print = false;
73     System.out.print("Warmup ");
74     int it = 2000;
75     for (int j = 5; j > 0; --j) {
76 jsr166 1.2 oneTest(j, it);
77 dl 1.1 System.out.print(".");
78     it += 1000;
79     }
80     System.gc();
81     it = 20000;
82     for (int j = 5; j > 0; --j) {
83 jsr166 1.2 oneTest(j, it);
84 dl 1.1 System.out.print(".");
85     it += 10000;
86     }
87     System.gc();
88     System.out.println();
89     }
90    
91     static void oneTest(int n, int iters) throws Exception {
92     int fairIters = iters/16;
93    
94     Thread.sleep(100); // System.gc();
95     if (print)
96     System.out.print("LinkedTransferQueue ");
97     oneRun(new LinkedTransferQueue<Integer>(), n, iters);
98    
99     Thread.sleep(100); // System.gc();
100     if (print)
101     System.out.print("ConcurrentLinkedQueue ");
102     oneRun(new ConcurrentLinkedQueue<Integer>(), n, iters);
103    
104     Thread.sleep(100); // System.gc();
105     if (print)
106 dl 1.4 System.out.print("ConcurrentLinkedDeque ");
107     oneRun(new ConcurrentLinkedDeque<Integer>(), n, iters);
108    
109     Thread.sleep(100); // System.gc();
110     if (print)
111 dl 1.1 System.out.print("LinkedBlockingQueue ");
112     oneRun(new LinkedBlockingQueue<Integer>(), n, iters);
113    
114     Thread.sleep(100); // System.gc();
115     if (print)
116     System.out.print("LinkedBlockingQueue(cap)");
117     oneRun(new LinkedBlockingQueue<Integer>(POOL_SIZE), n, iters);
118    
119     Thread.sleep(100); // System.gc();
120     if (print)
121     System.out.print("LinkedBlockingDeque ");
122     oneRun(new LinkedBlockingDeque<Integer>(), n, iters);
123 jsr166 1.2
124 dl 1.1 Thread.sleep(100); // System.gc();
125     if (print)
126     System.out.print("ArrayBlockingQueue ");
127     oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE), n, iters);
128    
129    
130     Thread.sleep(100); // System.gc();
131     if (print)
132     System.out.print("PriorityBlockingQueue ");
133     oneRun(new PriorityBlockingQueue<Integer>(), n, fairIters);
134    
135     Thread.sleep(100); // System.gc();
136     if (print)
137     System.out.print("ArrayBlockingQueue(fair)");
138     oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE, true), n, fairIters);
139    
140     }
141 jsr166 1.2
142 jsr166 1.5 abstract static class Stage implements Runnable {
143 dl 1.1 final int iters;
144     final Queue<Integer> queue;
145     final CyclicBarrier barrier;
146     final Phaser lagPhaser;
147 jsr166 1.3 Stage(Queue<Integer> q, CyclicBarrier b, Phaser s, int iters) {
148 jsr166 1.2 queue = q;
149 dl 1.1 barrier = b;
150     lagPhaser = s;
151     this.iters = iters;
152     }
153     }
154    
155     static class Producer extends Stage {
156     Producer(Queue<Integer> q, CyclicBarrier b, Phaser s,
157     int iters) {
158     super(q, b, s, iters);
159     }
160    
161     public void run() {
162     try {
163     barrier.await();
164     int ps = 0;
165     int r = hashCode();
166     int i = 0;
167     for (;;) {
168     r = LoopHelpers.compute7(r);
169     Integer v = intPool[r & POOL_MASK];
170     int k = v.intValue();
171     if (queue.offer(v)) {
172     ps += k;
173     ++i;
174     if (i >= iters)
175     break;
176     if ((i & LAG_MASK) == LAG_MASK)
177     lagPhaser.arriveAndAwaitAdvance();
178 jsr166 1.2 }
179 dl 1.1 }
180     addProducerSum(ps);
181     barrier.await();
182     }
183 jsr166 1.2 catch (Exception ie) {
184     ie.printStackTrace();
185     return;
186 dl 1.1 }
187     }
188     }
189    
190     static class Consumer extends Stage {
191     Consumer(Queue<Integer> q, CyclicBarrier b, Phaser s,
192 jsr166 1.2 int iters) {
193 dl 1.1 super(q, b, s, iters);
194     }
195    
196     public void run() {
197     try {
198     barrier.await();
199     int cs = 0;
200     int i = 0;
201     for (;;) {
202     Integer v = queue.poll();
203     if (v != null) {
204     int k = v.intValue();
205     cs += k;
206     ++i;
207     if (i >= iters)
208     break;
209     if ((i & LAG_MASK) == LAG_MASK)
210     lagPhaser.arriveAndAwaitAdvance();
211     }
212     }
213     addConsumerSum(cs);
214     barrier.await();
215     }
216 jsr166 1.2 catch (Exception ie) {
217     ie.printStackTrace();
218     return;
219 dl 1.1 }
220     }
221    
222     }
223    
224     static void oneRun(Queue<Integer> q, int n, int iters) throws Exception {
225     LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
226     CyclicBarrier barrier = new CyclicBarrier(n * 2 + 1, timer);
227     for (int i = 0; i < n; ++i) {
228     Phaser s = new Phaser(2);
229     pool.execute(new Producer(q, barrier, s, iters));
230     pool.execute(new Consumer(q, barrier, s, iters));
231     }
232     barrier.await();
233     barrier.await();
234     long time = timer.getTime();
235     checkSum();
236     if (print)
237     System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * n)) + " ns per transfer");
238     }
239    
240    
241     }