ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/SingleProducerMultipleConsumerLoops.java
Revision: 1.2
Committed: Mon Feb 19 00:46:06 2007 UTC (17 years, 3 months ago) by dl
Branch: MAIN
Changes since 1.1: +2 -6 lines
Log Message:
Uniform headers

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.2 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.1 */
6    
7     import java.util.concurrent.*;
8    
9     public class SingleProducerMultipleConsumerLoops {
10     static final int CAPACITY = 100;
11    
12     static final ExecutorService pool = Executors.newCachedThreadPool();
13     static boolean print = false;
14    
15     public static void main(String[] args) throws Exception {
16     int maxConsumers = 100;
17     int iters = 10000;
18    
19     if (args.length > 0)
20     maxConsumers = Integer.parseInt(args[0]);
21    
22     print = false;
23     System.out.println("Warmup...");
24     oneTest(1, 10000);
25     Thread.sleep(100);
26     oneTest(2, 10000);
27     Thread.sleep(100);
28     print = true;
29    
30     for (int i = 1; i <= maxConsumers; i += (i+1) >>> 1) {
31     System.out.println("Consumers:" + i);
32     oneTest(i, iters);
33     Thread.sleep(100);
34     }
35     pool.shutdown();
36     }
37    
38     static void oneTest(int consumers, int iters) throws Exception {
39     if (print)
40     System.out.print("ArrayBlockingQueue ");
41     oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), consumers, iters);
42    
43     if (print)
44     System.out.print("LinkedBlockingQueue ");
45     oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), consumers, iters);
46    
47     if (print)
48     System.out.print("PriorityBlockingQueue ");
49     oneRun(new PriorityBlockingQueue<Integer>(), consumers, iters/10);
50    
51     if (print)
52     System.out.print("SynchronousQueue ");
53     oneRun(new SynchronousQueue<Integer>(), consumers, iters);
54    
55     if (print)
56     System.out.print("ArrayBlockingQueue(fair)");
57     oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), consumers, iters/10);
58     }
59    
60     static abstract class Stage implements Runnable {
61     final int iters;
62     final BlockingQueue<Integer> queue;
63     final CyclicBarrier barrier;
64     volatile int result;
65     Stage (BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
66     queue = q;
67     barrier = b;
68     this.iters = iters;
69     }
70     }
71    
72     static class Producer extends Stage {
73     Producer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
74     super(q, b, iters);
75     }
76    
77     public void run() {
78     try {
79     barrier.await();
80     for (int i = 0; i < iters; ++i) {
81     queue.put(new Integer(i));
82     }
83     barrier.await();
84     result = 432;
85     }
86     catch (Exception ie) {
87     ie.printStackTrace();
88     return;
89     }
90     }
91     }
92    
93     static class Consumer extends Stage {
94     Consumer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
95     super(q, b, iters);
96     }
97    
98     public void run() {
99     try {
100     barrier.await();
101     int l = 0;
102     int s = 0;
103     int last = -1;
104     for (int i = 0; i < iters; ++i) {
105     Integer item = queue.take();
106     int v = item.intValue();
107     if (v < last)
108     throw new Error("Out-of-Order transfer");
109     last = v;
110     l = LoopHelpers.compute1(v);
111     s += l;
112     }
113     barrier.await();
114     result = s;
115     }
116     catch (Exception ie) {
117     ie.printStackTrace();
118     return;
119     }
120     }
121    
122     }
123    
124     static void oneRun(BlockingQueue<Integer> q, int nconsumers, int iters) throws Exception {
125     LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
126     CyclicBarrier barrier = new CyclicBarrier(nconsumers + 2, timer);
127     pool.execute(new Producer(q, barrier, iters * nconsumers));
128     for (int i = 0; i < nconsumers; ++i) {
129     pool.execute(new Consumer(q, barrier, iters));
130     }
131     barrier.await();
132     barrier.await();
133     long time = timer.getTime();
134     if (print)
135     System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * nconsumers)) + " ns per transfer");
136     }
137    
138     }