ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/ProducerConsumerLoops.java
Revision: 1.2
Committed: Mon Feb 19 00:46:06 2007 UTC (17 years, 3 months ago) by dl
Branch: MAIN
Changes since 1.1: +2 -6 lines
Log Message:
Uniform headers

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.2 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.1 */
6    
7     import java.util.concurrent.*;
8    
9     public class ProducerConsumerLoops {
10     static final int CAPACITY = 100;
11    
12     static final ExecutorService pool = Executors.newCachedThreadPool();
13     static boolean print = false;
14     static int producerSum;
15     static int consumerSum;
16     static synchronized void addProducerSum(int x) {
17     producerSum += x;
18     }
19    
20     static synchronized void addConsumerSum(int x) {
21     consumerSum += x;
22     }
23    
24     static synchronized void checkSum() {
25     if (producerSum != consumerSum)
26     throw new Error("CheckSum mismatch");
27     }
28    
29     public static void main(String[] args) throws Exception {
30     int maxPairs = 100;
31     int iters = 100000;
32    
33     if (args.length > 0)
34     maxPairs = Integer.parseInt(args[0]);
35    
36     print = false;
37     System.out.println("Warmup...");
38     oneTest(1, 10000);
39     Thread.sleep(100);
40     oneTest(2, 10000);
41     Thread.sleep(100);
42     oneTest(2, 10000);
43     Thread.sleep(100);
44     print = true;
45    
46     int k = 1;
47     for (int i = 1; i <= maxPairs;) {
48     System.out.println("Pairs:" + i);
49     oneTest(i, iters);
50     Thread.sleep(100);
51     if (i == k) {
52     k = i << 1;
53     i = i + (i >>> 1);
54     }
55     else
56     i = k;
57     }
58     pool.shutdown();
59     }
60    
61     static void oneTest(int pairs, int iters) throws Exception {
62     int fairIters = iters/20;
63     if (print)
64     System.out.print("ArrayBlockingQueue ");
65     oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), pairs, iters);
66    
67     if (print)
68     System.out.print("LinkedBlockingQueue ");
69     oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), pairs, iters);
70    
71     if (print)
72     System.out.print("LinkedBlockingDeque ");
73     oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), pairs, iters);
74    
75     if (print)
76     System.out.print("SynchronousQueue ");
77     oneRun(new SynchronousQueue<Integer>(), pairs, iters);
78    
79     if (print)
80     System.out.print("SynchronousQueue(fair) ");
81     oneRun(new SynchronousQueue<Integer>(true), pairs, fairIters);
82    
83     if (print)
84     System.out.print("PriorityBlockingQueue ");
85     oneRun(new PriorityBlockingQueue<Integer>(), pairs, fairIters);
86    
87     if (print)
88     System.out.print("ArrayBlockingQueue(fair)");
89     oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), pairs, fairIters);
90    
91     }
92    
93     static abstract class Stage implements Runnable {
94     final int iters;
95     final BlockingQueue<Integer> queue;
96     final CyclicBarrier barrier;
97     Stage (BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
98     queue = q;
99     barrier = b;
100     this.iters = iters;
101     }
102     }
103    
104     static class Producer extends Stage {
105     Producer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
106     super(q, b, iters);
107     }
108    
109     public void run() {
110     try {
111     barrier.await();
112     int s = 0;
113     int l = hashCode();
114     for (int i = 0; i < iters; ++i) {
115     l = LoopHelpers.compute4(l);
116     queue.put(new Integer(l));
117     s += LoopHelpers.compute4(l);
118     }
119     addProducerSum(s);
120     barrier.await();
121     }
122     catch (Exception ie) {
123     ie.printStackTrace();
124     return;
125     }
126     }
127     }
128    
129     static class Consumer extends Stage {
130     Consumer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
131     super(q, b, iters);
132     }
133    
134     public void run() {
135     try {
136     barrier.await();
137     int l = 0;
138     int s = 0;
139     for (int i = 0; i < iters; ++i) {
140     l = LoopHelpers.compute4(queue.take().intValue());
141     s += l;
142     }
143     addConsumerSum(s);
144     barrier.await();
145     }
146     catch (Exception ie) {
147     ie.printStackTrace();
148     return;
149     }
150     }
151    
152     }
153    
154     static void oneRun(BlockingQueue<Integer> q, int npairs, int iters) throws Exception {
155     LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
156     CyclicBarrier barrier = new CyclicBarrier(npairs * 2 + 1, timer);
157     for (int i = 0; i < npairs; ++i) {
158     pool.execute(new Producer(q, barrier, iters));
159     pool.execute(new Consumer(q, barrier, iters));
160     }
161     barrier.await();
162     barrier.await();
163     long time = timer.getTime();
164     checkSum();
165     if (print)
166     System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * npairs)) + " ns per transfer");
167     }
168    
169     }