ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/MultipleProducersSingleConsumerLoops.java
Revision: 1.2
Committed: Sun Aug 7 19:25:55 2005 UTC (18 years, 9 months ago) by dl
Branch: MAIN
Changes since 1.1: +4 -0 lines
Log Message:
Add exchanger performance tests; update others

File Contents

# Content
1 /*
2 * @test
3 * @synopsis multiple producers and single consumer using blocking queues
4 */
5 /*
6 * Written by Doug Lea with assistance from members of JCP JSR-166
7 * Expert Group and released to the public domain. Use, modify, and
8 * redistribute this code in any way without acknowledgement.
9 */
10
11 import java.util.concurrent.*;
12
13 public class MultipleProducersSingleConsumerLoops {
14 static final int CAPACITY = 100;
15 static final ExecutorService pool = Executors.newCachedThreadPool();
16 static boolean print = false;
17 static int producerSum;
18 static int consumerSum;
19
20 static synchronized void addProducerSum(int x) {
21 producerSum += x;
22 }
23
24 static synchronized void addConsumerSum(int x) {
25 consumerSum += x;
26 }
27
28 static synchronized void checkSum() {
29 if (producerSum != consumerSum)
30 throw new Error("CheckSum mismatch");
31 }
32
33 public static void main(String[] args) throws Exception {
34 int maxProducers = 100;
35 int iters = 100000;
36
37 if (args.length > 0)
38 maxProducers = Integer.parseInt(args[0]);
39
40 print = false;
41 System.out.println("Warmup...");
42 oneTest(1, 10000);
43 Thread.sleep(100);
44 oneTest(2, 10000);
45 Thread.sleep(100);
46 print = true;
47
48 for (int i = 1; i <= maxProducers; i += (i+1) >>> 1) {
49 System.out.println("Producers:" + i);
50 oneTest(i, iters);
51 Thread.sleep(100);
52 }
53 pool.shutdown();
54 }
55
56 static void oneTest(int producers, int iters) throws Exception {
57 if (print)
58 System.out.print("ArrayBlockingQueue ");
59 oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), producers, iters);
60
61 if (print)
62 System.out.print("LinkedBlockingQueue ");
63 oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), producers, iters);
64
65 // Don't run PBQ since can legitimately run out of memory
66 // if (print)
67 // System.out.print("PriorityBlockingQueue ");
68 // oneRun(new PriorityBlockingQueue<Integer>(), producers, iters);
69
70 if (print)
71 System.out.print("SynchronousQueue ");
72 oneRun(new SynchronousQueue<Integer>(), producers, iters);
73
74 if (print)
75 System.out.print("SynchronousQueue(fair) ");
76 oneRun(new SynchronousQueue<Integer>(true), producers, iters);
77
78 if (print)
79 System.out.print("ArrayBlockingQueue(fair)");
80 oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), producers, iters/10);
81 }
82
83 static abstract class Stage implements Runnable {
84 final int iters;
85 final BlockingQueue<Integer> queue;
86 final CyclicBarrier barrier;
87 Stage (BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
88 queue = q;
89 barrier = b;
90 this.iters = iters;
91 }
92 }
93
94 static class Producer extends Stage {
95 Producer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
96 super(q, b, iters);
97 }
98
99 public void run() {
100 try {
101 barrier.await();
102 int s = 0;
103 int l = hashCode();
104 for (int i = 0; i < iters; ++i) {
105 l = LoopHelpers.compute1(l);
106 l = LoopHelpers.compute2(l);
107 queue.put(new Integer(l));
108 s += l;
109 }
110 addProducerSum(s);
111 barrier.await();
112 }
113 catch (Exception ie) {
114 ie.printStackTrace();
115 return;
116 }
117 }
118 }
119
120 static class Consumer extends Stage {
121 Consumer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
122 super(q, b, iters);
123 }
124
125 public void run() {
126 try {
127 barrier.await();
128 int s = 0;
129 for (int i = 0; i < iters; ++i) {
130 s += queue.take().intValue();
131 }
132 addConsumerSum(s);
133 barrier.await();
134 }
135 catch (Exception ie) {
136 ie.printStackTrace();
137 return;
138 }
139 }
140
141 }
142
143 static void oneRun(BlockingQueue<Integer> q, int nproducers, int iters) throws Exception {
144 LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
145 CyclicBarrier barrier = new CyclicBarrier(nproducers + 2, timer);
146 for (int i = 0; i < nproducers; ++i) {
147 pool.execute(new Producer(q, barrier, iters));
148 }
149 pool.execute(new Consumer(q, barrier, iters * nproducers));
150 barrier.await();
151 barrier.await();
152 long time = timer.getTime();
153 checkSum();
154 if (print)
155 System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * nproducers)) + " ns per transfer");
156 }
157
158 }