ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/SingleProducerMultipleConsumerLoops.java
Revision: 1.6
Committed: Mon Sep 27 19:15:16 2010 UTC (13 years, 7 months ago) by jsr166
Branch: MAIN
Changes since 1.5: +1 -1 lines
Log Message:
use blessed declaration modifier order

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.2 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.1 */
6    
7     import java.util.concurrent.*;
8 dl 1.3 //import jsr166y.*;
9 dl 1.1
10     public class SingleProducerMultipleConsumerLoops {
/** Processor count reported by the runtime when this class is initialized. */
static final int NCPUS = Runtime.getRuntime().availableProcessors();

/** Number of puts by producers or takes by consumers. */
static final int ITERS = 1 << 20;

/** Executor that runs the producer and consumer tasks. */
static final ExecutorService pool = Executors.newCachedThreadPool();

/** When true, each run prints its queue label and timing line. */
static boolean print = false;

/*
 * Number of elements passed around -- must be a power of two.
 * Elements are reused from a pool to minimize allocation impact.
 */
static final int POOL_SIZE = 1 << 8;
static final int POOL_MASK = POOL_SIZE - 1;
static final Integer[] intPool = new Integer[POOL_SIZE];
static {
    // Slot k holds the boxed value k.
    int slot = 0;
    while (slot < POOL_SIZE) {
        intPool[slot] = Integer.valueOf(slot);
        ++slot;
    }
}
28    
29 dl 1.1 public static void main(String[] args) throws Exception {
30 dl 1.3 int maxn = 12;
31 dl 1.1
32 jsr166 1.4 if (args.length > 0)
33 dl 1.3 maxn = Integer.parseInt(args[0]);
34 dl 1.1
35     print = false;
36 dl 1.3 warmup();
37 dl 1.1 print = true;
38 dl 1.3
39     int k = 1;
40     for (int i = 1; i <= maxn;) {
41 dl 1.1 System.out.println("Consumers:" + i);
42 dl 1.3 oneTest(i, ITERS);
43     if (i == k) {
44     k = i << 1;
45     i = i + (i >>> 1);
46 jsr166 1.4 }
47     else
48 dl 1.3 i = k;
49 dl 1.1 }
50 jsr166 1.4
51 dl 1.1 pool.shutdown();
52     }
53    
54 dl 1.3 static void warmup() throws Exception {
55     print = false;
56     System.out.print("Warmup ");
57     int it = 2000;
58     for (int j = 5; j > 0; --j) {
59 jsr166 1.4 oneTest(j, it);
60 dl 1.3 System.out.print(".");
61     it += 1000;
62     }
63     System.gc();
64     it = 20000;
65     for (int j = 5; j > 0; --j) {
66 jsr166 1.4 oneTest(j, it);
67 dl 1.3 System.out.print(".");
68     it += 10000;
69     }
70     System.gc();
71     System.out.println();
72     }
73    
74     static void oneTest(int n, int iters) throws Exception {
75     int fairIters = iters/16;
76    
77     Thread.sleep(100); // System.gc();
78 dl 1.1 if (print)
79 dl 1.3 System.out.print("LinkedTransferQueue ");
80     oneRun(new LinkedTransferQueue<Integer>(), n, iters);
81 dl 1.1
82 dl 1.3 Thread.sleep(100); // System.gc();
83 dl 1.1 if (print)
84     System.out.print("LinkedBlockingQueue ");
85 dl 1.3 oneRun(new LinkedBlockingQueue<Integer>(), n, iters);
86 dl 1.1
87 dl 1.3 Thread.sleep(100); // System.gc();
88 dl 1.1 if (print)
89 dl 1.3 System.out.print("LinkedBlockingQueue(cap)");
90     oneRun(new LinkedBlockingQueue<Integer>(POOL_SIZE), n, iters);
91    
92     Thread.sleep(100); // System.gc();
93     if (print)
94     System.out.print("LinkedBlockingDeque ");
95     oneRun(new LinkedBlockingDeque<Integer>(), n, iters);
96 jsr166 1.4
97 dl 1.3 Thread.sleep(100); // System.gc();
98     if (print)
99     System.out.print("ArrayBlockingQueue ");
100     oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE), n, iters);
101 dl 1.1
102 dl 1.3 Thread.sleep(100); // System.gc();
103 dl 1.1 if (print)
104     System.out.print("SynchronousQueue ");
105 dl 1.3 oneRun(new SynchronousQueue<Integer>(), n, iters);
106 jsr166 1.4
107 dl 1.3 Thread.sleep(100); // System.gc();
108     if (print)
109     System.out.print("SynchronousQueue(fair) ");
110     oneRun(new SynchronousQueue<Integer>(true), n, iters);
111    
112     Thread.sleep(100); // System.gc();
113     if (print)
114     System.out.print("LinkedTransferQueue(xfer)");
115     oneRun(new LTQasSQ<Integer>(), n, iters);
116 dl 1.1
117 dl 1.3 Thread.sleep(100); // System.gc();
118     if (print)
119     System.out.print("LinkedTransferQueue(half)");
120     oneRun(new HalfSyncLTQ<Integer>(), n, iters);
121 jsr166 1.4
122 dl 1.3 Thread.sleep(100); // System.gc();
123     if (print)
124     System.out.print("PriorityBlockingQueue ");
125     oneRun(new PriorityBlockingQueue<Integer>(), n, fairIters);
126    
127     Thread.sleep(100); // System.gc();
128 dl 1.1 if (print)
129     System.out.print("ArrayBlockingQueue(fair)");
130 dl 1.3 oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE, true), n, fairIters);
131    
132 dl 1.1 }
133 jsr166 1.4
134 jsr166 1.6 abstract static class Stage implements Runnable {
135 dl 1.1 final int iters;
136     final BlockingQueue<Integer> queue;
137     final CyclicBarrier barrier;
138     volatile int result;
139 jsr166 1.5 Stage(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
140 jsr166 1.4 queue = q;
141 dl 1.1 barrier = b;
142     this.iters = iters;
143     }
144     }
145    
146     static class Producer extends Stage {
147     Producer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
148     super(q, b, iters);
149     }
150    
151     public void run() {
152     try {
153     barrier.await();
154 dl 1.3 int r = hashCode();
155 dl 1.1 for (int i = 0; i < iters; ++i) {
156 dl 1.3 r = LoopHelpers.compute7(r);
157     Integer v = intPool[r & POOL_MASK];
158     queue.put(v);
159 dl 1.1 }
160     barrier.await();
161     result = 432;
162     }
163 jsr166 1.4 catch (Exception ie) {
164     ie.printStackTrace();
165     return;
166 dl 1.1 }
167     }
168     }
169    
170     static class Consumer extends Stage {
171 jsr166 1.4 Consumer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
172 dl 1.1 super(q, b, iters);
173 dl 1.3
174 dl 1.1 }
175    
176     public void run() {
177     try {
178     barrier.await();
179     int l = 0;
180     int s = 0;
181     for (int i = 0; i < iters; ++i) {
182     Integer item = queue.take();
183 dl 1.3 s += item.intValue();
184 dl 1.1 }
185     barrier.await();
186     result = s;
187 dl 1.3 if (s == 0) System.out.print(" ");
188 dl 1.1 }
189 jsr166 1.4 catch (Exception ie) {
190     ie.printStackTrace();
191     return;
192 dl 1.1 }
193     }
194    
195     }
196    
197     static void oneRun(BlockingQueue<Integer> q, int nconsumers, int iters) throws Exception {
198     LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
199     CyclicBarrier barrier = new CyclicBarrier(nconsumers + 2, timer);
200     pool.execute(new Producer(q, barrier, iters * nconsumers));
201     for (int i = 0; i < nconsumers; ++i) {
202     pool.execute(new Consumer(q, barrier, iters));
203     }
204     barrier.await();
205     barrier.await();
206     long time = timer.getTime();
207     if (print)
208     System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * nconsumers)) + " ns per transfer");
209     }
210    
211 dl 1.3 static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
212     LTQasSQ() { super(); }
213     public void put(T x) {
214 jsr166 1.4 try { super.transfer(x);
215 dl 1.3 } catch (InterruptedException ex) { throw new Error(); }
216     }
217     }
218    
219     static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> {
220     int calls;
221     HalfSyncLTQ() { super(); }
222     public void put(T x) {
223     if ((++calls & 1) == 0)
224     super.put(x);
225     else {
226 jsr166 1.4 try { super.transfer(x);
227     } catch (InterruptedException ex) {
228     throw new Error();
229 dl 1.3 }
230     }
231     }
232     }
233    
234 dl 1.1 }