ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/TimeoutProducerConsumerLoops.java
Revision: 1.14
Committed: Sat Dec 31 19:25:33 2016 UTC (7 years, 4 months ago) by jsr166
Branch: MAIN
Changes since 1.13: +12 -1 lines
Log Message:
organize imports

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.2 * Expert Group and released to the public domain, as explained at
4 jsr166 1.9 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.1 */
6    
7 jsr166 1.14 import java.util.concurrent.ArrayBlockingQueue;
8     import java.util.concurrent.CyclicBarrier;
9     import java.util.concurrent.BlockingQueue;
10     import java.util.concurrent.ExecutorService;
11     import java.util.concurrent.Executors;
12     import java.util.concurrent.LinkedBlockingDeque;
13     import java.util.concurrent.LinkedBlockingQueue;
14     import java.util.concurrent.LinkedTransferQueue;
15     import java.util.concurrent.Phaser;
16     import java.util.concurrent.PriorityBlockingQueue;
17     import java.util.concurrent.SynchronousQueue;
18     import java.util.concurrent.TimeUnit;
19 dl 1.5
20 dl 1.1 public class TimeoutProducerConsumerLoops {
21 dl 1.3 static final int NCPUS = Runtime.getRuntime().availableProcessors();
22 dl 1.1 static final ExecutorService pool = Executors.newCachedThreadPool();
23 dl 1.3
24     // Number of elements passed around -- must be power of two
25     // Elements are reused from pool to minimize alloc impact
26     static final int POOL_SIZE = 1 << 8;
27     static final int POOL_MASK = POOL_SIZE-1;
28     static final Integer[] intPool = new Integer[POOL_SIZE];
29     static {
30 jsr166 1.6 for (int i = 0; i < POOL_SIZE; ++i)
31 dl 1.3 intPool[i] = Integer.valueOf(i);
32     }
33    
34 jsr166 1.6 // max lag between a producer and consumer to avoid
35 dl 1.5 // this becoming a GC test rather than queue test.
36     // Used only per-pair to lessen impact on queue sync
37     static final int LAG_MASK = (1 << 12) - 1;
38 dl 1.3
39 dl 1.1 static boolean print = false;
40     static int producerSum;
41     static int consumerSum;
42     static synchronized void addProducerSum(int x) {
43     producerSum += x;
44     }
45    
46     static synchronized void addConsumerSum(int x) {
47     consumerSum += x;
48     }
49    
50     static synchronized void checkSum() {
51     if (producerSum != consumerSum) {
52     throw new Error("CheckSum mismatch");
53     }
54     }
55    
56     public static void main(String[] args) throws Exception {
57 dl 1.3 int maxPairs = NCPUS * 3 / 2;
58     int iters = 1000000;
59 dl 1.1
60 jsr166 1.6 if (args.length > 0)
61 dl 1.1 maxPairs = Integer.parseInt(args[0]);
62    
63     print = true;
64 jsr166 1.11 for (int k = 1, i = 1; i <= maxPairs;) {
65 dl 1.1 System.out.println("Pairs:" + i);
66     oneTest(i, iters);
67     Thread.sleep(100);
68     if (i == k) {
69     k = i << 1;
70     i = i + (i >>> 1);
71 jsr166 1.6 }
72     else
73 dl 1.1 i = k;
74     }
75     pool.shutdown();
76 jsr166 1.10 }
77 dl 1.1
78 dl 1.3 static void oneTest(int n, int iters) throws Exception {
79     if (print)
80     System.out.print("LinkedTransferQueue ");
81     oneRun(new LinkedTransferQueue<Integer>(), n, iters);
82    
83     if (print)
84     System.out.print("LinkedTransferQueue(xfer)");
85     oneRun(new LTQasSQ<Integer>(), n, iters);
86    
87 dl 1.1 if (print)
88 dl 1.3 System.out.print("LinkedBlockingQueue ");
89     oneRun(new LinkedBlockingQueue<Integer>(), n, iters);
90 dl 1.1
91     if (print)
92 dl 1.3 System.out.print("LinkedBlockingQueue(cap) ");
93     oneRun(new LinkedBlockingQueue<Integer>(POOL_SIZE), n, iters);
94 dl 1.1
95     if (print)
96 dl 1.3 System.out.print("ArrayBlockingQueue(cap) ");
97     oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE), n, iters);
98 dl 1.1
99     if (print)
100 dl 1.3 System.out.print("LinkedBlockingDeque ");
101     oneRun(new LinkedBlockingDeque<Integer>(), n, iters);
102 dl 1.1
103     if (print)
104 dl 1.3 System.out.print("SynchronousQueue ");
105     oneRun(new SynchronousQueue<Integer>(), n, iters);
106 dl 1.1
107     if (print)
108 dl 1.3 System.out.print("SynchronousQueue(fair) ");
109     oneRun(new SynchronousQueue<Integer>(true), n, iters);
110 dl 1.1
111     if (print)
112 dl 1.3 System.out.print("PriorityBlockingQueue ");
113     oneRun(new PriorityBlockingQueue<Integer>(), n, iters / 16);
114    
115     if (print)
116     System.out.print("ArrayBlockingQueue(fair) ");
117     oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE, true), n, iters/16);
118 dl 1.1 }
119 jsr166 1.6
120 jsr166 1.8 abstract static class Stage implements Runnable {
121 dl 1.1 final int iters;
122     final BlockingQueue<Integer> queue;
123     final CyclicBarrier barrier;
124 dl 1.5 final Phaser lagPhaser;
125 jsr166 1.7 Stage(BlockingQueue<Integer> q, CyclicBarrier b, Phaser s, int iters) {
126 jsr166 1.6 queue = q;
127 dl 1.1 barrier = b;
128 dl 1.5 lagPhaser = s;
129 dl 1.1 this.iters = iters;
130     }
131     }
132    
133     static class Producer extends Stage {
134 dl 1.5 Producer(BlockingQueue<Integer> q, CyclicBarrier b, Phaser s, int iters) {
135     super(q, b, s, iters);
136 dl 1.1 }
137    
138     public void run() {
139     try {
140     barrier.await();
141     int s = 0;
142     int l = hashCode();
143     int i = 0;
144 dl 1.3 long timeout = 1000;
145 dl 1.1 while (i < iters) {
146     l = LoopHelpers.compute4(l);
147 dl 1.3 Integer v = intPool[l & POOL_MASK];
148     if (queue.offer(v, timeout, TimeUnit.NANOSECONDS)) {
149     s += LoopHelpers.compute4(v.intValue());
150 dl 1.1 ++i;
151     if (timeout > 1)
152     timeout--;
153 dl 1.5 if ((i & LAG_MASK) == LAG_MASK)
154     lagPhaser.arriveAndAwaitAdvance();
155 dl 1.1 }
156     else
157     timeout++;
158     }
159     addProducerSum(s);
160     barrier.await();
161     }
162 jsr166 1.6 catch (Exception ie) {
163     ie.printStackTrace();
164     return;
165 dl 1.1 }
166     }
167     }
168    
169     static class Consumer extends Stage {
170 jsr166 1.6 Consumer(BlockingQueue<Integer> q, CyclicBarrier b, Phaser s, int iters) {
171 dl 1.5 super(q, b, s, iters);
172 dl 1.1 }
173    
174     public void run() {
175     try {
176     barrier.await();
177     int l = 0;
178     int s = 0;
179     int i = 0;
180 dl 1.3 long timeout = 1000;
181 dl 1.1 while (i < iters) {
182 jsr166 1.6 Integer e = queue.poll(timeout,
183 dl 1.1 TimeUnit.NANOSECONDS);
184     if (e != null) {
185     l = LoopHelpers.compute4(e.intValue());
186     s += l;
187     ++i;
188     if (timeout > 1)
189     --timeout;
190 dl 1.5 if ((i & LAG_MASK) == LAG_MASK)
191     lagPhaser.arriveAndAwaitAdvance();
192 dl 1.1 }
193     else
194     ++timeout;
195     }
196     addConsumerSum(s);
197     barrier.await();
198     }
199 jsr166 1.6 catch (Exception ie) {
200     ie.printStackTrace();
201     return;
202 dl 1.1 }
203     }
204    
205     }
206    
207     static void oneRun(BlockingQueue<Integer> q, int npairs, int iters) throws Exception {
208     LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
209     CyclicBarrier barrier = new CyclicBarrier(npairs * 2 + 1, timer);
210     for (int i = 0; i < npairs; ++i) {
211 dl 1.5 Phaser s = new Phaser(2);
212     pool.execute(new Producer(q, barrier, s, iters));
213     pool.execute(new Consumer(q, barrier, s, iters));
214 dl 1.1 }
215     barrier.await();
216     barrier.await();
217     long time = timer.getTime();
218     checkSum();
219 dl 1.3 q.clear();
220 dl 1.1 if (print)
221     System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * npairs)) + " ns per transfer");
222     }
223    
224 dl 1.3 static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
225     LTQasSQ() { super(); }
226     public void put(T x) {
227 jsr166 1.6 try { super.transfer(x);
228 dl 1.3 } catch (InterruptedException ex) { throw new Error(); }
229     }
230    
231     public boolean offer(T x, long timeout, TimeUnit unit) {
232 jsr166 1.6 return super.offer(x, timeout, unit);
233 dl 1.3 }
234    
235     }
236    
237 dl 1.1 }