ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/SingleProducerMultipleConsumerLoops.java
Revision: 1.12
Committed: Sat Dec 31 21:34:47 2016 UTC (7 years, 4 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.11: +4 -6 lines
Log Message:
better error handling

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.2 * Expert Group and released to the public domain, as explained at
4 jsr166 1.7 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.1 */
6    
7 jsr166 1.11 import java.util.concurrent.ArrayBlockingQueue;
8     import java.util.concurrent.BlockingQueue;
9     import java.util.concurrent.CyclicBarrier;
10     import java.util.concurrent.ExecutorService;
11     import java.util.concurrent.Executors;
12     import java.util.concurrent.LinkedBlockingDeque;
13     import java.util.concurrent.LinkedBlockingQueue;
14     import java.util.concurrent.LinkedTransferQueue;
15     import java.util.concurrent.PriorityBlockingQueue;
16     import java.util.concurrent.SynchronousQueue;
17 dl 1.1
18     public class SingleProducerMultipleConsumerLoops {
19 dl 1.3 static final int NCPUS = Runtime.getRuntime().availableProcessors();
20    
21     // Number of puts by producers or takes by consumers
22     static final int ITERS = 1 << 20;
23 dl 1.1
24     static final ExecutorService pool = Executors.newCachedThreadPool();
25     static boolean print = false;
26    
27 dl 1.3 // Number of elements passed around -- must be power of two
28     // Elements are reused from pool to minimize alloc impact
29     static final int POOL_SIZE = 1 << 8;
30     static final int POOL_MASK = POOL_SIZE-1;
31     static final Integer[] intPool = new Integer[POOL_SIZE];
32     static {
33 jsr166 1.4 for (int i = 0; i < POOL_SIZE; ++i)
34 dl 1.3 intPool[i] = Integer.valueOf(i);
35     }
36    
37 dl 1.1 public static void main(String[] args) throws Exception {
38 dl 1.3 int maxn = 12;
39 dl 1.1
40 jsr166 1.4 if (args.length > 0)
41 dl 1.3 maxn = Integer.parseInt(args[0]);
42 dl 1.1
43     print = false;
44 dl 1.3 warmup();
45 dl 1.1 print = true;
46 dl 1.3
47 jsr166 1.9 for (int k = 1, i = 1; i <= maxn;) {
48 dl 1.1 System.out.println("Consumers:" + i);
49 dl 1.3 oneTest(i, ITERS);
50     if (i == k) {
51     k = i << 1;
52     i = i + (i >>> 1);
53 jsr166 1.4 }
54     else
55 dl 1.3 i = k;
56 dl 1.1 }
57 jsr166 1.4
58 dl 1.1 pool.shutdown();
59 jsr166 1.8 }
60 dl 1.1
61 dl 1.3 static void warmup() throws Exception {
62     print = false;
63     System.out.print("Warmup ");
64     int it = 2000;
65     for (int j = 5; j > 0; --j) {
66 jsr166 1.4 oneTest(j, it);
67 dl 1.3 System.out.print(".");
68     it += 1000;
69     }
70     System.gc();
71     it = 20000;
72     for (int j = 5; j > 0; --j) {
73 jsr166 1.4 oneTest(j, it);
74 dl 1.3 System.out.print(".");
75     it += 10000;
76     }
77     System.gc();
78     System.out.println();
79     }
80    
81     static void oneTest(int n, int iters) throws Exception {
82     int fairIters = iters/16;
83    
84     Thread.sleep(100); // System.gc();
85 dl 1.1 if (print)
86 dl 1.3 System.out.print("LinkedTransferQueue ");
87     oneRun(new LinkedTransferQueue<Integer>(), n, iters);
88 dl 1.1
89 dl 1.3 Thread.sleep(100); // System.gc();
90 dl 1.1 if (print)
91     System.out.print("LinkedBlockingQueue ");
92 dl 1.3 oneRun(new LinkedBlockingQueue<Integer>(), n, iters);
93 dl 1.1
94 dl 1.3 Thread.sleep(100); // System.gc();
95 dl 1.1 if (print)
96 dl 1.3 System.out.print("LinkedBlockingQueue(cap)");
97     oneRun(new LinkedBlockingQueue<Integer>(POOL_SIZE), n, iters);
98    
99     Thread.sleep(100); // System.gc();
100     if (print)
101     System.out.print("LinkedBlockingDeque ");
102     oneRun(new LinkedBlockingDeque<Integer>(), n, iters);
103 jsr166 1.4
104 dl 1.3 Thread.sleep(100); // System.gc();
105     if (print)
106     System.out.print("ArrayBlockingQueue ");
107     oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE), n, iters);
108 dl 1.1
109 dl 1.3 Thread.sleep(100); // System.gc();
110 dl 1.1 if (print)
111     System.out.print("SynchronousQueue ");
112 dl 1.3 oneRun(new SynchronousQueue<Integer>(), n, iters);
113 jsr166 1.4
114 dl 1.3 Thread.sleep(100); // System.gc();
115     if (print)
116     System.out.print("SynchronousQueue(fair) ");
117     oneRun(new SynchronousQueue<Integer>(true), n, iters);
118    
119     Thread.sleep(100); // System.gc();
120     if (print)
121     System.out.print("LinkedTransferQueue(xfer)");
122     oneRun(new LTQasSQ<Integer>(), n, iters);
123 dl 1.1
124 dl 1.3 Thread.sleep(100); // System.gc();
125     if (print)
126     System.out.print("LinkedTransferQueue(half)");
127     oneRun(new HalfSyncLTQ<Integer>(), n, iters);
128 jsr166 1.4
129 dl 1.3 Thread.sleep(100); // System.gc();
130     if (print)
131     System.out.print("PriorityBlockingQueue ");
132     oneRun(new PriorityBlockingQueue<Integer>(), n, fairIters);
133    
134     Thread.sleep(100); // System.gc();
135 dl 1.1 if (print)
136     System.out.print("ArrayBlockingQueue(fair)");
137 dl 1.3 oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE, true), n, fairIters);
138 dl 1.1 }
139 jsr166 1.4
140 jsr166 1.6 abstract static class Stage implements Runnable {
141 dl 1.1 final int iters;
142     final BlockingQueue<Integer> queue;
143     final CyclicBarrier barrier;
144     volatile int result;
145 jsr166 1.5 Stage(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
146 jsr166 1.4 queue = q;
147 dl 1.1 barrier = b;
148     this.iters = iters;
149     }
150     }
151    
152     static class Producer extends Stage {
153     Producer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
154     super(q, b, iters);
155     }
156    
157     public void run() {
158     try {
159     barrier.await();
160 dl 1.3 int r = hashCode();
161 dl 1.1 for (int i = 0; i < iters; ++i) {
162 dl 1.3 r = LoopHelpers.compute7(r);
163     Integer v = intPool[r & POOL_MASK];
164     queue.put(v);
165 dl 1.1 }
166     barrier.await();
167     result = 432;
168     }
169 jsr166 1.4 catch (Exception ie) {
170     ie.printStackTrace();
171     return;
172 dl 1.1 }
173     }
174     }
175    
176     static class Consumer extends Stage {
177 jsr166 1.4 Consumer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
178 dl 1.1 super(q, b, iters);
179     }
180    
181     public void run() {
182     try {
183     barrier.await();
184     int l = 0;
185     int s = 0;
186     for (int i = 0; i < iters; ++i) {
187     Integer item = queue.take();
188 dl 1.3 s += item.intValue();
189 dl 1.1 }
190     barrier.await();
191     result = s;
192 dl 1.3 if (s == 0) System.out.print(" ");
193 dl 1.1 }
194 jsr166 1.4 catch (Exception ie) {
195     ie.printStackTrace();
196     return;
197 dl 1.1 }
198     }
199    
200     }
201    
202     static void oneRun(BlockingQueue<Integer> q, int nconsumers, int iters) throws Exception {
203     LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
204     CyclicBarrier barrier = new CyclicBarrier(nconsumers + 2, timer);
205     pool.execute(new Producer(q, barrier, iters * nconsumers));
206     for (int i = 0; i < nconsumers; ++i) {
207     pool.execute(new Consumer(q, barrier, iters));
208     }
209     barrier.await();
210     barrier.await();
211     long time = timer.getTime();
212     if (print)
213     System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * nconsumers)) + " ns per transfer");
214     }
215    
216 dl 1.3 static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
217     LTQasSQ() { super(); }
218     public void put(T x) {
219 jsr166 1.12 try { super.transfer(x); }
220     catch (InterruptedException ex) { throw new Error(ex); }
221 dl 1.3 }
222     }
223    
224     static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> {
225     int calls;
226     HalfSyncLTQ() { super(); }
227     public void put(T x) {
228     if ((++calls & 1) == 0)
229     super.put(x);
230     else {
231 jsr166 1.12 try { super.transfer(x); }
232     catch (InterruptedException ex) { throw new Error(ex); }
233 dl 1.3 }
234     }
235     }
236    
237 dl 1.1 }