ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/TimeoutProducerConsumerLoops.java
(Generate patch)

Comparing jsr166/src/test/loops/TimeoutProducerConsumerLoops.java (file contents):
Revision 1.4 by jsr166, Thu Oct 29 23:09:08 2009 UTC vs.
Revision 1.5 by dl, Sat Nov 14 20:58:11 2009 UTC

# Line 5 | Line 5
5   */
6  
7   import java.util.concurrent.*;
8 < //import jsr166y.*;
8 >
9  
10   public class TimeoutProducerConsumerLoops {
11      static final int NCPUS = Runtime.getRuntime().availableProcessors();
# Line 17 | Line 17 | public class TimeoutProducerConsumerLoop
17      static final int POOL_MASK = POOL_SIZE-1;
18      static final Integer[] intPool = new Integer[POOL_SIZE];
19      static {
20 <        for (int i = 0; i < POOL_SIZE; ++i)
20 >        for (int i = 0; i < POOL_SIZE; ++i)
21              intPool[i] = Integer.valueOf(i);
22      }
23  
24 +    // max lag between a producer and consumer to avoid
25 +    // this becoming a GC test rather than queue test.
26 +    // Used only per-pair to lessen impact on queue sync
27 +    static final int LAG_MASK = (1 << 12) - 1;
28  
29      static boolean print = false;
30      static int producerSum;
# Line 43 | Line 47 | public class TimeoutProducerConsumerLoop
47          int maxPairs = NCPUS * 3 / 2;
48          int iters = 1000000;
49  
50 <        if (args.length > 0)
50 >        if (args.length > 0)
51              maxPairs = Integer.parseInt(args[0]);
52  
53          print = true;
# Line 55 | Line 59 | public class TimeoutProducerConsumerLoop
59              if (i == k) {
60                  k = i << 1;
61                  i = i + (i >>> 1);
62 <            }
63 <            else
62 >            }
63 >            else
64                  i = k;
65          }
66          pool.shutdown();
# Line 104 | Line 108 | public class TimeoutProducerConsumerLoop
108          oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE, true), n, iters/16);
109  
110      }
111 <
111 >    
112      static abstract class Stage implements Runnable {
113          final int iters;
114          final BlockingQueue<Integer> queue;
115          final CyclicBarrier barrier;
116 <        Stage (BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
117 <            queue = q;
116 >        final Phaser lagPhaser;
117 >        Stage (BlockingQueue<Integer> q, CyclicBarrier b, Phaser s, int iters) {
118 >            queue = q;
119              barrier = b;
120 +            lagPhaser = s;
121              this.iters = iters;
122          }
123      }
124  
125      static class Producer extends Stage {
126 <        Producer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
127 <            super(q, b, iters);
126 >        Producer(BlockingQueue<Integer> q, CyclicBarrier b, Phaser s, int iters) {
127 >            super(q, b, s, iters);
128          }
129  
130          public void run() {
# Line 136 | Line 142 | public class TimeoutProducerConsumerLoop
142                          ++i;
143                          if (timeout > 1)
144                              timeout--;
145 +                        if ((i & LAG_MASK) == LAG_MASK)
146 +                            lagPhaser.arriveAndAwaitAdvance();
147                      }
148                      else
149                          timeout++;
# Line 143 | Line 151 | public class TimeoutProducerConsumerLoop
151                  addProducerSum(s);
152                  barrier.await();
153              }
154 <            catch (Exception ie) {
155 <                ie.printStackTrace();
156 <                return;
154 >            catch (Exception ie) {
155 >                ie.printStackTrace();
156 >                return;
157              }
158          }
159      }
160  
161      static class Consumer extends Stage {
162 <        Consumer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
163 <            super(q, b, iters);
162 >        Consumer(BlockingQueue<Integer> q, CyclicBarrier b, Phaser s, int iters) {
163 >            super(q, b, s, iters);
164          }
165  
166          public void run() {
# Line 163 | Line 171 | public class TimeoutProducerConsumerLoop
171                  int i = 0;
172                  long timeout = 1000;
173                  while (i < iters) {
174 <                    Integer e = queue.poll(timeout,
174 >                    Integer e = queue.poll(timeout,
175                                             TimeUnit.NANOSECONDS);
176                      if (e != null) {
177                          l = LoopHelpers.compute4(e.intValue());
# Line 171 | Line 179 | public class TimeoutProducerConsumerLoop
179                          ++i;
180                          if (timeout > 1)
181                              --timeout;
182 +                        if ((i & LAG_MASK) == LAG_MASK)
183 +                            lagPhaser.arriveAndAwaitAdvance();
184                      }
185                      else
186                          ++timeout;
# Line 178 | Line 188 | public class TimeoutProducerConsumerLoop
188                  addConsumerSum(s);
189                  barrier.await();
190              }
191 <            catch (Exception ie) {
192 <                ie.printStackTrace();
193 <                return;
191 >            catch (Exception ie) {
192 >                ie.printStackTrace();
193 >                return;
194              }
195          }
196  
# Line 190 | Line 200 | public class TimeoutProducerConsumerLoop
200          LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
201          CyclicBarrier barrier = new CyclicBarrier(npairs * 2 + 1, timer);
202          for (int i = 0; i < npairs; ++i) {
203 <            pool.execute(new Producer(q, barrier, iters));
204 <            pool.execute(new Consumer(q, barrier, iters));
203 >            Phaser s = new Phaser(2);
204 >            pool.execute(new Producer(q, barrier, s, iters));
205 >            pool.execute(new Consumer(q, barrier, s, iters));
206          }
207          barrier.await();
208          barrier.await();
# Line 205 | Line 216 | public class TimeoutProducerConsumerLoop
216      static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
217          LTQasSQ() { super(); }
218          public void put(T x) {
219 <            try { super.transfer(x);
219 >            try { super.transfer(x);
220              } catch (InterruptedException ex) { throw new Error(); }
221          }
222  
223          public boolean offer(T x, long timeout, TimeUnit unit) {
224 <            return super.offer(x, timeout, unit);
224 >            return super.offer(x, timeout, unit);
225          }
226  
227      }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines