ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/MultipleProducersSingleConsumerLoops.java
(Generate patch)

Comparing jsr166/src/test/loops/MultipleProducersSingleConsumerLoops.java (file contents):
Revision 1.3 by dl, Mon Feb 19 00:46:06 2007 UTC vs.
Revision 1.4 by dl, Fri Oct 23 19:57:06 2009 UTC

# Line 4 | Line 4
4   * http://creativecommons.org/licenses/publicdomain
5   */
6  
7 + import java.util.*;
8   import java.util.concurrent.*;
9 + //import jsr166y.*;
10  
11   public class MultipleProducersSingleConsumerLoops {
12 <    static final int CAPACITY =      100;
12 >    static final int NCPUS = Runtime.getRuntime().availableProcessors();
13 >    static final Random rng = new Random();
14      static final ExecutorService pool = Executors.newCachedThreadPool();
15      static boolean print = false;
16      static int producerSum;
17      static int consumerSum;
15
18      static synchronized void addProducerSum(int x) {
19          producerSum += x;
20      }
# Line 26 | Line 28 | public class MultipleProducersSingleCons
28              throw new Error("CheckSum mismatch");
29      }
30  
31 +    // Number of elements passed around -- must be power of two
32 +    // Elements are reused from pool to minimize alloc impact
33 +    static final int POOL_SIZE = 1 << 8;
34 +    static final int POOL_MASK = POOL_SIZE-1;
35 +    static final Integer[] intPool = new Integer[POOL_SIZE];
36 +    static {
37 +        for (int i = 0; i < POOL_SIZE; ++i)
38 +            intPool[i] = Integer.valueOf(i);
39 +    }
40 +
41 +    // Number of puts by producers or takes by consumers
42 +    static final int ITERS = 1 << 20;
43 +
44 +    // max lag between a producer and consumer to avoid
45 +    // this becoming a GC test rather than queue test.
46 +    static final int LAG = (1 << 12);
47 +    static final int LAG_MASK = LAG - 1;
48 +
49      public static void main(String[] args) throws Exception {
50 <        int maxProducers = 100;
31 <        int iters = 100000;
50 >        int maxn = 12; // NCPUS * 3 / 2;
51  
52          if (args.length > 0)
53 <            maxProducers = Integer.parseInt(args[0]);
53 >            maxn = Integer.parseInt(args[0]);
54  
55 <        print = false;
37 <        System.out.println("Warmup...");
38 <        oneTest(1, 10000);
39 <        Thread.sleep(100);
40 <        oneTest(2, 10000);
41 <        Thread.sleep(100);
55 >        warmup();
56          print = true;
57 <        
58 <        for (int i = 1; i <= maxProducers; i += (i+1) >>> 1) {
57 >        int k = 1;
58 >        for (int i = 1; i <= maxn;) {
59              System.out.println("Producers:" + i);
60 <            oneTest(i, iters);
61 <            Thread.sleep(100);
60 >            oneTest(i, ITERS);
61 >            if (i == k) {
62 >                k = i << 1;
63 >                i = i + (i >>> 1);
64 >            }
65 >            else
66 >                i = k;
67          }
68          pool.shutdown();
69 <   }
69 >    }
70 >
71 >    static void warmup() throws Exception {
72 >        print = false;
73 >        System.out.print("Warmup ");
74 >        int it = 2000;
75 >        for (int j = 5; j > 0; --j) {
76 >            oneTest(j, it);
77 >            System.out.print(".");
78 >            it += 1000;
79 >        }
80 >        System.gc();
81 >        it = 20000;
82 >        for (int j = 5; j > 0; --j) {
83 >            oneTest(j, it);
84 >            System.out.print(".");
85 >            it += 10000;
86 >        }
87 >        System.gc();
88 >        System.out.println();
89 >    }
90  
91 <    static void oneTest(int producers, int iters) throws Exception {
91 >    static void oneTest(int n, int iters) throws Exception {
92 >        int fairIters = iters/16;
93 >
94 >        Thread.sleep(100); // System.gc();
95          if (print)
96 <            System.out.print("ArrayBlockingQueue      ");
97 <        oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), producers, iters);
96 >            System.out.print("LinkedTransferQueue     ");
97 >        oneRun(new LinkedTransferQueue<Integer>(), n, iters);
98  
99 +        Thread.sleep(100); // System.gc();
100          if (print)
101              System.out.print("LinkedBlockingQueue     ");
102 <        oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), producers, iters);
102 >        oneRun(new LinkedBlockingQueue<Integer>(), n, iters);
103  
104 <        // Don't run PBQ since can legitimately run out of memory
105 <        //        if (print)
106 <        //            System.out.print("PriorityBlockingQueue   ");
107 <        //        oneRun(new PriorityBlockingQueue<Integer>(), producers, iters);
104 >        Thread.sleep(100); // System.gc();
105 >        if (print)
106 >            System.out.print("LinkedBlockingQueue(cap)");
107 >        oneRun(new LinkedBlockingQueue<Integer>(POOL_SIZE), n, iters);
108 >
109 >        Thread.sleep(100); // System.gc();
110 >        if (print)
111 >            System.out.print("LinkedBlockingDeque     ");
112 >        oneRun(new LinkedBlockingDeque<Integer>(), n, iters);
113 >        
114 >        Thread.sleep(100); // System.gc();
115 >        if (print)
116 >            System.out.print("ArrayBlockingQueue      ");
117 >        oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE), n, iters);
118  
119 +        Thread.sleep(100); // System.gc();
120          if (print)
121              System.out.print("SynchronousQueue        ");
122 <        oneRun(new SynchronousQueue<Integer>(), producers, iters);
122 >        oneRun(new SynchronousQueue<Integer>(), n, iters);
123  
124 +        
125 +        Thread.sleep(100); // System.gc();
126          if (print)
127              System.out.print("SynchronousQueue(fair)  ");
128 <        oneRun(new SynchronousQueue<Integer>(true), producers, iters);
128 >        oneRun(new SynchronousQueue<Integer>(true), n, iters);
129 >
130 >        Thread.sleep(100); // System.gc();
131 >        if (print)
132 >            System.out.print("LinkedTransferQueue(xfer)");
133 >        oneRun(new LTQasSQ<Integer>(), n, iters);
134 >
135 >        Thread.sleep(100); // System.gc();
136 >        if (print)
137 >            System.out.print("LinkedTransferQueue(half)");
138 >        oneRun(new HalfSyncLTQ<Integer>(), n, iters);
139 >        
140 >        Thread.sleep(100); // System.gc();
141 >        if (print)
142 >            System.out.print("PriorityBlockingQueue   ");
143 >        oneRun(new PriorityBlockingQueue<Integer>(), n, fairIters);
144  
145 +        Thread.sleep(100); // System.gc();
146          if (print)
147              System.out.print("ArrayBlockingQueue(fair)");
148 <        oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), producers, iters/10);
148 >        oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE, true), n, fairIters);
149 >
150 >
151      }
152      
153      static abstract class Stage implements Runnable {
154          final int iters;
155          final BlockingQueue<Integer> queue;
156          final CyclicBarrier barrier;
157 <        Stage (BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
157 >        final Phaser lagPhaser;
158 >        final int lag;
159 >        Stage (BlockingQueue<Integer> q, CyclicBarrier b, Phaser s,
160 >               int iters, int lag) {
161              queue = q;
162              barrier = b;
163 +            lagPhaser = s;
164              this.iters = iters;
165 +            this.lag = lag;
166          }
167      }
168  
169      static class Producer extends Stage {
170 <        Producer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
171 <            super(q, b, iters);
170 >        Producer(BlockingQueue<Integer> q, CyclicBarrier b, Phaser s,
171 >                 int iters, int lag) {
172 >            super(q, b, s, iters, lag);
173          }
174  
175          public void run() {
176              try {
177                  barrier.await();
178 <                int s = 0;
179 <                int l = hashCode();
178 >                int ps = 0;
179 >                int r = hashCode();
180 >                int j = 0;
181                  for (int i = 0; i < iters; ++i) {
182 <                    l = LoopHelpers.compute1(l);
183 <                    l = LoopHelpers.compute2(l);
184 <                    queue.put(new Integer(l));
185 <                    s += l;
182 >                    r = LoopHelpers.compute7(r);
183 >                    Integer v = intPool[r & POOL_MASK];
184 >                    int k = v.intValue();
185 >                    queue.put(v);
186 >                    ps += k;
187 >                    if (++j == lag) {
188 >                        j = 0;
189 >                        lagPhaser.arriveAndAwaitAdvance();
190 >                    }
191                  }
192 <                addProducerSum(s);
192 >                addProducerSum(ps);
193                  barrier.await();
194              }
195              catch (Exception ie) {
# Line 114 | Line 200 | public class MultipleProducersSingleCons
200      }
201  
202      static class Consumer extends Stage {
203 <        Consumer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
204 <            super(q, b, iters);
203 >        Consumer(BlockingQueue<Integer> q, CyclicBarrier b, Phaser s,
204 >                 int iters, int lag) {
205 >            super(q, b, s, iters, lag);
206          }
207  
208          public void run() {
209              try {
210                  barrier.await();
211 <                int s = 0;
211 >                int cs = 0;
212 >                int j = 0;
213                  for (int i = 0; i < iters; ++i) {
214 <                    s += queue.take().intValue();
214 >                    Integer v = queue.take();
215 >                    int k = v.intValue();
216 >                    cs += k;
217 >                    if (++j == lag) {
218 >                        j = 0;
219 >                        lagPhaser.arriveAndAwaitAdvance();
220 >                    }
221                  }
222 <                addConsumerSum(s);
222 >                addConsumerSum(cs);
223                  barrier.await();
224              }
225              catch (Exception ie) {
# Line 136 | Line 230 | public class MultipleProducersSingleCons
230  
231      }
232  
233 <    static void oneRun(BlockingQueue<Integer> q, int nproducers, int iters) throws Exception {
233 >
234 >    static void oneRun(BlockingQueue<Integer> q, int n, int iters) throws Exception {
235 >
236          LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
237 <        CyclicBarrier barrier = new CyclicBarrier(nproducers + 2, timer);
238 <        for (int i = 0; i < nproducers; ++i) {
239 <            pool.execute(new Producer(q, barrier, iters));
237 >        CyclicBarrier barrier = new CyclicBarrier(n + 2, timer);
238 >        Phaser s = new Phaser(n + 1);
239 >        for (int i = 0; i < n; ++i) {
240 >            pool.execute(new Producer(q, barrier, s, iters, LAG));
241          }
242 <        pool.execute(new Consumer(q, barrier, iters * nproducers));
242 >        pool.execute(new Consumer(q, barrier, s, iters * n, LAG * n));
243          barrier.await();
244          barrier.await();
245          long time = timer.getTime();
246          checkSum();
247          if (print)
248 <            System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * nproducers)) + " ns per transfer");
248 >            System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * (n + 1))) + " ns per transfer");
249 >    }
250 >    
251 >    static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
252 >        LTQasSQ() { super(); }
253 >        public void put(T x) {
254 >            try { super.transfer(x);
255 >            } catch (InterruptedException ex) { throw new Error(); }
256 >        }
257      }
258  
259 +    static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> {
260 +        int calls;
261 +        HalfSyncLTQ() { super(); }
262 +        public void put(T x) {
263 +            if ((++calls & 1) == 0)
264 +                super.put(x);
265 +            else {
266 +                try { super.transfer(x);
267 +                } catch (InterruptedException ex) {
268 +                    throw new Error();
269 +                }
270 +            }
271 +        }
272 +    }
273   }
274 +

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines