ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/TimeoutProducerConsumerLoops.java
Revision: 1.2
Committed: Mon Feb 19 00:46:06 2007 UTC (17 years, 2 months ago) by dl
Branch: MAIN
Changes since 1.1: +2 -6 lines
Log Message:
Uniform headers

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 import java.util.concurrent.*;
8
9 public class TimeoutProducerConsumerLoops {
10 static final int CAPACITY = 100;
11 static final ExecutorService pool = Executors.newCachedThreadPool();
12 static boolean print = false;
13 static int producerSum;
14 static int consumerSum;
15 static synchronized void addProducerSum(int x) {
16 producerSum += x;
17 }
18
19 static synchronized void addConsumerSum(int x) {
20 consumerSum += x;
21 }
22
23 static synchronized void checkSum() {
24 if (producerSum != consumerSum) {
25 throw new Error("CheckSum mismatch");
26 }
27 }
28
29 public static void main(String[] args) throws Exception {
30 int maxPairs = 100;
31 int iters = 100000;
32
33 if (args.length > 0)
34 maxPairs = Integer.parseInt(args[0]);
35
36 print = false;
37 System.out.println("Warmup...");
38 oneTest(1, 10000);
39 Thread.sleep(100);
40 oneTest(2, 10000);
41 Thread.sleep(100);
42 oneTest(2, 10000);
43 Thread.sleep(100);
44 print = true;
45
46 int k = 1;
47 for (int i = 1; i <= maxPairs;) {
48 System.out.println("Pairs:" + i);
49 oneTest(i, iters);
50 Thread.sleep(100);
51 if (i == k) {
52 k = i << 1;
53 i = i + (i >>> 1);
54 }
55 else
56 i = k;
57 }
58 pool.shutdown();
59 }
60
61 static void oneTest(int pairs, int iters) throws Exception {
62 int fairIters = iters/20;
63 if (print)
64 System.out.print("ArrayBlockingQueue ");
65 oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), pairs, iters);
66
67 if (print)
68 System.out.print("LinkedBlockingQueue ");
69 oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), pairs, iters);
70
71 if (print)
72 System.out.print("LinkedBlockingDeque ");
73 oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), pairs, iters);
74
75 if (print)
76 System.out.print("SynchronousQueue ");
77 oneRun(new SynchronousQueue<Integer>(), pairs, iters);
78
79 if (print)
80 System.out.print("SynchronousQueue(fair) ");
81 oneRun(new SynchronousQueue<Integer>(true), pairs, fairIters);
82
83 if (print)
84 System.out.print("PriorityBlockingQueue ");
85 oneRun(new PriorityBlockingQueue<Integer>(), pairs, fairIters);
86
87 if (print)
88 System.out.print("ArrayBlockingQueue(fair)");
89 oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), pairs, fairIters);
90
91 }
92
93 static abstract class Stage implements Runnable {
94 final int iters;
95 final BlockingQueue<Integer> queue;
96 final CyclicBarrier barrier;
97 Stage (BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
98 queue = q;
99 barrier = b;
100 this.iters = iters;
101 }
102 }
103
104 static class Producer extends Stage {
105 Producer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
106 super(q, b, iters);
107 }
108
109 public void run() {
110 try {
111 barrier.await();
112 int s = 0;
113 int l = hashCode();
114 int i = 0;
115 long timeout = 1;
116 while (i < iters) {
117 l = LoopHelpers.compute4(l);
118 if (queue.offer(new Integer(l),
119 timeout, TimeUnit.NANOSECONDS)) {
120 s += LoopHelpers.compute4(l);
121 ++i;
122 if (timeout > 1)
123 timeout--;
124 }
125 else
126 timeout++;
127 }
128 addProducerSum(s);
129 barrier.await();
130 }
131 catch (Exception ie) {
132 ie.printStackTrace();
133 return;
134 }
135 }
136 }
137
138 static class Consumer extends Stage {
139 Consumer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
140 super(q, b, iters);
141 }
142
143 public void run() {
144 try {
145 barrier.await();
146 int l = 0;
147 int s = 0;
148 int i = 0;
149 long timeout = 1;
150 while (i < iters) {
151 Integer e = queue.poll(timeout,
152 TimeUnit.NANOSECONDS);
153 if (e != null) {
154 l = LoopHelpers.compute4(e.intValue());
155 s += l;
156 ++i;
157 if (timeout > 1)
158 --timeout;
159 }
160 else
161 ++timeout;
162 }
163 addConsumerSum(s);
164 barrier.await();
165 }
166 catch (Exception ie) {
167 ie.printStackTrace();
168 return;
169 }
170 }
171
172 }
173
174 static void oneRun(BlockingQueue<Integer> q, int npairs, int iters) throws Exception {
175 LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
176 CyclicBarrier barrier = new CyclicBarrier(npairs * 2 + 1, timer);
177 for (int i = 0; i < npairs; ++i) {
178 pool.execute(new Producer(q, barrier, iters));
179 pool.execute(new Consumer(q, barrier, iters));
180 }
181 barrier.await();
182 barrier.await();
183 long time = timer.getTime();
184 checkSum();
185 if (print)
186 System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * npairs)) + " ns per transfer");
187 }
188
189 }