ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/SingleProducerMultipleConsumerLoops.java
Revision: 1.3
Committed: Fri Oct 23 19:57:07 2009 UTC (14 years, 6 months ago) by dl
Branch: MAIN
Changes since 1.2: +125 -29 lines
Log Message:
Update misc tests for JDK7

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.2 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.1 */
6    
7     import java.util.concurrent.*;
8 dl 1.3 //import jsr166y.*;
9 dl 1.1
10     public class SingleProducerMultipleConsumerLoops {
/** Processor count reported by the runtime when this class is initialized. */
static final int NCPUS = Runtime.getRuntime().availableProcessors();

/** Iteration count that main passes to each timed test. */
static final int ITERS = 1 << 20;

/** Executor on which every producer and consumer task is run. */
static final ExecutorService pool = Executors.newCachedThreadPool();

/** Gates the per-queue label and timing output of oneTest and oneRun. */
static boolean print = false;

// The values passed through the queues are drawn from a fixed table of
// pre-built Integers, so the timed loops do not allocate per element.
// POOL_SIZE must stay a power of two: producers pick a slot with
// (value & POOL_MASK).
static final int POOL_SIZE = 1 << 8;
static final int POOL_MASK = POOL_SIZE - 1;
static final Integer[] intPool = new Integer[POOL_SIZE];
static {
    for (int slot = 0; slot < intPool.length; slot++) {
        intPool[slot] = Integer.valueOf(slot);
    }
}
28    
29 dl 1.1 public static void main(String[] args) throws Exception {
30 dl 1.3 int maxn = 12;
31 dl 1.1
32     if (args.length > 0)
33 dl 1.3 maxn = Integer.parseInt(args[0]);
34 dl 1.1
35     print = false;
36 dl 1.3 warmup();
37 dl 1.1 print = true;
38 dl 1.3
39     int k = 1;
40     for (int i = 1; i <= maxn;) {
41 dl 1.1 System.out.println("Consumers:" + i);
42 dl 1.3 oneTest(i, ITERS);
43     if (i == k) {
44     k = i << 1;
45     i = i + (i >>> 1);
46     }
47     else
48     i = k;
49 dl 1.1 }
50 dl 1.3
51 dl 1.1 pool.shutdown();
52     }
53    
54 dl 1.3 static void warmup() throws Exception {
55     print = false;
56     System.out.print("Warmup ");
57     int it = 2000;
58     for (int j = 5; j > 0; --j) {
59     oneTest(j, it);
60     System.out.print(".");
61     it += 1000;
62     }
63     System.gc();
64     it = 20000;
65     for (int j = 5; j > 0; --j) {
66     oneTest(j, it);
67     System.out.print(".");
68     it += 10000;
69     }
70     System.gc();
71     System.out.println();
72     }
73    
74     static void oneTest(int n, int iters) throws Exception {
75     int fairIters = iters/16;
76    
77     Thread.sleep(100); // System.gc();
78 dl 1.1 if (print)
79 dl 1.3 System.out.print("LinkedTransferQueue ");
80     oneRun(new LinkedTransferQueue<Integer>(), n, iters);
81 dl 1.1
82 dl 1.3 Thread.sleep(100); // System.gc();
83 dl 1.1 if (print)
84     System.out.print("LinkedBlockingQueue ");
85 dl 1.3 oneRun(new LinkedBlockingQueue<Integer>(), n, iters);
86 dl 1.1
87 dl 1.3 Thread.sleep(100); // System.gc();
88 dl 1.1 if (print)
89 dl 1.3 System.out.print("LinkedBlockingQueue(cap)");
90     oneRun(new LinkedBlockingQueue<Integer>(POOL_SIZE), n, iters);
91    
92     Thread.sleep(100); // System.gc();
93     if (print)
94     System.out.print("LinkedBlockingDeque ");
95     oneRun(new LinkedBlockingDeque<Integer>(), n, iters);
96    
97     Thread.sleep(100); // System.gc();
98     if (print)
99     System.out.print("ArrayBlockingQueue ");
100     oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE), n, iters);
101 dl 1.1
102 dl 1.3 Thread.sleep(100); // System.gc();
103 dl 1.1 if (print)
104     System.out.print("SynchronousQueue ");
105 dl 1.3 oneRun(new SynchronousQueue<Integer>(), n, iters);
106    
107     Thread.sleep(100); // System.gc();
108     if (print)
109     System.out.print("SynchronousQueue(fair) ");
110     oneRun(new SynchronousQueue<Integer>(true), n, iters);
111    
112     Thread.sleep(100); // System.gc();
113     if (print)
114     System.out.print("LinkedTransferQueue(xfer)");
115     oneRun(new LTQasSQ<Integer>(), n, iters);
116 dl 1.1
117 dl 1.3 Thread.sleep(100); // System.gc();
118     if (print)
119     System.out.print("LinkedTransferQueue(half)");
120     oneRun(new HalfSyncLTQ<Integer>(), n, iters);
121    
122     Thread.sleep(100); // System.gc();
123     if (print)
124     System.out.print("PriorityBlockingQueue ");
125     oneRun(new PriorityBlockingQueue<Integer>(), n, fairIters);
126    
127     Thread.sleep(100); // System.gc();
128 dl 1.1 if (print)
129     System.out.print("ArrayBlockingQueue(fair)");
130 dl 1.3 oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE, true), n, fairIters);
131    
132 dl 1.1 }
133    
134     static abstract class Stage implements Runnable {
135     final int iters;
136     final BlockingQueue<Integer> queue;
137     final CyclicBarrier barrier;
138     volatile int result;
139     Stage (BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
140     queue = q;
141     barrier = b;
142     this.iters = iters;
143     }
144     }
145    
146     static class Producer extends Stage {
147     Producer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
148     super(q, b, iters);
149     }
150    
151     public void run() {
152     try {
153     barrier.await();
154 dl 1.3 int r = hashCode();
155 dl 1.1 for (int i = 0; i < iters; ++i) {
156 dl 1.3 r = LoopHelpers.compute7(r);
157     Integer v = intPool[r & POOL_MASK];
158     queue.put(v);
159 dl 1.1 }
160     barrier.await();
161     result = 432;
162     }
163     catch (Exception ie) {
164     ie.printStackTrace();
165     return;
166     }
167     }
168     }
169    
170     static class Consumer extends Stage {
171     Consumer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
172     super(q, b, iters);
173 dl 1.3
174 dl 1.1 }
175    
176     public void run() {
177     try {
178     barrier.await();
179     int l = 0;
180     int s = 0;
181     for (int i = 0; i < iters; ++i) {
182     Integer item = queue.take();
183 dl 1.3 s += item.intValue();
184 dl 1.1 }
185     barrier.await();
186     result = s;
187 dl 1.3 if (s == 0) System.out.print(" ");
188 dl 1.1 }
189     catch (Exception ie) {
190     ie.printStackTrace();
191     return;
192     }
193     }
194    
195     }
196    
197     static void oneRun(BlockingQueue<Integer> q, int nconsumers, int iters) throws Exception {
198     LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
199     CyclicBarrier barrier = new CyclicBarrier(nconsumers + 2, timer);
200     pool.execute(new Producer(q, barrier, iters * nconsumers));
201     for (int i = 0; i < nconsumers; ++i) {
202     pool.execute(new Consumer(q, barrier, iters));
203     }
204     barrier.await();
205     barrier.await();
206     long time = timer.getTime();
207     if (print)
208     System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * nconsumers)) + " ns per transfer");
209     }
210    
211 dl 1.3 static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
212     LTQasSQ() { super(); }
213     public void put(T x) {
214     try { super.transfer(x);
215     } catch (InterruptedException ex) { throw new Error(); }
216     }
217     }
218    
219     static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> {
220     int calls;
221     HalfSyncLTQ() { super(); }
222     public void put(T x) {
223     if ((++calls & 1) == 0)
224     super.put(x);
225     else {
226     try { super.transfer(x);
227     } catch (InterruptedException ex) {
228     throw new Error();
229     }
230     }
231     }
232     }
233    
234 dl 1.1 }