ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/MultipleProducersSingleConsumerLoops.java
(Generate patch)

Comparing jsr166/src/test/loops/MultipleProducersSingleConsumerLoops.java (file contents):
Revision 1.1 by dl, Mon May 2 19:19:38 2005 UTC vs.
Revision 1.12 by jsr166, Sat Dec 31 19:40:49 2016 UTC

# Line 1 | Line 1
1   /*
2 * @test
3 * @synopsis  multiple producers and single consumer using blocking queues
4 */
5 /*
2   * Written by Doug Lea with assistance from members of JCP JSR-166
3 < * Expert Group and released to the public domain. Use, modify, and
4 < * redistribute this code in any way without acknowledgement.
3 > * Expert Group and released to the public domain, as explained at
4 > * http://creativecommons.org/publicdomain/zero/1.0/
5   */
6  
7 < import java.util.concurrent.*;
7 > import java.util.Random;
8 > import java.util.concurrent.ArrayBlockingQueue;
9 > import java.util.concurrent.BlockingQueue;
10 > import java.util.concurrent.CyclicBarrier;
11 > import java.util.concurrent.ExecutorService;
12 > import java.util.concurrent.Executors;
13 > import java.util.concurrent.LinkedBlockingDeque;
14 > import java.util.concurrent.LinkedBlockingQueue;
15 > import java.util.concurrent.LinkedTransferQueue;
16 > import java.util.concurrent.Phaser;
17 > import java.util.concurrent.PriorityBlockingQueue;
18 > import java.util.concurrent.SynchronousQueue;
19  
20   public class MultipleProducersSingleConsumerLoops {
21 <    static final int CAPACITY =      100;
21 >    static final int NCPUS = Runtime.getRuntime().availableProcessors();
22 >    static final Random rng = new Random();
23      static final ExecutorService pool = Executors.newCachedThreadPool();
24      static boolean print = false;
25      static int producerSum;
26      static int consumerSum;
19
27      static synchronized void addProducerSum(int x) {
28          producerSum += x;
29      }
# Line 30 | Line 37 | public class MultipleProducersSingleCons
37              throw new Error("CheckSum mismatch");
38      }
39  
40 +    // Number of elements passed around -- must be power of two
41 +    // Elements are reused from pool to minimize alloc impact
42 +    static final int POOL_SIZE = 1 << 8;
43 +    static final int POOL_MASK = POOL_SIZE-1;
44 +    static final Integer[] intPool = new Integer[POOL_SIZE];
45 +    static {
46 +        for (int i = 0; i < POOL_SIZE; ++i)
47 +            intPool[i] = Integer.valueOf(i);
48 +    }
49 +
50 +    // Number of puts by producers or takes by consumers
51 +    static final int ITERS = 1 << 20;
52 +
53 +    // max lag between a producer and consumer to avoid
54 +    // this becoming a GC test rather than queue test.
55 +    static final int LAG = (1 << 12);
56 +    static final int LAG_MASK = LAG - 1;
57 +
58      public static void main(String[] args) throws Exception {
59 <        int maxProducers = 100;
35 <        int iters = 100000;
59 >        int maxn = 12; // NCPUS * 3 / 2;
60  
61 <        if (args.length > 0)
62 <            maxProducers = Integer.parseInt(args[0]);
61 >        if (args.length > 0)
62 >            maxn = Integer.parseInt(args[0]);
63  
64 <        print = false;
41 <        System.out.println("Warmup...");
42 <        oneTest(1, 10000);
43 <        Thread.sleep(100);
44 <        oneTest(2, 10000);
45 <        Thread.sleep(100);
64 >        warmup();
65          print = true;
66 <        
48 <        for (int i = 1; i <= maxProducers; i += (i+1) >>> 1) {
66 >        for (int k = 1, i = 1; i <= maxn;) {
67              System.out.println("Producers:" + i);
68 <            oneTest(i, iters);
69 <            Thread.sleep(100);
68 >            oneTest(i, ITERS);
69 >            if (i == k) {
70 >                k = i << 1;
71 >                i = i + (i >>> 1);
72 >            }
73 >            else
74 >                i = k;
75          }
76          pool.shutdown();
77 <   }
77 >    }
78  
79 <    static void oneTest(int producers, int iters) throws Exception {
79 >    static void warmup() throws Exception {
80 >        print = false;
81 >        System.out.print("Warmup ");
82 >        int it = 2000;
83 >        for (int j = 5; j > 0; --j) {
84 >            oneTest(j, it);
85 >            System.out.print(".");
86 >            it += 1000;
87 >        }
88 >        System.gc();
89 >        it = 20000;
90 >        for (int j = 5; j > 0; --j) {
91 >            oneTest(j, it);
92 >            System.out.print(".");
93 >            it += 10000;
94 >        }
95 >        System.gc();
96 >        System.out.println();
97 >    }
98 >
99 >    static void oneTest(int n, int iters) throws Exception {
100 >        int fairIters = iters/16;
101 >
102 >        Thread.sleep(100); // System.gc();
103          if (print)
104 <            System.out.print("ArrayBlockingQueue      ");
105 <        oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), producers, iters);
104 >            System.out.print("LinkedTransferQueue     ");
105 >        oneRun(new LinkedTransferQueue<Integer>(), n, iters);
106  
107 +        Thread.sleep(100); // System.gc();
108          if (print)
109              System.out.print("LinkedBlockingQueue     ");
110 <        oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), producers, iters);
110 >        oneRun(new LinkedBlockingQueue<Integer>(), n, iters);
111 >
112 >        Thread.sleep(100); // System.gc();
113 >        if (print)
114 >            System.out.print("LinkedBlockingQueue(cap)");
115 >        oneRun(new LinkedBlockingQueue<Integer>(POOL_SIZE), n, iters);
116 >
117 >        Thread.sleep(100); // System.gc();
118 >        if (print)
119 >            System.out.print("LinkedBlockingDeque     ");
120 >        oneRun(new LinkedBlockingDeque<Integer>(), n, iters);
121  
122 <        // Don't run PBQ since can legitimately run out of memory
123 <        //        if (print)
124 <        //            System.out.print("PriorityBlockingQueue   ");
125 <        //        oneRun(new PriorityBlockingQueue<Integer>(), producers, iters);
122 >        Thread.sleep(100); // System.gc();
123 >        if (print)
124 >            System.out.print("ArrayBlockingQueue      ");
125 >        oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE), n, iters);
126  
127 +        Thread.sleep(100); // System.gc();
128          if (print)
129              System.out.print("SynchronousQueue        ");
130 <        oneRun(new SynchronousQueue<Integer>(), producers, iters);
130 >        oneRun(new SynchronousQueue<Integer>(), n, iters);
131 >
132 >        Thread.sleep(100); // System.gc();
133 >        if (print)
134 >            System.out.print("SynchronousQueue(fair)  ");
135 >        oneRun(new SynchronousQueue<Integer>(true), n, iters);
136 >
137 >        Thread.sleep(100); // System.gc();
138 >        if (print)
139 >            System.out.print("LinkedTransferQueue(xfer)");
140 >        oneRun(new LTQasSQ<Integer>(), n, iters);
141 >
142 >        Thread.sleep(100); // System.gc();
143 >        if (print)
144 >            System.out.print("LinkedTransferQueue(half)");
145 >        oneRun(new HalfSyncLTQ<Integer>(), n, iters);
146 >
147 >        Thread.sleep(100); // System.gc();
148 >        if (print)
149 >            System.out.print("PriorityBlockingQueue   ");
150 >        oneRun(new PriorityBlockingQueue<Integer>(), n, fairIters);
151  
152 +        Thread.sleep(100); // System.gc();
153          if (print)
154              System.out.print("ArrayBlockingQueue(fair)");
155 <        oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), producers, iters/10);
155 >        oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE, true), n, fairIters);
156      }
157 <    
158 <    static abstract class Stage implements Runnable {
157 >
158 >    abstract static class Stage implements Runnable {
159          final int iters;
160          final BlockingQueue<Integer> queue;
161          final CyclicBarrier barrier;
162 <        Stage (BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
163 <            queue = q;
162 >        final Phaser lagPhaser;
163 >        final int lag;
164 >        Stage(BlockingQueue<Integer> q, CyclicBarrier b, Phaser s,
165 >              int iters, int lag) {
166 >            queue = q;
167              barrier = b;
168 +            lagPhaser = s;
169              this.iters = iters;
170 +            this.lag = lag;
171          }
172      }
173  
174      static class Producer extends Stage {
175 <        Producer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
176 <            super(q, b, iters);
175 >        Producer(BlockingQueue<Integer> q, CyclicBarrier b, Phaser s,
176 >                 int iters, int lag) {
177 >            super(q, b, s, iters, lag);
178          }
179  
180          public void run() {
181              try {
182                  barrier.await();
183 <                int s = 0;
184 <                int l = hashCode();
183 >                int ps = 0;
184 >                int r = hashCode();
185 >                int j = 0;
186                  for (int i = 0; i < iters; ++i) {
187 <                    l = LoopHelpers.compute1(l);
188 <                    l = LoopHelpers.compute2(l);
189 <                    queue.put(new Integer(l));
190 <                    s += l;
187 >                    r = LoopHelpers.compute7(r);
188 >                    Integer v = intPool[r & POOL_MASK];
189 >                    int k = v.intValue();
190 >                    queue.put(v);
191 >                    ps += k;
192 >                    if (++j == lag) {
193 >                        j = 0;
194 >                        lagPhaser.arriveAndAwaitAdvance();
195 >                    }
196                  }
197 <                addProducerSum(s);
197 >                addProducerSum(ps);
198                  barrier.await();
199              }
200 <            catch (Exception ie) {
201 <                ie.printStackTrace();
202 <                return;
200 >            catch (Exception ie) {
201 >                ie.printStackTrace();
202 >                return;
203              }
204          }
205      }
206  
207      static class Consumer extends Stage {
208 <        Consumer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
209 <            super(q, b, iters);
208 >        Consumer(BlockingQueue<Integer> q, CyclicBarrier b, Phaser s,
209 >                 int iters, int lag) {
210 >            super(q, b, s, iters, lag);
211          }
212  
213          public void run() {
214              try {
215                  barrier.await();
216 <                int s = 0;
216 >                int cs = 0;
217 >                int j = 0;
218                  for (int i = 0; i < iters; ++i) {
219 <                    s += queue.take().intValue();
219 >                    Integer v = queue.take();
220 >                    int k = v.intValue();
221 >                    cs += k;
222 >                    if (++j == lag) {
223 >                        j = 0;
224 >                        lagPhaser.arriveAndAwaitAdvance();
225 >                    }
226                  }
227 <                addConsumerSum(s);
227 >                addConsumerSum(cs);
228                  barrier.await();
229              }
230 <            catch (Exception ie) {
231 <                ie.printStackTrace();
232 <                return;
230 >            catch (Exception ie) {
231 >                ie.printStackTrace();
232 >                return;
233              }
234          }
235  
236      }
237  
238 <    static void oneRun(BlockingQueue<Integer> q, int nproducers, int iters) throws Exception {
238 >    static void oneRun(BlockingQueue<Integer> q, int n, int iters) throws Exception {
239 >
240          LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
241 <        CyclicBarrier barrier = new CyclicBarrier(nproducers + 2, timer);
242 <        for (int i = 0; i < nproducers; ++i) {
243 <            pool.execute(new Producer(q, barrier, iters));
241 >        CyclicBarrier barrier = new CyclicBarrier(n + 2, timer);
242 >        Phaser s = new Phaser(n + 1);
243 >        for (int i = 0; i < n; ++i) {
244 >            pool.execute(new Producer(q, barrier, s, iters, LAG));
245          }
246 <        pool.execute(new Consumer(q, barrier, iters * nproducers));
246 >        pool.execute(new Consumer(q, barrier, s, iters * n, LAG * n));
247          barrier.await();
248          barrier.await();
249          long time = timer.getTime();
250          checkSum();
251          if (print)
252 <            System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * nproducers)) + " ns per transfer");
252 >            System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * (n + 1))) + " ns per transfer");
253 >    }
254 >
255 >    static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
256 >        LTQasSQ() { super(); }
257 >        public void put(T x) {
258 >            try { super.transfer(x);
259 >            } catch (InterruptedException ex) { throw new Error(); }
260 >        }
261      }
262  
263 +    static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> {
264 +        int calls;
265 +        HalfSyncLTQ() { super(); }
266 +        public void put(T x) {
267 +            if ((++calls & 1) == 0)
268 +                super.put(x);
269 +            else {
270 +                try { super.transfer(x);
271 +                } catch (InterruptedException ex) {
272 +                    throw new Error();
273 +                }
274 +            }
275 +        }
276 +    }
277   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines