ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/SingleProducerMultipleConsumerLoops.java
(Generate patch)

Comparing jsr166/src/test/loops/SingleProducerMultipleConsumerLoops.java (file contents):
Revision 1.2 by dl, Mon Feb 19 00:46:06 2007 UTC vs.
Revision 1.3 by dl, Fri Oct 23 19:57:07 2009 UTC

# Line 5 | Line 5
5   */
6  
7   import java.util.concurrent.*;
8 + //import jsr166y.*;
9  
10   public class SingleProducerMultipleConsumerLoops {
11 <    static final int CAPACITY =      100;
11 >    static final int NCPUS = Runtime.getRuntime().availableProcessors();
12 >
13 >    // Number of puts by producers or takes by consumers
14 >    static final int ITERS = 1 << 20;
15  
16      static final ExecutorService pool = Executors.newCachedThreadPool();
17      static boolean print = false;
18  
19 +    // Number of elements passed around -- must be power of two
20 +    // Elements are reused from pool to minimize alloc impact
21 +    static final int POOL_SIZE = 1 << 8;
22 +    static final int POOL_MASK = POOL_SIZE-1;
23 +    static final Integer[] intPool = new Integer[POOL_SIZE];
24 +    static {
25 +        for (int i = 0; i < POOL_SIZE; ++i)
26 +            intPool[i] = Integer.valueOf(i);
27 +    }
28 +
29      public static void main(String[] args) throws Exception {
30 <        int maxConsumers = 100;
17 <        int iters = 10000;
30 >        int maxn = 12;
31  
32          if (args.length > 0)
33 <            maxConsumers = Integer.parseInt(args[0]);
33 >            maxn = Integer.parseInt(args[0]);
34  
35          print = false;
36 <        System.out.println("Warmup...");
24 <        oneTest(1, 10000);
25 <        Thread.sleep(100);
26 <        oneTest(2, 10000);
27 <        Thread.sleep(100);
36 >        warmup();
37          print = true;
38 <        
39 <        for (int i = 1; i <= maxConsumers; i += (i+1) >>> 1) {
38 >
39 >        int k = 1;
40 >        for (int i = 1; i <= maxn;) {
41              System.out.println("Consumers:" + i);
42 <            oneTest(i, iters);
43 <            Thread.sleep(100);
42 >            oneTest(i, ITERS);
43 >            if (i == k) {
44 >                k = i << 1;
45 >                i = i + (i >>> 1);
46 >            }
47 >            else
48 >                i = k;
49          }
50 +        
51          pool.shutdown();
52     }
53  
54 <    static void oneTest(int consumers, int iters) throws Exception {
54 >    static void warmup() throws Exception {
55 >        print = false;
56 >        System.out.print("Warmup ");
57 >        int it = 2000;
58 >        for (int j = 5; j > 0; --j) {
59 >            oneTest(j, it);
60 >            System.out.print(".");
61 >            it += 1000;
62 >        }
63 >        System.gc();
64 >        it = 20000;
65 >        for (int j = 5; j > 0; --j) {
66 >            oneTest(j, it);
67 >            System.out.print(".");
68 >            it += 10000;
69 >        }
70 >        System.gc();
71 >        System.out.println();
72 >    }
73 >
74 >    static void oneTest(int n, int iters) throws Exception {
75 >        int fairIters = iters/16;
76 >
77 >        Thread.sleep(100); // System.gc();
78          if (print)
79 <            System.out.print("ArrayBlockingQueue      ");
80 <        oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), consumers, iters);
79 >            System.out.print("LinkedTransferQueue     ");
80 >        oneRun(new LinkedTransferQueue<Integer>(), n, iters);
81  
82 +        Thread.sleep(100); // System.gc();
83          if (print)
84              System.out.print("LinkedBlockingQueue     ");
85 <        oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), consumers, iters);
85 >        oneRun(new LinkedBlockingQueue<Integer>(), n, iters);
86  
87 +        Thread.sleep(100); // System.gc();
88          if (print)
89 <            System.out.print("PriorityBlockingQueue   ");
90 <        oneRun(new PriorityBlockingQueue<Integer>(), consumers, iters/10);
89 >            System.out.print("LinkedBlockingQueue(cap)");
90 >        oneRun(new LinkedBlockingQueue<Integer>(POOL_SIZE), n, iters);
91  
92 +        Thread.sleep(100); // System.gc();
93 +        if (print)
94 +            System.out.print("LinkedBlockingDeque     ");
95 +        oneRun(new LinkedBlockingDeque<Integer>(), n, iters);
96 +        
97 +        Thread.sleep(100); // System.gc();
98 +        if (print)
99 +            System.out.print("ArrayBlockingQueue      ");
100 +        oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE), n, iters);
101 +
102 +        Thread.sleep(100); // System.gc();
103          if (print)
104              System.out.print("SynchronousQueue        ");
105 <        oneRun(new SynchronousQueue<Integer>(), consumers, iters);
105 >        oneRun(new SynchronousQueue<Integer>(), n, iters);
106 >        
107 >        Thread.sleep(100); // System.gc();
108 >        if (print)
109 >            System.out.print("SynchronousQueue(fair)  ");
110 >        oneRun(new SynchronousQueue<Integer>(true), n, iters);
111 >
112 >        Thread.sleep(100); // System.gc();
113 >        if (print)
114 >            System.out.print("LinkedTransferQueue(xfer)");
115 >        oneRun(new LTQasSQ<Integer>(), n, iters);
116  
117 +        Thread.sleep(100); // System.gc();
118 +        if (print)
119 +            System.out.print("LinkedTransferQueue(half)");
120 +        oneRun(new HalfSyncLTQ<Integer>(), n, iters);
121 +        
122 +        Thread.sleep(100); // System.gc();
123 +        if (print)
124 +            System.out.print("PriorityBlockingQueue   ");
125 +        oneRun(new PriorityBlockingQueue<Integer>(), n, fairIters);
126 +
127 +        Thread.sleep(100); // System.gc();
128          if (print)
129              System.out.print("ArrayBlockingQueue(fair)");
130 <        oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), consumers, iters/10);
130 >        oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE, true), n, fairIters);
131 >
132      }
133      
134      static abstract class Stage implements Runnable {
# Line 77 | Line 151 | public class SingleProducerMultipleConsu
151          public void run() {
152              try {
153                  barrier.await();
154 +                int r = hashCode();
155                  for (int i = 0; i < iters; ++i) {
156 <                    queue.put(new Integer(i));
156 >                    r = LoopHelpers.compute7(r);
157 >                    Integer v = intPool[r & POOL_MASK];
158 >                    queue.put(v);
159                  }
160                  barrier.await();
161                  result = 432;
# Line 93 | Line 170 | public class SingleProducerMultipleConsu
170      static class Consumer extends Stage {
171          Consumer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
172              super(q, b, iters);
173 +
174          }
175  
176          public void run() {
# Line 100 | Line 178 | public class SingleProducerMultipleConsu
178                  barrier.await();
179                  int l = 0;
180                  int s = 0;
103                int last = -1;
181                  for (int i = 0; i < iters; ++i) {
182                      Integer item = queue.take();
183 <                    int v = item.intValue();
107 <                    if (v < last)
108 <                        throw new Error("Out-of-Order transfer");
109 <                    last = v;
110 <                    l = LoopHelpers.compute1(v);
111 <                    s += l;
183 >                    s += item.intValue();
184                  }
185                  barrier.await();
186                  result = s;
187 +                if (s == 0) System.out.print(" ");
188              }
189              catch (Exception ie) {
190                  ie.printStackTrace();
# Line 135 | Line 208 | public class SingleProducerMultipleConsu
208              System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * nconsumers)) + " ns per transfer");
209      }
210  
211 +    static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
212 +        LTQasSQ() { super(); }
213 +        public void put(T x) {
214 +            try { super.transfer(x);
215 +            } catch (InterruptedException ex) { throw new Error(); }
216 +        }
217 +    }
218 +
219 +    static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> {
220 +        int calls;
221 +        HalfSyncLTQ() { super(); }
222 +        public void put(T x) {
223 +            if ((++calls & 1) == 0)
224 +                super.put(x);
225 +            else {
226 +                try { super.transfer(x);
227 +                } catch (InterruptedException ex) {
228 +                    throw new Error();
229 +                }
230 +            }
231 +        }
232 +    }
233 +
234   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines