ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/TimeoutProducerConsumerLoops.java
(Generate patch)

Comparing jsr166/src/test/loops/TimeoutProducerConsumerLoops.java (file contents):
Revision 1.1 by dl, Mon May 2 19:19:38 2005 UTC vs.
Revision 1.12 by jsr166, Thu Jan 15 18:34:19 2015 UTC

# Line 1 | Line 1
1   /*
2 * @test
3 * @synopsis  multiple producers and consumers using timeouts in blocking queues
4 */
5 /*
2   * Written by Doug Lea with assistance from members of JCP JSR-166
3 < * Expert Group and released to the public domain. Use, modify, and
4 < * redistribute this code in any way without acknowledgement.
3 > * Expert Group and released to the public domain, as explained at
4 > * http://creativecommons.org/publicdomain/zero/1.0/
5   */
6  
7   import java.util.concurrent.*;
8  
9   public class TimeoutProducerConsumerLoops {
10 <    static final int CAPACITY =      100;
10 >    static final int NCPUS = Runtime.getRuntime().availableProcessors();
11      static final ExecutorService pool = Executors.newCachedThreadPool();
12 +
13 +    // Number of elements passed around -- must be power of two
14 +    // Elements are reused from pool to minimize alloc impact
15 +    static final int POOL_SIZE = 1 << 8;
16 +    static final int POOL_MASK = POOL_SIZE-1;
17 +    static final Integer[] intPool = new Integer[POOL_SIZE];
18 +    static {
19 +        for (int i = 0; i < POOL_SIZE; ++i)
20 +            intPool[i] = Integer.valueOf(i);
21 +    }
22 +
23 +    // max lag between a producer and consumer to avoid
24 +    // this becoming a GC test rather than queue test.
25 +    // Used only per-pair to lessen impact on queue sync
26 +    static final int LAG_MASK = (1 << 12) - 1;
27 +
28      static boolean print = false;
29      static int producerSum;
30      static int consumerSum;
# Line 31 | Line 43 | public class TimeoutProducerConsumerLoop
43      }
44  
45      public static void main(String[] args) throws Exception {
46 <        int maxPairs = 100;
47 <        int iters = 100000;
46 >        int maxPairs = NCPUS * 3 / 2;
47 >        int iters = 1000000;
48  
49 <        if (args.length > 0)
49 >        if (args.length > 0)
50              maxPairs = Integer.parseInt(args[0]);
51  
40        print = false;
41        System.out.println("Warmup...");
42        oneTest(1, 10000);
43        Thread.sleep(100);
44        oneTest(2, 10000);
45        Thread.sleep(100);
46        oneTest(2, 10000);
47        Thread.sleep(100);
52          print = true;
53 <        
50 <        int k = 1;
51 <        for (int i = 1; i <= maxPairs;) {
53 >        for (int k = 1, i = 1; i <= maxPairs;) {
54              System.out.println("Pairs:" + i);
55              oneTest(i, iters);
56              Thread.sleep(100);
57              if (i == k) {
58                  k = i << 1;
59                  i = i + (i >>> 1);
60 <            }
61 <            else
60 >            }
61 >            else
62                  i = k;
63          }
64          pool.shutdown();
65 <   }
65 >    }
66  
67 <    static void oneTest(int pairs, int iters) throws Exception {
66 <        int fairIters = iters/20;
67 >    static void oneTest(int n, int iters) throws Exception {
68          if (print)
69 <            System.out.print("ArrayBlockingQueue      ");
70 <        oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), pairs, iters);
69 >            System.out.print("LinkedTransferQueue      ");
70 >        oneRun(new LinkedTransferQueue<Integer>(), n, iters);
71  
72          if (print)
73 <            System.out.print("LinkedBlockingQueue     ");
74 <        oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), pairs, iters);
73 >            System.out.print("LinkedTransferQueue(xfer)");
74 >        oneRun(new LTQasSQ<Integer>(), n, iters);
75  
76          if (print)
77 <            System.out.print("LinkedBlockingDeque     ");
78 <        oneRun(new LinkedBlockingDeque<Integer>(CAPACITY), pairs, iters);
77 >            System.out.print("LinkedBlockingQueue      ");
78 >        oneRun(new LinkedBlockingQueue<Integer>(), n, iters);
79  
80          if (print)
81 <            System.out.print("SynchronousQueue        ");
82 <        oneRun(new SynchronousQueue<Integer>(), pairs, iters);
81 >            System.out.print("LinkedBlockingQueue(cap) ");
82 >        oneRun(new LinkedBlockingQueue<Integer>(POOL_SIZE), n, iters);
83  
84          if (print)
85 <            System.out.print("SynchronousQueue(fair)  ");
86 <        oneRun(new SynchronousQueue<Integer>(true), pairs, fairIters);
85 >            System.out.print("ArrayBlockingQueue(cap)  ");
86 >        oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE), n, iters);
87  
88          if (print)
89 <            System.out.print("PriorityBlockingQueue   ");
90 <        oneRun(new PriorityBlockingQueue<Integer>(), pairs, fairIters);
89 >            System.out.print("LinkedBlockingDeque      ");
90 >        oneRun(new LinkedBlockingDeque<Integer>(), n, iters);
91  
92          if (print)
93 <            System.out.print("ArrayBlockingQueue(fair)");
94 <        oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), pairs, fairIters);
93 >            System.out.print("SynchronousQueue         ");
94 >        oneRun(new SynchronousQueue<Integer>(), n, iters);
95 >
96 >        if (print)
97 >            System.out.print("SynchronousQueue(fair)   ");
98 >        oneRun(new SynchronousQueue<Integer>(true), n, iters);
99 >
100 >        if (print)
101 >            System.out.print("PriorityBlockingQueue    ");
102 >        oneRun(new PriorityBlockingQueue<Integer>(), n, iters / 16);
103 >
104 >        if (print)
105 >            System.out.print("ArrayBlockingQueue(fair) ");
106 >        oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE, true), n, iters/16);
107  
108      }
109 <    
110 <    static abstract class Stage implements Runnable {
109 >
110 >    abstract static class Stage implements Runnable {
111          final int iters;
112          final BlockingQueue<Integer> queue;
113          final CyclicBarrier barrier;
114 <        Stage (BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
115 <            queue = q;
114 >        final Phaser lagPhaser;
115 >        Stage(BlockingQueue<Integer> q, CyclicBarrier b, Phaser s, int iters) {
116 >            queue = q;
117              barrier = b;
118 +            lagPhaser = s;
119              this.iters = iters;
120          }
121      }
122  
123      static class Producer extends Stage {
124 <        Producer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
125 <            super(q, b, iters);
124 >        Producer(BlockingQueue<Integer> q, CyclicBarrier b, Phaser s, int iters) {
125 >            super(q, b, s, iters);
126          }
127  
128          public void run() {
# Line 116 | Line 131 | public class TimeoutProducerConsumerLoop
131                  int s = 0;
132                  int l = hashCode();
133                  int i = 0;
134 <                long timeout = 1;
134 >                long timeout = 1000;
135                  while (i < iters) {
136                      l = LoopHelpers.compute4(l);
137 <                    if (queue.offer(new Integer(l),
138 <                                    timeout, TimeUnit.NANOSECONDS)) {
139 <                        s += LoopHelpers.compute4(l);
137 >                    Integer v = intPool[l & POOL_MASK];
138 >                    if (queue.offer(v, timeout, TimeUnit.NANOSECONDS)) {
139 >                        s += LoopHelpers.compute4(v.intValue());
140                          ++i;
141                          if (timeout > 1)
142                              timeout--;
143 +                        if ((i & LAG_MASK) == LAG_MASK)
144 +                            lagPhaser.arriveAndAwaitAdvance();
145                      }
146                      else
147                          timeout++;
# Line 132 | Line 149 | public class TimeoutProducerConsumerLoop
149                  addProducerSum(s);
150                  barrier.await();
151              }
152 <            catch (Exception ie) {
153 <                ie.printStackTrace();
154 <                return;
152 >            catch (Exception ie) {
153 >                ie.printStackTrace();
154 >                return;
155              }
156          }
157      }
158  
159      static class Consumer extends Stage {
160 <        Consumer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
161 <            super(q, b, iters);
160 >        Consumer(BlockingQueue<Integer> q, CyclicBarrier b, Phaser s, int iters) {
161 >            super(q, b, s, iters);
162          }
163  
164          public void run() {
# Line 150 | Line 167 | public class TimeoutProducerConsumerLoop
167                  int l = 0;
168                  int s = 0;
169                  int i = 0;
170 <                long timeout = 1;
170 >                long timeout = 1000;
171                  while (i < iters) {
172 <                    Integer e = queue.poll(timeout,
172 >                    Integer e = queue.poll(timeout,
173                                             TimeUnit.NANOSECONDS);
174                      if (e != null) {
175                          l = LoopHelpers.compute4(e.intValue());
# Line 160 | Line 177 | public class TimeoutProducerConsumerLoop
177                          ++i;
178                          if (timeout > 1)
179                              --timeout;
180 +                        if ((i & LAG_MASK) == LAG_MASK)
181 +                            lagPhaser.arriveAndAwaitAdvance();
182                      }
183                      else
184                          ++timeout;
# Line 167 | Line 186 | public class TimeoutProducerConsumerLoop
186                  addConsumerSum(s);
187                  barrier.await();
188              }
189 <            catch (Exception ie) {
190 <                ie.printStackTrace();
191 <                return;
189 >            catch (Exception ie) {
190 >                ie.printStackTrace();
191 >                return;
192              }
193          }
194  
# Line 179 | Line 198 | public class TimeoutProducerConsumerLoop
198          LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
199          CyclicBarrier barrier = new CyclicBarrier(npairs * 2 + 1, timer);
200          for (int i = 0; i < npairs; ++i) {
201 <            pool.execute(new Producer(q, barrier, iters));
202 <            pool.execute(new Consumer(q, barrier, iters));
201 >            Phaser s = new Phaser(2);
202 >            pool.execute(new Producer(q, barrier, s, iters));
203 >            pool.execute(new Consumer(q, barrier, s, iters));
204          }
205          barrier.await();
206          barrier.await();
207          long time = timer.getTime();
208          checkSum();
209 +        q.clear();
210          if (print)
211              System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * npairs)) + " ns per transfer");
212      }
213  
214 +    static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
215 +        LTQasSQ() { super(); }
216 +        public void put(T x) {
217 +            try { super.transfer(x);
218 +            } catch (InterruptedException ex) { throw new Error(); }
219 +        }
220 +
221 +        public boolean offer(T x, long timeout, TimeUnit unit) {
222 +            return super.offer(x, timeout, unit);
223 +        }
224 +
225 +    }
226 +
227   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines