ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/TimeoutProducerConsumerLoops.java
Revision: 1.3
Committed: Fri Oct 23 19:57:07 2009 UTC (14 years, 6 months ago) by dl
Branch: MAIN
Changes since 1.2: +62 -33 lines
Log Message:
Update misc tests for JDK7

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 import java.util.concurrent.*;
8 //import jsr166y.*;
9
10 public class TimeoutProducerConsumerLoops {
11 static final int NCPUS = Runtime.getRuntime().availableProcessors();
12 static final ExecutorService pool = Executors.newCachedThreadPool();
13
14 // Number of elements passed around -- must be power of two
15 // Elements are reused from pool to minimize alloc impact
16 static final int POOL_SIZE = 1 << 8;
17 static final int POOL_MASK = POOL_SIZE-1;
18 static final Integer[] intPool = new Integer[POOL_SIZE];
19 static {
20 for (int i = 0; i < POOL_SIZE; ++i)
21 intPool[i] = Integer.valueOf(i);
22 }
23
24
25 static boolean print = false;
26 static int producerSum;
27 static int consumerSum;
28 static synchronized void addProducerSum(int x) {
29 producerSum += x;
30 }
31
32 static synchronized void addConsumerSum(int x) {
33 consumerSum += x;
34 }
35
36 static synchronized void checkSum() {
37 if (producerSum != consumerSum) {
38 throw new Error("CheckSum mismatch");
39 }
40 }
41
42 public static void main(String[] args) throws Exception {
43 int maxPairs = NCPUS * 3 / 2;
44 int iters = 1000000;
45
46 if (args.length > 0)
47 maxPairs = Integer.parseInt(args[0]);
48
49 print = true;
50 int k = 1;
51 for (int i = 1; i <= maxPairs;) {
52 System.out.println("Pairs:" + i);
53 oneTest(i, iters);
54 Thread.sleep(100);
55 if (i == k) {
56 k = i << 1;
57 i = i + (i >>> 1);
58 }
59 else
60 i = k;
61 }
62 pool.shutdown();
63 }
64
65 static void oneTest(int n, int iters) throws Exception {
66 if (print)
67 System.out.print("LinkedTransferQueue ");
68 oneRun(new LinkedTransferQueue<Integer>(), n, iters);
69
70 if (print)
71 System.out.print("LinkedTransferQueue(xfer)");
72 oneRun(new LTQasSQ<Integer>(), n, iters);
73
74 if (print)
75 System.out.print("LinkedBlockingQueue ");
76 oneRun(new LinkedBlockingQueue<Integer>(), n, iters);
77
78 if (print)
79 System.out.print("LinkedBlockingQueue(cap) ");
80 oneRun(new LinkedBlockingQueue<Integer>(POOL_SIZE), n, iters);
81
82 if (print)
83 System.out.print("ArrayBlockingQueue(cap) ");
84 oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE), n, iters);
85
86 if (print)
87 System.out.print("LinkedBlockingDeque ");
88 oneRun(new LinkedBlockingDeque<Integer>(), n, iters);
89
90 if (print)
91 System.out.print("SynchronousQueue ");
92 oneRun(new SynchronousQueue<Integer>(), n, iters);
93
94 if (print)
95 System.out.print("SynchronousQueue(fair) ");
96 oneRun(new SynchronousQueue<Integer>(true), n, iters);
97
98 if (print)
99 System.out.print("PriorityBlockingQueue ");
100 oneRun(new PriorityBlockingQueue<Integer>(), n, iters / 16);
101
102 if (print)
103 System.out.print("ArrayBlockingQueue(fair) ");
104 oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE, true), n, iters/16);
105
106 }
107
108 static abstract class Stage implements Runnable {
109 final int iters;
110 final BlockingQueue<Integer> queue;
111 final CyclicBarrier barrier;
112 Stage (BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
113 queue = q;
114 barrier = b;
115 this.iters = iters;
116 }
117 }
118
119 static class Producer extends Stage {
120 Producer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
121 super(q, b, iters);
122 }
123
124 public void run() {
125 try {
126 barrier.await();
127 int s = 0;
128 int l = hashCode();
129 int i = 0;
130 long timeout = 1000;
131 while (i < iters) {
132 l = LoopHelpers.compute4(l);
133 Integer v = intPool[l & POOL_MASK];
134 if (queue.offer(v, timeout, TimeUnit.NANOSECONDS)) {
135 s += LoopHelpers.compute4(v.intValue());
136 ++i;
137 if (timeout > 1)
138 timeout--;
139 }
140 else
141 timeout++;
142 }
143 addProducerSum(s);
144 barrier.await();
145 }
146 catch (Exception ie) {
147 ie.printStackTrace();
148 return;
149 }
150 }
151 }
152
153 static class Consumer extends Stage {
154 Consumer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
155 super(q, b, iters);
156 }
157
158 public void run() {
159 try {
160 barrier.await();
161 int l = 0;
162 int s = 0;
163 int i = 0;
164 long timeout = 1000;
165 while (i < iters) {
166 Integer e = queue.poll(timeout,
167 TimeUnit.NANOSECONDS);
168 if (e != null) {
169 l = LoopHelpers.compute4(e.intValue());
170 s += l;
171 ++i;
172 if (timeout > 1)
173 --timeout;
174 }
175 else
176 ++timeout;
177 }
178 addConsumerSum(s);
179 barrier.await();
180 }
181 catch (Exception ie) {
182 ie.printStackTrace();
183 return;
184 }
185 }
186
187 }
188
189 static void oneRun(BlockingQueue<Integer> q, int npairs, int iters) throws Exception {
190 LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
191 CyclicBarrier barrier = new CyclicBarrier(npairs * 2 + 1, timer);
192 for (int i = 0; i < npairs; ++i) {
193 pool.execute(new Producer(q, barrier, iters));
194 pool.execute(new Consumer(q, barrier, iters));
195 }
196 barrier.await();
197 barrier.await();
198 long time = timer.getTime();
199 checkSum();
200 q.clear();
201 if (print)
202 System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * npairs)) + " ns per transfer");
203 }
204
205 static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
206 LTQasSQ() { super(); }
207 public void put(T x) {
208 try { super.transfer(x);
209 } catch (InterruptedException ex) { throw new Error(); }
210 }
211
212 public boolean offer(T x, long timeout, TimeUnit unit) {
213 return super.offer(x, timeout, unit);
214 }
215
216 }
217
218 }