ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/ProducerConsumerLoops.java
(Generate patch)

Comparing jsr166/src/test/loops/ProducerConsumerLoops.java (file contents):
Revision 1.2 by dl, Mon Feb 19 00:46:06 2007 UTC vs.
Revision 1.12 by jsr166, Sat Dec 31 19:50:57 2016 UTC

# Line 1 | Line 1
1   /*
2   * Written by Doug Lea with assistance from members of JCP JSR-166
3   * Expert Group and released to the public domain, as explained at
4 < * http://creativecommons.org/licenses/publicdomain
4 > * http://creativecommons.org/publicdomain/zero/1.0/
5   */
6  
7 < import java.util.concurrent.*;
7 > import java.util.Random;
8 > import java.util.concurrent.ArrayBlockingQueue;
9 > import java.util.concurrent.BlockingQueue;
10 > import java.util.concurrent.CyclicBarrier;
11 > import java.util.concurrent.ExecutorService;
12 > import java.util.concurrent.Executors;
13 > import java.util.concurrent.LinkedBlockingDeque;
14 > import java.util.concurrent.LinkedBlockingQueue;
15 > import java.util.concurrent.LinkedTransferQueue;
16 > import java.util.concurrent.Phaser;
17 > import java.util.concurrent.PriorityBlockingQueue;
18 > import java.util.concurrent.SynchronousQueue;
19  
20   public class ProducerConsumerLoops {
21 <    static final int CAPACITY =      100;
11 <
21 >    static final int NCPUS = Runtime.getRuntime().availableProcessors();
22      static final ExecutorService pool = Executors.newCachedThreadPool();
23      static boolean print = false;
24      static int producerSum;
# Line 26 | Line 36 | public class ProducerConsumerLoops {
36              throw new Error("CheckSum mismatch");
37      }
38  
39 +    // Number of elements passed around -- must be power of two
40 +    // Elements are reused from pool to minimize alloc impact
41 +    static final int POOL_SIZE = 1 << 7;
42 +    static final int POOL_MASK = POOL_SIZE-1;
43 +    static final Integer[] intPool = new Integer[POOL_SIZE];
44 +    static {
45 +        for (int i = 0; i < POOL_SIZE; ++i)
46 +            intPool[i] = Integer.valueOf(i);
47 +    }
48 +
49 +    // Number of puts by producers or takes by consumers
50 +    static final int ITERS = 1 << 20;
51 +
52 +    // max lag between a producer and consumer to avoid
53 +    // this becoming a GC test rather than queue test.
54 +    // Used only per-pair to lessen impact on queue sync
55 +    static final int LAG_MASK = (1 << 12) - 1;
56 +
57      public static void main(String[] args) throws Exception {
58 <        int maxPairs = 100;
31 <        int iters = 100000;
58 >        int maxPairs = NCPUS * 3 / 2;
59  
60 <        if (args.length > 0)
60 >        if (args.length > 0)
61              maxPairs = Integer.parseInt(args[0]);
62  
63 <        print = false;
37 <        System.out.println("Warmup...");
38 <        oneTest(1, 10000);
39 <        Thread.sleep(100);
40 <        oneTest(2, 10000);
41 <        Thread.sleep(100);
42 <        oneTest(2, 10000);
43 <        Thread.sleep(100);
63 >        warmup();
64          print = true;
65 <        
46 <        int k = 1;
47 <        for (int i = 1; i <= maxPairs;) {
65 >        for (int k = 1, i = 1; i <= maxPairs;) {
66              System.out.println("Pairs:" + i);
67 <            oneTest(i, iters);
50 <            Thread.sleep(100);
67 >            oneTest(i, ITERS);
68              if (i == k) {
69                  k = i << 1;
70                  i = i + (i >>> 1);
71 <            }
72 <            else
71 >            }
72 >            else
73                  i = k;
74          }
75          pool.shutdown();
76 <   }
76 >    }
77  
78 <    static void oneTest(int pairs, int iters) throws Exception {
79 <        int fairIters = iters/20;
78 >    static void warmup() throws Exception {
79 >        print = false;
80 >        System.out.print("Warmup ");
81 >        int it = 2000;
82 >        for (int j = 5; j > 0; --j) {
83 >            oneTest(j, it);
84 >            System.out.print(".");
85 >            it += 1000;
86 >        }
87 >        System.gc();
88 >        it = 20000;
89 >        for (int j = 5; j > 0; --j) {
90 >            oneTest(j, it);
91 >            System.out.print(".");
92 >            it += 10000;
93 >        }
94 >        System.gc();
95 >        System.out.println();
96 >    }
97 >
98 >    static void oneTest(int n, int iters) throws Exception {
99 >        int fairIters = iters/16;
100 >
101 >        Thread.sleep(100); // System.gc();
102          if (print)
103 <            System.out.print("ArrayBlockingQueue      ");
104 <        oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), pairs, iters);
103 >            System.out.print("LinkedTransferQueue     ");
104 >        oneRun(new LinkedTransferQueue<Integer>(), n, iters);
105  
106 +        Thread.sleep(100); // System.gc();
107          if (print)
108              System.out.print("LinkedBlockingQueue     ");
109 <        oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), pairs, iters);
109 >        oneRun(new LinkedBlockingQueue<Integer>(), n, iters);
110 >
111 >        Thread.sleep(100); // System.gc();
112 >        if (print)
113 >            System.out.print("LinkedBlockingQueue(cap)");
114 >        oneRun(new LinkedBlockingQueue<Integer>(POOL_SIZE), n, iters);
115  
116 +        Thread.sleep(100); // System.gc();
117          if (print)
118              System.out.print("LinkedBlockingDeque     ");
119 <        oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), pairs, iters);
119 >        oneRun(new LinkedBlockingDeque<Integer>(), n, iters);
120  
121 +        Thread.sleep(100); // System.gc();
122 +        if (print)
123 +            System.out.print("ArrayBlockingQueue      ");
124 +        oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE), n, iters);
125 +
126 +        Thread.sleep(100); // System.gc();
127          if (print)
128              System.out.print("SynchronousQueue        ");
129 <        oneRun(new SynchronousQueue<Integer>(), pairs, iters);
129 >        oneRun(new SynchronousQueue<Integer>(), n, iters);
130  
131 +        Thread.sleep(100); // System.gc();
132          if (print)
133              System.out.print("SynchronousQueue(fair)  ");
134 <        oneRun(new SynchronousQueue<Integer>(true), pairs, fairIters);
134 >        oneRun(new SynchronousQueue<Integer>(true), n, iters);
135 >
136 >        Thread.sleep(100); // System.gc();
137 >        if (print)
138 >            System.out.print("LinkedTransferQueue(xfer)");
139 >        oneRun(new LTQasSQ<Integer>(), n, iters);
140 >
141 >        Thread.sleep(100); // System.gc();
142 >        if (print)
143 >            System.out.print("LinkedTransferQueue(half)");
144 >        oneRun(new HalfSyncLTQ<Integer>(), n, iters);
145  
146 +        Thread.sleep(100); // System.gc();
147          if (print)
148              System.out.print("PriorityBlockingQueue   ");
149 <        oneRun(new PriorityBlockingQueue<Integer>(), pairs, fairIters);
149 >        oneRun(new PriorityBlockingQueue<Integer>(), n, fairIters);
150  
151 +        Thread.sleep(100); // System.gc();
152          if (print)
153              System.out.print("ArrayBlockingQueue(fair)");
154 <        oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), pairs, fairIters);
90 <
154 >        oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE, true), n, fairIters);
155      }
156 <    
157 <    static abstract class Stage implements Runnable {
156 >
157 >    abstract static class Stage implements Runnable {
158          final int iters;
159          final BlockingQueue<Integer> queue;
160          final CyclicBarrier barrier;
161 <        Stage (BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
162 <            queue = q;
161 >        final Phaser lagPhaser;
162 >        Stage(BlockingQueue<Integer> q, CyclicBarrier b, Phaser s, int iters) {
163 >            queue = q;
164              barrier = b;
165 +            lagPhaser = s;
166              this.iters = iters;
167          }
168      }
169  
170      static class Producer extends Stage {
171 <        Producer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
172 <            super(q, b, iters);
171 >        Producer(BlockingQueue<Integer> q, CyclicBarrier b, Phaser s,
172 >                 int iters) {
173 >            super(q, b, s, iters);
174          }
175  
176          public void run() {
177              try {
178                  barrier.await();
179 <                int s = 0;
180 <                int l = hashCode();
179 >                int ps = 0;
180 >                int r = hashCode();
181                  for (int i = 0; i < iters; ++i) {
182 <                    l = LoopHelpers.compute4(l);
183 <                    queue.put(new Integer(l));
184 <                    s += LoopHelpers.compute4(l);
182 >                    r = LoopHelpers.compute7(r);
183 >                    Integer v = intPool[r & POOL_MASK];
184 >                    int k = v.intValue();
185 >                    queue.put(v);
186 >                    ps += k;
187 >                    if ((i & LAG_MASK) == LAG_MASK)
188 >                        lagPhaser.arriveAndAwaitAdvance();
189                  }
190 <                addProducerSum(s);
190 >                addProducerSum(ps);
191                  barrier.await();
192              }
193 <            catch (Exception ie) {
194 <                ie.printStackTrace();
195 <                return;
193 >            catch (Exception ie) {
194 >                ie.printStackTrace();
195 >                return;
196              }
197          }
198      }
199  
200      static class Consumer extends Stage {
201 <        Consumer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
202 <            super(q, b, iters);
201 >        Consumer(BlockingQueue<Integer> q, CyclicBarrier b, Phaser s,
202 >                 int iters) {
203 >            super(q, b, s, iters);
204          }
205  
206          public void run() {
207              try {
208                  barrier.await();
209 <                int l = 0;
138 <                int s = 0;
209 >                int cs = 0;
210                  for (int i = 0; i < iters; ++i) {
211 <                    l = LoopHelpers.compute4(queue.take().intValue());
212 <                    s += l;
211 >                    Integer v = queue.take();
212 >                    int k = v.intValue();
213 >                    cs += k;
214 >                    if ((i & LAG_MASK) == LAG_MASK)
215 >                        lagPhaser.arriveAndAwaitAdvance();
216                  }
217 <                addConsumerSum(s);
217 >                addConsumerSum(cs);
218                  barrier.await();
219              }
220 <            catch (Exception ie) {
221 <                ie.printStackTrace();
222 <                return;
220 >            catch (Exception ie) {
221 >                ie.printStackTrace();
222 >                return;
223              }
224          }
225  
226      }
227  
228 <    static void oneRun(BlockingQueue<Integer> q, int npairs, int iters) throws Exception {
228 >    static void oneRun(BlockingQueue<Integer> q, int n, int iters) throws Exception {
229          LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
230 <        CyclicBarrier barrier = new CyclicBarrier(npairs * 2 + 1, timer);
231 <        for (int i = 0; i < npairs; ++i) {
232 <            pool.execute(new Producer(q, barrier, iters));
233 <            pool.execute(new Consumer(q, barrier, iters));
230 >        CyclicBarrier barrier = new CyclicBarrier(n * 2 + 1, timer);
231 >        for (int i = 0; i < n; ++i) {
232 >            Phaser s = new Phaser(2);
233 >            pool.execute(new Producer(q, barrier, s, iters));
234 >            pool.execute(new Consumer(q, barrier, s, iters));
235          }
236          barrier.await();
237          barrier.await();
238          long time = timer.getTime();
239          checkSum();
240          if (print)
241 <            System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * npairs)) + " ns per transfer");
241 >            System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * n)) + " ns per transfer");
242 >    }
243 >
244 >    static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
245 >        LTQasSQ() { super(); }
246 >        public void put(T x) {
247 >            try { super.transfer(x);
248 >            } catch (InterruptedException ex) { throw new Error(); }
249 >        }
250      }
251  
252 +    static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> {
253 +        int calls;
254 +        HalfSyncLTQ() { super(); }
255 +        public void put(T x) {
256 +            if ((++calls & 1) == 0)
257 +                super.put(x);
258 +            else {
259 +                try { super.transfer(x);
260 +                } catch (InterruptedException ex) {
261 +                    throw new Error();
262 +                }
263 +            }
264 +        }
265 +    }
266   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines