ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/TimeoutProducerConsumerLoops.java
Revision: 1.2
Committed: Mon Feb 19 00:46:06 2007 UTC (17 years, 2 months ago) by dl
Branch: MAIN
Changes since 1.1: +2 -6 lines
Log Message:
Uniform headers

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.2 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.1 */
6    
7     import java.util.concurrent.*;
8    
9     public class TimeoutProducerConsumerLoops {
10     static final int CAPACITY = 100;
11     static final ExecutorService pool = Executors.newCachedThreadPool();
12     static boolean print = false;
13     static int producerSum;
14     static int consumerSum;
15     static synchronized void addProducerSum(int x) {
16     producerSum += x;
17     }
18    
19     static synchronized void addConsumerSum(int x) {
20     consumerSum += x;
21     }
22    
23     static synchronized void checkSum() {
24     if (producerSum != consumerSum) {
25     throw new Error("CheckSum mismatch");
26     }
27     }
28    
29     public static void main(String[] args) throws Exception {
30     int maxPairs = 100;
31     int iters = 100000;
32    
33     if (args.length > 0)
34     maxPairs = Integer.parseInt(args[0]);
35    
36     print = false;
37     System.out.println("Warmup...");
38     oneTest(1, 10000);
39     Thread.sleep(100);
40     oneTest(2, 10000);
41     Thread.sleep(100);
42     oneTest(2, 10000);
43     Thread.sleep(100);
44     print = true;
45    
46     int k = 1;
47     for (int i = 1; i <= maxPairs;) {
48     System.out.println("Pairs:" + i);
49     oneTest(i, iters);
50     Thread.sleep(100);
51     if (i == k) {
52     k = i << 1;
53     i = i + (i >>> 1);
54     }
55     else
56     i = k;
57     }
58     pool.shutdown();
59     }
60    
61     static void oneTest(int pairs, int iters) throws Exception {
62     int fairIters = iters/20;
63     if (print)
64     System.out.print("ArrayBlockingQueue ");
65     oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), pairs, iters);
66    
67     if (print)
68     System.out.print("LinkedBlockingQueue ");
69     oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), pairs, iters);
70    
71     if (print)
72     System.out.print("LinkedBlockingDeque ");
73     oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), pairs, iters);
74    
75     if (print)
76     System.out.print("SynchronousQueue ");
77     oneRun(new SynchronousQueue<Integer>(), pairs, iters);
78    
79     if (print)
80     System.out.print("SynchronousQueue(fair) ");
81     oneRun(new SynchronousQueue<Integer>(true), pairs, fairIters);
82    
83     if (print)
84     System.out.print("PriorityBlockingQueue ");
85     oneRun(new PriorityBlockingQueue<Integer>(), pairs, fairIters);
86    
87     if (print)
88     System.out.print("ArrayBlockingQueue(fair)");
89     oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), pairs, fairIters);
90    
91     }
92    
93     static abstract class Stage implements Runnable {
94     final int iters;
95     final BlockingQueue<Integer> queue;
96     final CyclicBarrier barrier;
97     Stage (BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
98     queue = q;
99     barrier = b;
100     this.iters = iters;
101     }
102     }
103    
104     static class Producer extends Stage {
105     Producer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
106     super(q, b, iters);
107     }
108    
109     public void run() {
110     try {
111     barrier.await();
112     int s = 0;
113     int l = hashCode();
114     int i = 0;
115     long timeout = 1;
116     while (i < iters) {
117     l = LoopHelpers.compute4(l);
118     if (queue.offer(new Integer(l),
119     timeout, TimeUnit.NANOSECONDS)) {
120     s += LoopHelpers.compute4(l);
121     ++i;
122     if (timeout > 1)
123     timeout--;
124     }
125     else
126     timeout++;
127     }
128     addProducerSum(s);
129     barrier.await();
130     }
131     catch (Exception ie) {
132     ie.printStackTrace();
133     return;
134     }
135     }
136     }
137    
138     static class Consumer extends Stage {
139     Consumer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
140     super(q, b, iters);
141     }
142    
143     public void run() {
144     try {
145     barrier.await();
146     int l = 0;
147     int s = 0;
148     int i = 0;
149     long timeout = 1;
150     while (i < iters) {
151     Integer e = queue.poll(timeout,
152     TimeUnit.NANOSECONDS);
153     if (e != null) {
154     l = LoopHelpers.compute4(e.intValue());
155     s += l;
156     ++i;
157     if (timeout > 1)
158     --timeout;
159     }
160     else
161     ++timeout;
162     }
163     addConsumerSum(s);
164     barrier.await();
165     }
166     catch (Exception ie) {
167     ie.printStackTrace();
168     return;
169     }
170     }
171    
172     }
173    
174     static void oneRun(BlockingQueue<Integer> q, int npairs, int iters) throws Exception {
175     LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
176     CyclicBarrier barrier = new CyclicBarrier(npairs * 2 + 1, timer);
177     for (int i = 0; i < npairs; ++i) {
178     pool.execute(new Producer(q, barrier, iters));
179     pool.execute(new Consumer(q, barrier, iters));
180     }
181     barrier.await();
182     barrier.await();
183     long time = timer.getTime();
184     checkSum();
185     if (print)
186     System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * npairs)) + " ns per transfer");
187     }
188    
189     }