ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/SingleProducerMultipleConsumerLoops.java
(Generate patch)

Comparing jsr166/src/test/loops/SingleProducerMultipleConsumerLoops.java (file contents):
Revision 1.1 by dl, Mon May 2 19:19:38 2005 UTC vs.
Revision 1.12 by jsr166, Sat Dec 31 21:34:47 2016 UTC

# Line 1 | Line 1
1   /*
2 * @test
3 * @synopsis  check ordering for blocking queues with 1 producer and multiple consumers
4 */
5 /*
2   * Written by Doug Lea with assistance from members of JCP JSR-166
3 < * Expert Group and released to the public domain. Use, modify, and
4 < * redistribute this code in any way without acknowledgement.
3 > * Expert Group and released to the public domain, as explained at
4 > * http://creativecommons.org/publicdomain/zero/1.0/
5   */
6  
7 < import java.util.concurrent.*;
7 > import java.util.concurrent.ArrayBlockingQueue;
8 > import java.util.concurrent.BlockingQueue;
9 > import java.util.concurrent.CyclicBarrier;
10 > import java.util.concurrent.ExecutorService;
11 > import java.util.concurrent.Executors;
12 > import java.util.concurrent.LinkedBlockingDeque;
13 > import java.util.concurrent.LinkedBlockingQueue;
14 > import java.util.concurrent.LinkedTransferQueue;
15 > import java.util.concurrent.PriorityBlockingQueue;
16 > import java.util.concurrent.SynchronousQueue;
17  
18   public class SingleProducerMultipleConsumerLoops {
19 <    static final int CAPACITY =      100;
19 >    static final int NCPUS = Runtime.getRuntime().availableProcessors();
20 >
21 >    // Number of puts by producers or takes by consumers
22 >    static final int ITERS = 1 << 20;
23  
24      static final ExecutorService pool = Executors.newCachedThreadPool();
25      static boolean print = false;
26  
27 +    // Number of elements passed around -- must be power of two
28 +    // Elements are reused from pool to minimize alloc impact
29 +    static final int POOL_SIZE = 1 << 8;
30 +    static final int POOL_MASK = POOL_SIZE-1;
31 +    static final Integer[] intPool = new Integer[POOL_SIZE];
32 +    static {
33 +        for (int i = 0; i < POOL_SIZE; ++i)
34 +            intPool[i] = Integer.valueOf(i);
35 +    }
36 +
37      public static void main(String[] args) throws Exception {
38 <        int maxConsumers = 100;
21 <        int iters = 10000;
38 >        int maxn = 12;
39  
40 <        if (args.length > 0)
41 <            maxConsumers = Integer.parseInt(args[0]);
40 >        if (args.length > 0)
41 >            maxn = Integer.parseInt(args[0]);
42  
43          print = false;
44 <        System.out.println("Warmup...");
28 <        oneTest(1, 10000);
29 <        Thread.sleep(100);
30 <        oneTest(2, 10000);
31 <        Thread.sleep(100);
44 >        warmup();
45          print = true;
46 <        
47 <        for (int i = 1; i <= maxConsumers; i += (i+1) >>> 1) {
46 >
47 >        for (int k = 1, i = 1; i <= maxn;) {
48              System.out.println("Consumers:" + i);
49 <            oneTest(i, iters);
50 <            Thread.sleep(100);
49 >            oneTest(i, ITERS);
50 >            if (i == k) {
51 >                k = i << 1;
52 >                i = i + (i >>> 1);
53 >            }
54 >            else
55 >                i = k;
56          }
57 +
58          pool.shutdown();
59 <   }
59 >    }
60  
61 <    static void oneTest(int consumers, int iters) throws Exception {
61 >    static void warmup() throws Exception {
62 >        print = false;
63 >        System.out.print("Warmup ");
64 >        int it = 2000;
65 >        for (int j = 5; j > 0; --j) {
66 >            oneTest(j, it);
67 >            System.out.print(".");
68 >            it += 1000;
69 >        }
70 >        System.gc();
71 >        it = 20000;
72 >        for (int j = 5; j > 0; --j) {
73 >            oneTest(j, it);
74 >            System.out.print(".");
75 >            it += 10000;
76 >        }
77 >        System.gc();
78 >        System.out.println();
79 >    }
80 >
81 >    static void oneTest(int n, int iters) throws Exception {
82 >        int fairIters = iters/16;
83 >
84 >        Thread.sleep(100); // System.gc();
85          if (print)
86 <            System.out.print("ArrayBlockingQueue      ");
87 <        oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), consumers, iters);
86 >            System.out.print("LinkedTransferQueue     ");
87 >        oneRun(new LinkedTransferQueue<Integer>(), n, iters);
88  
89 +        Thread.sleep(100); // System.gc();
90          if (print)
91              System.out.print("LinkedBlockingQueue     ");
92 <        oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), consumers, iters);
92 >        oneRun(new LinkedBlockingQueue<Integer>(), n, iters);
93  
94 +        Thread.sleep(100); // System.gc();
95          if (print)
96 <            System.out.print("PriorityBlockingQueue   ");
97 <        oneRun(new PriorityBlockingQueue<Integer>(), consumers, iters/10);
96 >            System.out.print("LinkedBlockingQueue(cap)");
97 >        oneRun(new LinkedBlockingQueue<Integer>(POOL_SIZE), n, iters);
98 >
99 >        Thread.sleep(100); // System.gc();
100 >        if (print)
101 >            System.out.print("LinkedBlockingDeque     ");
102 >        oneRun(new LinkedBlockingDeque<Integer>(), n, iters);
103 >
104 >        Thread.sleep(100); // System.gc();
105 >        if (print)
106 >            System.out.print("ArrayBlockingQueue      ");
107 >        oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE), n, iters);
108  
109 +        Thread.sleep(100); // System.gc();
110          if (print)
111              System.out.print("SynchronousQueue        ");
112 <        oneRun(new SynchronousQueue<Integer>(), consumers, iters);
112 >        oneRun(new SynchronousQueue<Integer>(), n, iters);
113 >
114 >        Thread.sleep(100); // System.gc();
115 >        if (print)
116 >            System.out.print("SynchronousQueue(fair)  ");
117 >        oneRun(new SynchronousQueue<Integer>(true), n, iters);
118 >
119 >        Thread.sleep(100); // System.gc();
120 >        if (print)
121 >            System.out.print("LinkedTransferQueue(xfer)");
122 >        oneRun(new LTQasSQ<Integer>(), n, iters);
123 >
124 >        Thread.sleep(100); // System.gc();
125 >        if (print)
126 >            System.out.print("LinkedTransferQueue(half)");
127 >        oneRun(new HalfSyncLTQ<Integer>(), n, iters);
128  
129 +        Thread.sleep(100); // System.gc();
130 +        if (print)
131 +            System.out.print("PriorityBlockingQueue   ");
132 +        oneRun(new PriorityBlockingQueue<Integer>(), n, fairIters);
133 +
134 +        Thread.sleep(100); // System.gc();
135          if (print)
136              System.out.print("ArrayBlockingQueue(fair)");
137 <        oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), consumers, iters/10);
137 >        oneRun(new ArrayBlockingQueue<Integer>(POOL_SIZE, true), n, fairIters);
138      }
139 <    
140 <    static abstract class Stage implements Runnable {
139 >
140 >    abstract static class Stage implements Runnable {
141          final int iters;
142          final BlockingQueue<Integer> queue;
143          final CyclicBarrier barrier;
144          volatile int result;
145 <        Stage (BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
146 <            queue = q;
145 >        Stage(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
146 >            queue = q;
147              barrier = b;
148              this.iters = iters;
149          }
# Line 81 | Line 157 | public class SingleProducerMultipleConsu
157          public void run() {
158              try {
159                  barrier.await();
160 +                int r = hashCode();
161                  for (int i = 0; i < iters; ++i) {
162 <                    queue.put(new Integer(i));
162 >                    r = LoopHelpers.compute7(r);
163 >                    Integer v = intPool[r & POOL_MASK];
164 >                    queue.put(v);
165                  }
166                  barrier.await();
167                  result = 432;
168              }
169 <            catch (Exception ie) {
170 <                ie.printStackTrace();
171 <                return;
169 >            catch (Exception ie) {
170 >                ie.printStackTrace();
171 >                return;
172              }
173          }
174      }
175  
176      static class Consumer extends Stage {
177 <        Consumer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
177 >        Consumer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
178              super(q, b, iters);
179          }
180  
# Line 104 | Line 183 | public class SingleProducerMultipleConsu
183                  barrier.await();
184                  int l = 0;
185                  int s = 0;
107                int last = -1;
186                  for (int i = 0; i < iters; ++i) {
187                      Integer item = queue.take();
188 <                    int v = item.intValue();
111 <                    if (v < last)
112 <                        throw new Error("Out-of-Order transfer");
113 <                    last = v;
114 <                    l = LoopHelpers.compute1(v);
115 <                    s += l;
188 >                    s += item.intValue();
189                  }
190                  barrier.await();
191                  result = s;
192 +                if (s == 0) System.out.print(" ");
193              }
194 <            catch (Exception ie) {
195 <                ie.printStackTrace();
196 <                return;
194 >            catch (Exception ie) {
195 >                ie.printStackTrace();
196 >                return;
197              }
198          }
199  
# Line 139 | Line 213 | public class SingleProducerMultipleConsu
213              System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * nconsumers)) + " ns per transfer");
214      }
215  
216 +    static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
217 +        LTQasSQ() { super(); }
218 +        public void put(T x) {
219 +            try { super.transfer(x); }
220 +            catch (InterruptedException ex) { throw new Error(ex); }
221 +        }
222 +    }
223 +
224 +    static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> {
225 +        int calls;
226 +        HalfSyncLTQ() { super(); }
227 +        public void put(T x) {
228 +            if ((++calls & 1) == 0)
229 +                super.put(x);
230 +            else {
231 +                try { super.transfer(x); }
232 +                catch (InterruptedException ex) { throw new Error(ex); }
233 +            }
234 +        }
235 +    }
236 +
237   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines