ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/SingleProducerMultipleConsumerLoops.java
(Generate patch)

Comparing jsr166/src/test/loops/SingleProducerMultipleConsumerLoops.java (file contents):
Revision 1.2 by dl, Mon Feb 19 00:46:06 2007 UTC vs.
Revision 1.10 by jsr166, Mon Aug 10 03:13:33 2015 UTC

# Line 1 | Line 1
1   /*
2   * Written by Doug Lea with assistance from members of JCP JSR-166
3   * Expert Group and released to the public domain, as explained at
4 < * http://creativecommons.org/licenses/publicdomain
4 > * http://creativecommons.org/publicdomain/zero/1.0/
5   */
6  
7   import java.util.concurrent.*;
8 + //import jsr166y.*;
9  
10   public class SingleProducerMultipleConsumerLoops {
11 <    static final int CAPACITY =      100;
11 >    static final int NCPUS = Runtime.getRuntime().availableProcessors();
12 >
13 >    // Number of puts by producers or takes by consumers
14 >    static final int ITERS = 1 << 20;
15  
16      static final ExecutorService pool = Executors.newCachedThreadPool();
17      static boolean print = false;
18  
19 +    // Number of elements passed around -- must be power of two
20 +    // Elements are reused from pool to minimize alloc impact
21 +    static final int POOL_SIZE = 1 << 8;
22 +    static final int POOL_MASK = POOL_SIZE-1;
23 +    static final Integer[] intPool = new Integer[POOL_SIZE];
24 +    static {
25 +        for (int i = 0; i < POOL_SIZE; ++i)
26 +            intPool[i] = Integer.valueOf(i);
27 +    }
28 +
29      public static void main(String[] args) throws Exception {
30 <        int maxConsumers = 100;
17 <        int iters = 10000;
30 >        int maxn = 12;
31  
32 <        if (args.length > 0)
33 <            maxConsumers = Integer.parseInt(args[0]);
32 >        if (args.length > 0)
33 >            maxn = Integer.parseInt(args[0]);
34  
35          print = false;
36 <        System.out.println("Warmup...");
24 <        oneTest(1, 10000);
25 <        Thread.sleep(100);
26 <        oneTest(2, 10000);
27 <        Thread.sleep(100);
36 >        warmup();
37          print = true;
38 <        
39 <        for (int i = 1; i <= maxConsumers; i += (i+1) >>> 1) {
38 >
39 >        for (int k = 1, i = 1; i <= maxn;) {
40              System.out.println("Consumers:" + i);
41 <            oneTest(i, iters);
42 <            Thread.sleep(100);
41 >            oneTest(i, ITERS);
42 >            if (i == k) {
43 >                k = i << 1;
44 >                i = i + (i >>> 1);
45 >            }
46 >            else
47 >                i = k;
48          }
49 +
50          pool.shutdown();
51 <   }
51 >    }
52  
53 <    static void oneTest(int consumers, int iters) throws Exception {
53 >    static void warmup() throws Exception {
54 >        print = false;
55 >        System.out.print("Warmup ");
56 >        int it = 2000;
57 >        for (int j = 5; j > 0; --j) {
58 >            oneTest(j, it);
59 >            System.out.print(".");
60 >            it += 1000;
61 >        }
62 >        System.gc();
63 >        it = 20000;
64 >        for (int j = 5; j > 0; --j) {
65 >            oneTest(j, it);
66 >            System.out.print(".");
67 >            it += 10000;
68 >        }
69 >        System.gc();
70 >        System.out.println();
71 >    }
72 >
73 >    static void oneTest(int n, int iters) throws Exception {
74 >        int fairIters = iters/16;
75 >
76 >        Thread.sleep(100); // System.gc();
77          if (print)
78 <            System.out.print("ArrayBlockingQueue      ");
79 <        oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), consumers, iters);
78 >            System.out.print("LinkedTransferQueue     ");
79 >        oneRun(new LinkedTransferQueue<Integer>(), n, iters);
80  
81 +        Thread.sleep(100); // System.gc();
82          if (print)
83              System.out.print("LinkedBlockingQueue     ");
84 <        oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), consumers, iters);
84 >        oneRun(new LinkedBlockingQueue<Integer>(), n, iters);
85  
86 +        Thread.sleep(100); // System.gc();
87          if (print)
88 <            System.out.print("PriorityBlockingQueue   ");
89 <        oneRun(new PriorityBlockingQueue<Integer>(), consumers, iters/10);
88 >            System.out.print("LinkedBlockingQueue(cap)");
89 >        oneRun(new LinkedBlockingQueue<Integer>(POOL_SIZE), n, iters);
90 >
91 >        Thread.sleep(100); // System.gc();
92 >        if (print)
93 >            System.out.print("LinkedBlockingDeque     ");
94 >        oneRun(new LinkedBlockingDeque<Integer>(), n, iters);
95 >
96 >        Thread.sleep(100); // System.gc();
97 >        if (print)
98 >            System.out.print("ArrayBlockingQueue      ");
99 >        oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE), n, iters);
100  
101 +        Thread.sleep(100); // System.gc();
102          if (print)
103              System.out.print("SynchronousQueue        ");
104 <        oneRun(new SynchronousQueue<Integer>(), consumers, iters);
104 >        oneRun(new SynchronousQueue<Integer>(), n, iters);
105  
106 +        Thread.sleep(100); // System.gc();
107 +        if (print)
108 +            System.out.print("SynchronousQueue(fair)  ");
109 +        oneRun(new SynchronousQueue<Integer>(true), n, iters);
110 +
111 +        Thread.sleep(100); // System.gc();
112 +        if (print)
113 +            System.out.print("LinkedTransferQueue(xfer)");
114 +        oneRun(new LTQasSQ<Integer>(), n, iters);
115 +
116 +        Thread.sleep(100); // System.gc();
117 +        if (print)
118 +            System.out.print("LinkedTransferQueue(half)");
119 +        oneRun(new HalfSyncLTQ<Integer>(), n, iters);
120 +
121 +        Thread.sleep(100); // System.gc();
122 +        if (print)
123 +            System.out.print("PriorityBlockingQueue   ");
124 +        oneRun(new PriorityBlockingQueue<Integer>(), n, fairIters);
125 +
126 +        Thread.sleep(100); // System.gc();
127          if (print)
128              System.out.print("ArrayBlockingQueue(fair)");
129 <        oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), consumers, iters/10);
129 >        oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE, true), n, fairIters);
130      }
131 <    
132 <    static abstract class Stage implements Runnable {
131 >
132 >    abstract static class Stage implements Runnable {
133          final int iters;
134          final BlockingQueue<Integer> queue;
135          final CyclicBarrier barrier;
136          volatile int result;
137 <        Stage (BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
138 <            queue = q;
137 >        Stage(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
138 >            queue = q;
139              barrier = b;
140              this.iters = iters;
141          }
# Line 77 | Line 149 | public class SingleProducerMultipleConsu
149          public void run() {
150              try {
151                  barrier.await();
152 +                int r = hashCode();
153                  for (int i = 0; i < iters; ++i) {
154 <                    queue.put(new Integer(i));
154 >                    r = LoopHelpers.compute7(r);
155 >                    Integer v = intPool[r & POOL_MASK];
156 >                    queue.put(v);
157                  }
158                  barrier.await();
159                  result = 432;
160              }
161 <            catch (Exception ie) {
162 <                ie.printStackTrace();
163 <                return;
161 >            catch (Exception ie) {
162 >                ie.printStackTrace();
163 >                return;
164              }
165          }
166      }
167  
168      static class Consumer extends Stage {
169 <        Consumer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
169 >        Consumer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
170              super(q, b, iters);
171          }
172  
# Line 100 | Line 175 | public class SingleProducerMultipleConsu
175                  barrier.await();
176                  int l = 0;
177                  int s = 0;
103                int last = -1;
178                  for (int i = 0; i < iters; ++i) {
179                      Integer item = queue.take();
180 <                    int v = item.intValue();
107 <                    if (v < last)
108 <                        throw new Error("Out-of-Order transfer");
109 <                    last = v;
110 <                    l = LoopHelpers.compute1(v);
111 <                    s += l;
180 >                    s += item.intValue();
181                  }
182                  barrier.await();
183                  result = s;
184 +                if (s == 0) System.out.print(" ");
185              }
186 <            catch (Exception ie) {
187 <                ie.printStackTrace();
188 <                return;
186 >            catch (Exception ie) {
187 >                ie.printStackTrace();
188 >                return;
189              }
190          }
191  
# Line 135 | Line 205 | public class SingleProducerMultipleConsu
205              System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * nconsumers)) + " ns per transfer");
206      }
207  
208 +    static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
209 +        LTQasSQ() { super(); }
210 +        public void put(T x) {
211 +            try { super.transfer(x);
212 +            } catch (InterruptedException ex) { throw new Error(); }
213 +        }
214 +    }
215 +
216 +    static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> {
217 +        int calls;
218 +        HalfSyncLTQ() { super(); }
219 +        public void put(T x) {
220 +            if ((++calls & 1) == 0)
221 +                super.put(x);
222 +            else {
223 +                try { super.transfer(x);
224 +                } catch (InterruptedException ex) {
225 +                    throw new Error();
226 +                }
227 +            }
228 +        }
229 +    }
230 +
231   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines