ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/TimeoutProducerConsumerLoops.java
Revision: 1.15
Committed: Sat Dec 31 21:34:47 2016 UTC (7 years, 4 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.14: +2 -2 lines
Log Message:
better error handling

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 import java.util.concurrent.ArrayBlockingQueue;
8 import java.util.concurrent.CyclicBarrier;
9 import java.util.concurrent.BlockingQueue;
10 import java.util.concurrent.ExecutorService;
11 import java.util.concurrent.Executors;
12 import java.util.concurrent.LinkedBlockingDeque;
13 import java.util.concurrent.LinkedBlockingQueue;
14 import java.util.concurrent.LinkedTransferQueue;
15 import java.util.concurrent.Phaser;
16 import java.util.concurrent.PriorityBlockingQueue;
17 import java.util.concurrent.SynchronousQueue;
18 import java.util.concurrent.TimeUnit;
19
20 public class TimeoutProducerConsumerLoops {
/** Processor count reported by the runtime; main() derives the default pair limit from it. */
static final int NCPUS = Runtime.getRuntime().availableProcessors();

/** Executor on which every Producer and Consumer task of a run is launched. */
static final ExecutorService pool = Executors.newCachedThreadPool();

/*
 * Elements passed through the queues come from a fixed table of
 * preallocated Integers, so transfers do not allocate.  The table size
 * must stay a power of two: POOL_MASK is used to fold an arbitrary int
 * into a valid table index.
 */
static final int POOL_SIZE = 1 << 8;
static final int POOL_MASK = POOL_SIZE - 1;
static final Integer[] intPool = new Integer[POOL_SIZE];
static {
    int slot = 0;
    while (slot < POOL_SIZE) {
        intPool[slot] = Integer.valueOf(slot);
        ++slot;
    }
}

/*
 * Bounds how far a producer can run ahead of its paired consumer, so
 * the benchmark exercises the queue rather than the garbage collector.
 * Each stage synchronizes with its partner whenever the low 12 bits of
 * its transfer count are all ones.  The synchronization is per pair
 * only, to limit its effect on queue contention.
 */
static final int LAG_MASK = (1 << 12) - 1;

/** When true, queue labels and per-transfer timings are written to System.out. */
static boolean print = false;

/** Checksum totals accumulated by producers and consumers; updated only through the synchronized helpers. */
static int producerSum;
static int consumerSum;
42 static synchronized void addProducerSum(int x) {
43 producerSum += x;
44 }
45
46 static synchronized void addConsumerSum(int x) {
47 consumerSum += x;
48 }
49
50 static synchronized void checkSum() {
51 if (producerSum != consumerSum) {
52 throw new Error("CheckSum mismatch");
53 }
54 }
55
56 public static void main(String[] args) throws Exception {
57 int maxPairs = NCPUS * 3 / 2;
58 int iters = 1000000;
59
60 if (args.length > 0)
61 maxPairs = Integer.parseInt(args[0]);
62
63 print = true;
64 for (int k = 1, i = 1; i <= maxPairs;) {
65 System.out.println("Pairs:" + i);
66 oneTest(i, iters);
67 Thread.sleep(100);
68 if (i == k) {
69 k = i << 1;
70 i = i + (i >>> 1);
71 }
72 else
73 i = k;
74 }
75 pool.shutdown();
76 }
77
78 static void oneTest(int n, int iters) throws Exception {
79 if (print)
80 System.out.print("LinkedTransferQueue ");
81 oneRun(new LinkedTransferQueue<Integer>(), n, iters);
82
83 if (print)
84 System.out.print("LinkedTransferQueue(xfer)");
85 oneRun(new LTQasSQ<Integer>(), n, iters);
86
87 if (print)
88 System.out.print("LinkedBlockingQueue ");
89 oneRun(new LinkedBlockingQueue<Integer>(), n, iters);
90
91 if (print)
92 System.out.print("LinkedBlockingQueue(cap) ");
93 oneRun(new LinkedBlockingQueue<Integer>(POOL_SIZE), n, iters);
94
95 if (print)
96 System.out.print("ArrayBlockingQueue(cap) ");
97 oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE), n, iters);
98
99 if (print)
100 System.out.print("LinkedBlockingDeque ");
101 oneRun(new LinkedBlockingDeque<Integer>(), n, iters);
102
103 if (print)
104 System.out.print("SynchronousQueue ");
105 oneRun(new SynchronousQueue<Integer>(), n, iters);
106
107 if (print)
108 System.out.print("SynchronousQueue(fair) ");
109 oneRun(new SynchronousQueue<Integer>(true), n, iters);
110
111 if (print)
112 System.out.print("PriorityBlockingQueue ");
113 oneRun(new PriorityBlockingQueue<Integer>(), n, iters / 16);
114
115 if (print)
116 System.out.print("ArrayBlockingQueue(fair) ");
117 oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE, true), n, iters/16);
118 }
119
120 abstract static class Stage implements Runnable {
121 final int iters;
122 final BlockingQueue<Integer> queue;
123 final CyclicBarrier barrier;
124 final Phaser lagPhaser;
125 Stage(BlockingQueue<Integer> q, CyclicBarrier b, Phaser s, int iters) {
126 queue = q;
127 barrier = b;
128 lagPhaser = s;
129 this.iters = iters;
130 }
131 }
132
133 static class Producer extends Stage {
134 Producer(BlockingQueue<Integer> q, CyclicBarrier b, Phaser s, int iters) {
135 super(q, b, s, iters);
136 }
137
138 public void run() {
139 try {
140 barrier.await();
141 int s = 0;
142 int l = hashCode();
143 int i = 0;
144 long timeout = 1000;
145 while (i < iters) {
146 l = LoopHelpers.compute4(l);
147 Integer v = intPool[l & POOL_MASK];
148 if (queue.offer(v, timeout, TimeUnit.NANOSECONDS)) {
149 s += LoopHelpers.compute4(v.intValue());
150 ++i;
151 if (timeout > 1)
152 timeout--;
153 if ((i & LAG_MASK) == LAG_MASK)
154 lagPhaser.arriveAndAwaitAdvance();
155 }
156 else
157 timeout++;
158 }
159 addProducerSum(s);
160 barrier.await();
161 }
162 catch (Exception ie) {
163 ie.printStackTrace();
164 return;
165 }
166 }
167 }
168
169 static class Consumer extends Stage {
170 Consumer(BlockingQueue<Integer> q, CyclicBarrier b, Phaser s, int iters) {
171 super(q, b, s, iters);
172 }
173
174 public void run() {
175 try {
176 barrier.await();
177 int l = 0;
178 int s = 0;
179 int i = 0;
180 long timeout = 1000;
181 while (i < iters) {
182 Integer e = queue.poll(timeout,
183 TimeUnit.NANOSECONDS);
184 if (e != null) {
185 l = LoopHelpers.compute4(e.intValue());
186 s += l;
187 ++i;
188 if (timeout > 1)
189 --timeout;
190 if ((i & LAG_MASK) == LAG_MASK)
191 lagPhaser.arriveAndAwaitAdvance();
192 }
193 else
194 ++timeout;
195 }
196 addConsumerSum(s);
197 barrier.await();
198 }
199 catch (Exception ie) {
200 ie.printStackTrace();
201 return;
202 }
203 }
204
205 }
206
207 static void oneRun(BlockingQueue<Integer> q, int npairs, int iters) throws Exception {
208 LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
209 CyclicBarrier barrier = new CyclicBarrier(npairs * 2 + 1, timer);
210 for (int i = 0; i < npairs; ++i) {
211 Phaser s = new Phaser(2);
212 pool.execute(new Producer(q, barrier, s, iters));
213 pool.execute(new Consumer(q, barrier, s, iters));
214 }
215 barrier.await();
216 barrier.await();
217 long time = timer.getTime();
218 checkSum();
219 q.clear();
220 if (print)
221 System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * npairs)) + " ns per transfer");
222 }
223
224 static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
225 LTQasSQ() { super(); }
226 public void put(T x) {
227 try { super.transfer(x); }
228 catch (InterruptedException ex) { throw new Error(ex); }
229 }
230
231 public boolean offer(T x, long timeout, TimeUnit unit) {
232 return super.offer(x, timeout, unit);
233 }
234
235 }
236
237 }