ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/MultipleProducersSingleConsumerLoops.java
Revision: 1.3
Committed: Mon Feb 19 00:46:06 2007 UTC (17 years, 2 months ago) by dl
Branch: MAIN
Changes since 1.2: +2 -6 lines
Log Message:
Uniform headers

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 import java.util.concurrent.*;
8
9 public class MultipleProducersSingleConsumerLoops {
10 static final int CAPACITY = 100;
11 static final ExecutorService pool = Executors.newCachedThreadPool();
12 static boolean print = false;
13 static int producerSum;
14 static int consumerSum;
15
16 static synchronized void addProducerSum(int x) {
17 producerSum += x;
18 }
19
20 static synchronized void addConsumerSum(int x) {
21 consumerSum += x;
22 }
23
24 static synchronized void checkSum() {
25 if (producerSum != consumerSum)
26 throw new Error("CheckSum mismatch");
27 }
28
29 public static void main(String[] args) throws Exception {
30 int maxProducers = 100;
31 int iters = 100000;
32
33 if (args.length > 0)
34 maxProducers = Integer.parseInt(args[0]);
35
36 print = false;
37 System.out.println("Warmup...");
38 oneTest(1, 10000);
39 Thread.sleep(100);
40 oneTest(2, 10000);
41 Thread.sleep(100);
42 print = true;
43
44 for (int i = 1; i <= maxProducers; i += (i+1) >>> 1) {
45 System.out.println("Producers:" + i);
46 oneTest(i, iters);
47 Thread.sleep(100);
48 }
49 pool.shutdown();
50 }
51
52 static void oneTest(int producers, int iters) throws Exception {
53 if (print)
54 System.out.print("ArrayBlockingQueue ");
55 oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), producers, iters);
56
57 if (print)
58 System.out.print("LinkedBlockingQueue ");
59 oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), producers, iters);
60
61 // Don't run PBQ since can legitimately run out of memory
62 // if (print)
63 // System.out.print("PriorityBlockingQueue ");
64 // oneRun(new PriorityBlockingQueue<Integer>(), producers, iters);
65
66 if (print)
67 System.out.print("SynchronousQueue ");
68 oneRun(new SynchronousQueue<Integer>(), producers, iters);
69
70 if (print)
71 System.out.print("SynchronousQueue(fair) ");
72 oneRun(new SynchronousQueue<Integer>(true), producers, iters);
73
74 if (print)
75 System.out.print("ArrayBlockingQueue(fair)");
76 oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), producers, iters/10);
77 }
78
79 static abstract class Stage implements Runnable {
80 final int iters;
81 final BlockingQueue<Integer> queue;
82 final CyclicBarrier barrier;
83 Stage (BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
84 queue = q;
85 barrier = b;
86 this.iters = iters;
87 }
88 }
89
90 static class Producer extends Stage {
91 Producer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
92 super(q, b, iters);
93 }
94
95 public void run() {
96 try {
97 barrier.await();
98 int s = 0;
99 int l = hashCode();
100 for (int i = 0; i < iters; ++i) {
101 l = LoopHelpers.compute1(l);
102 l = LoopHelpers.compute2(l);
103 queue.put(new Integer(l));
104 s += l;
105 }
106 addProducerSum(s);
107 barrier.await();
108 }
109 catch (Exception ie) {
110 ie.printStackTrace();
111 return;
112 }
113 }
114 }
115
116 static class Consumer extends Stage {
117 Consumer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
118 super(q, b, iters);
119 }
120
121 public void run() {
122 try {
123 barrier.await();
124 int s = 0;
125 for (int i = 0; i < iters; ++i) {
126 s += queue.take().intValue();
127 }
128 addConsumerSum(s);
129 barrier.await();
130 }
131 catch (Exception ie) {
132 ie.printStackTrace();
133 return;
134 }
135 }
136
137 }
138
139 static void oneRun(BlockingQueue<Integer> q, int nproducers, int iters) throws Exception {
140 LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
141 CyclicBarrier barrier = new CyclicBarrier(nproducers + 2, timer);
142 for (int i = 0; i < nproducers; ++i) {
143 pool.execute(new Producer(q, barrier, iters));
144 }
145 pool.execute(new Consumer(q, barrier, iters * nproducers));
146 barrier.await();
147 barrier.await();
148 long time = timer.getTime();
149 checkSum();
150 if (print)
151 System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * nproducers)) + " ns per transfer");
152 }
153
154 }