ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/MultipleProducersSingleConsumerLoops.java
(Generate patch)

Comparing jsr166/src/test/loops/MultipleProducersSingleConsumerLoops.java (file contents):
Revision 1.3 by dl, Mon Feb 19 00:46:06 2007 UTC vs.
Revision 1.13 by jsr166, Sat Dec 31 19:50:56 2016 UTC

# Line 1 | Line 1
1   /*
2   * Written by Doug Lea with assistance from members of JCP JSR-166
3   * Expert Group and released to the public domain, as explained at
4 < * http://creativecommons.org/licenses/publicdomain
4 > * http://creativecommons.org/publicdomain/zero/1.0/
5   */
6  
7 < import java.util.concurrent.*;
7 > import java.util.Random;
8 > import java.util.concurrent.ArrayBlockingQueue;
9 > import java.util.concurrent.BlockingQueue;
10 > import java.util.concurrent.CyclicBarrier;
11 > import java.util.concurrent.ExecutorService;
12 > import java.util.concurrent.Executors;
13 > import java.util.concurrent.LinkedBlockingDeque;
14 > import java.util.concurrent.LinkedBlockingQueue;
15 > import java.util.concurrent.LinkedTransferQueue;
16 > import java.util.concurrent.Phaser;
17 > import java.util.concurrent.PriorityBlockingQueue;
18 > import java.util.concurrent.SynchronousQueue;
19  
20   public class MultipleProducersSingleConsumerLoops {
21 <    static final int CAPACITY =      100;
21 >    static final int NCPUS = Runtime.getRuntime().availableProcessors();
22      static final ExecutorService pool = Executors.newCachedThreadPool();
23      static boolean print = false;
24      static int producerSum;
25      static int consumerSum;
15
26      static synchronized void addProducerSum(int x) {
27          producerSum += x;
28      }
# Line 26 | Line 36 | public class MultipleProducersSingleCons
36              throw new Error("CheckSum mismatch");
37      }
38  
39 +    // Number of elements passed around -- must be power of two
40 +    // Elements are reused from pool to minimize alloc impact
41 +    static final int POOL_SIZE = 1 << 8;
42 +    static final int POOL_MASK = POOL_SIZE-1;
43 +    static final Integer[] intPool = new Integer[POOL_SIZE];
44 +    static {
45 +        for (int i = 0; i < POOL_SIZE; ++i)
46 +            intPool[i] = Integer.valueOf(i);
47 +    }
48 +
49 +    // Number of puts by producers or takes by consumers
50 +    static final int ITERS = 1 << 20;
51 +
52 +    // max lag between a producer and consumer to avoid
53 +    // this becoming a GC test rather than queue test.
54 +    static final int LAG = (1 << 12);
55 +    static final int LAG_MASK = LAG - 1;
56 +
57      public static void main(String[] args) throws Exception {
58 <        int maxProducers = 100;
31 <        int iters = 100000;
58 >        int maxn = 12; // NCPUS * 3 / 2;
59  
60 <        if (args.length > 0)
61 <            maxProducers = Integer.parseInt(args[0]);
60 >        if (args.length > 0)
61 >            maxn = Integer.parseInt(args[0]);
62  
63 <        print = false;
37 <        System.out.println("Warmup...");
38 <        oneTest(1, 10000);
39 <        Thread.sleep(100);
40 <        oneTest(2, 10000);
41 <        Thread.sleep(100);
63 >        warmup();
64          print = true;
65 <        
44 <        for (int i = 1; i <= maxProducers; i += (i+1) >>> 1) {
65 >        for (int k = 1, i = 1; i <= maxn;) {
66              System.out.println("Producers:" + i);
67 <            oneTest(i, iters);
68 <            Thread.sleep(100);
67 >            oneTest(i, ITERS);
68 >            if (i == k) {
69 >                k = i << 1;
70 >                i = i + (i >>> 1);
71 >            }
72 >            else
73 >                i = k;
74          }
75          pool.shutdown();
76 <   }
76 >    }
77 >
78 >    static void warmup() throws Exception {
79 >        print = false;
80 >        System.out.print("Warmup ");
81 >        int it = 2000;
82 >        for (int j = 5; j > 0; --j) {
83 >            oneTest(j, it);
84 >            System.out.print(".");
85 >            it += 1000;
86 >        }
87 >        System.gc();
88 >        it = 20000;
89 >        for (int j = 5; j > 0; --j) {
90 >            oneTest(j, it);
91 >            System.out.print(".");
92 >            it += 10000;
93 >        }
94 >        System.gc();
95 >        System.out.println();
96 >    }
97 >
98 >    static void oneTest(int n, int iters) throws Exception {
99 >        int fairIters = iters/16;
100  
101 <    static void oneTest(int producers, int iters) throws Exception {
101 >        Thread.sleep(100); // System.gc();
102          if (print)
103 <            System.out.print("ArrayBlockingQueue      ");
104 <        oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), producers, iters);
103 >            System.out.print("LinkedTransferQueue     ");
104 >        oneRun(new LinkedTransferQueue<Integer>(), n, iters);
105  
106 +        Thread.sleep(100); // System.gc();
107          if (print)
108              System.out.print("LinkedBlockingQueue     ");
109 <        oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), producers, iters);
109 >        oneRun(new LinkedBlockingQueue<Integer>(), n, iters);
110 >
111 >        Thread.sleep(100); // System.gc();
112 >        if (print)
113 >            System.out.print("LinkedBlockingQueue(cap)");
114 >        oneRun(new LinkedBlockingQueue<Integer>(POOL_SIZE), n, iters);
115 >
116 >        Thread.sleep(100); // System.gc();
117 >        if (print)
118 >            System.out.print("LinkedBlockingDeque     ");
119 >        oneRun(new LinkedBlockingDeque<Integer>(), n, iters);
120  
121 <        // Don't run PBQ since can legitimately run out of memory
122 <        //        if (print)
123 <        //            System.out.print("PriorityBlockingQueue   ");
124 <        //        oneRun(new PriorityBlockingQueue<Integer>(), producers, iters);
121 >        Thread.sleep(100); // System.gc();
122 >        if (print)
123 >            System.out.print("ArrayBlockingQueue      ");
124 >        oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE), n, iters);
125  
126 +        Thread.sleep(100); // System.gc();
127          if (print)
128              System.out.print("SynchronousQueue        ");
129 <        oneRun(new SynchronousQueue<Integer>(), producers, iters);
129 >        oneRun(new SynchronousQueue<Integer>(), n, iters);
130  
131 +        Thread.sleep(100); // System.gc();
132          if (print)
133              System.out.print("SynchronousQueue(fair)  ");
134 <        oneRun(new SynchronousQueue<Integer>(true), producers, iters);
134 >        oneRun(new SynchronousQueue<Integer>(true), n, iters);
135 >
136 >        Thread.sleep(100); // System.gc();
137 >        if (print)
138 >            System.out.print("LinkedTransferQueue(xfer)");
139 >        oneRun(new LTQasSQ<Integer>(), n, iters);
140 >
141 >        Thread.sleep(100); // System.gc();
142 >        if (print)
143 >            System.out.print("LinkedTransferQueue(half)");
144 >        oneRun(new HalfSyncLTQ<Integer>(), n, iters);
145 >
146 >        Thread.sleep(100); // System.gc();
147 >        if (print)
148 >            System.out.print("PriorityBlockingQueue   ");
149 >        oneRun(new PriorityBlockingQueue<Integer>(), n, fairIters);
150  
151 +        Thread.sleep(100); // System.gc();
152          if (print)
153              System.out.print("ArrayBlockingQueue(fair)");
154 <        oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), producers, iters/10);
154 >        oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE, true), n, fairIters);
155      }
156 <    
157 <    static abstract class Stage implements Runnable {
156 >
157 >    abstract static class Stage implements Runnable {
158          final int iters;
159          final BlockingQueue<Integer> queue;
160          final CyclicBarrier barrier;
161 <        Stage (BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
162 <            queue = q;
161 >        final Phaser lagPhaser;
162 >        final int lag;
163 >        Stage(BlockingQueue<Integer> q, CyclicBarrier b, Phaser s,
164 >              int iters, int lag) {
165 >            queue = q;
166              barrier = b;
167 +            lagPhaser = s;
168              this.iters = iters;
169 +            this.lag = lag;
170          }
171      }
172  
173      static class Producer extends Stage {
174 <        Producer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
175 <            super(q, b, iters);
174 >        Producer(BlockingQueue<Integer> q, CyclicBarrier b, Phaser s,
175 >                 int iters, int lag) {
176 >            super(q, b, s, iters, lag);
177          }
178  
179          public void run() {
180              try {
181                  barrier.await();
182 <                int s = 0;
183 <                int l = hashCode();
182 >                int ps = 0;
183 >                int r = hashCode();
184 >                int j = 0;
185                  for (int i = 0; i < iters; ++i) {
186 <                    l = LoopHelpers.compute1(l);
187 <                    l = LoopHelpers.compute2(l);
188 <                    queue.put(new Integer(l));
189 <                    s += l;
186 >                    r = LoopHelpers.compute7(r);
187 >                    Integer v = intPool[r & POOL_MASK];
188 >                    int k = v.intValue();
189 >                    queue.put(v);
190 >                    ps += k;
191 >                    if (++j == lag) {
192 >                        j = 0;
193 >                        lagPhaser.arriveAndAwaitAdvance();
194 >                    }
195                  }
196 <                addProducerSum(s);
196 >                addProducerSum(ps);
197                  barrier.await();
198              }
199 <            catch (Exception ie) {
200 <                ie.printStackTrace();
201 <                return;
199 >            catch (Exception ie) {
200 >                ie.printStackTrace();
201 >                return;
202              }
203          }
204      }
205  
206      static class Consumer extends Stage {
207 <        Consumer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
208 <            super(q, b, iters);
207 >        Consumer(BlockingQueue<Integer> q, CyclicBarrier b, Phaser s,
208 >                 int iters, int lag) {
209 >            super(q, b, s, iters, lag);
210          }
211  
212          public void run() {
213              try {
214                  barrier.await();
215 <                int s = 0;
215 >                int cs = 0;
216 >                int j = 0;
217                  for (int i = 0; i < iters; ++i) {
218 <                    s += queue.take().intValue();
218 >                    Integer v = queue.take();
219 >                    int k = v.intValue();
220 >                    cs += k;
221 >                    if (++j == lag) {
222 >                        j = 0;
223 >                        lagPhaser.arriveAndAwaitAdvance();
224 >                    }
225                  }
226 <                addConsumerSum(s);
226 >                addConsumerSum(cs);
227                  barrier.await();
228              }
229 <            catch (Exception ie) {
230 <                ie.printStackTrace();
231 <                return;
229 >            catch (Exception ie) {
230 >                ie.printStackTrace();
231 >                return;
232              }
233          }
234  
235      }
236  
237 <    static void oneRun(BlockingQueue<Integer> q, int nproducers, int iters) throws Exception {
237 >    static void oneRun(BlockingQueue<Integer> q, int n, int iters) throws Exception {
238 >
239          LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
240 <        CyclicBarrier barrier = new CyclicBarrier(nproducers + 2, timer);
241 <        for (int i = 0; i < nproducers; ++i) {
242 <            pool.execute(new Producer(q, barrier, iters));
240 >        CyclicBarrier barrier = new CyclicBarrier(n + 2, timer);
241 >        Phaser s = new Phaser(n + 1);
242 >        for (int i = 0; i < n; ++i) {
243 >            pool.execute(new Producer(q, barrier, s, iters, LAG));
244          }
245 <        pool.execute(new Consumer(q, barrier, iters * nproducers));
245 >        pool.execute(new Consumer(q, barrier, s, iters * n, LAG * n));
246          barrier.await();
247          barrier.await();
248          long time = timer.getTime();
249          checkSum();
250          if (print)
251 <            System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * nproducers)) + " ns per transfer");
251 >            System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * (n + 1))) + " ns per transfer");
252 >    }
253 >
254 >    static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
255 >        LTQasSQ() { super(); }
256 >        public void put(T x) {
257 >            try { super.transfer(x);
258 >            } catch (InterruptedException ex) { throw new Error(); }
259 >        }
260      }
261  
262 +    static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> {
263 +        int calls;
264 +        HalfSyncLTQ() { super(); }
265 +        public void put(T x) {
266 +            if ((++calls & 1) == 0)
267 +                super.put(x);
268 +            else {
269 +                try { super.transfer(x);
270 +                } catch (InterruptedException ex) {
271 +                    throw new Error();
272 +                }
273 +            }
274 +        }
275 +    }
276   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines