ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/SingleProducerMultipleConsumerLoops.java
Revision: 1.2
Committed: Mon Feb 19 00:46:06 2007 UTC (17 years, 3 months ago) by dl
Branch: MAIN
Changes since 1.1: +2 -6 lines
Log Message:
Uniform headers

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 import java.util.concurrent.*;
8
9 public class SingleProducerMultipleConsumerLoops {
10 static final int CAPACITY = 100;
11
12 static final ExecutorService pool = Executors.newCachedThreadPool();
13 static boolean print = false;
14
15 public static void main(String[] args) throws Exception {
16 int maxConsumers = 100;
17 int iters = 10000;
18
19 if (args.length > 0)
20 maxConsumers = Integer.parseInt(args[0]);
21
22 print = false;
23 System.out.println("Warmup...");
24 oneTest(1, 10000);
25 Thread.sleep(100);
26 oneTest(2, 10000);
27 Thread.sleep(100);
28 print = true;
29
30 for (int i = 1; i <= maxConsumers; i += (i+1) >>> 1) {
31 System.out.println("Consumers:" + i);
32 oneTest(i, iters);
33 Thread.sleep(100);
34 }
35 pool.shutdown();
36 }
37
38 static void oneTest(int consumers, int iters) throws Exception {
39 if (print)
40 System.out.print("ArrayBlockingQueue ");
41 oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), consumers, iters);
42
43 if (print)
44 System.out.print("LinkedBlockingQueue ");
45 oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), consumers, iters);
46
47 if (print)
48 System.out.print("PriorityBlockingQueue ");
49 oneRun(new PriorityBlockingQueue<Integer>(), consumers, iters/10);
50
51 if (print)
52 System.out.print("SynchronousQueue ");
53 oneRun(new SynchronousQueue<Integer>(), consumers, iters);
54
55 if (print)
56 System.out.print("ArrayBlockingQueue(fair)");
57 oneRun(new ArrayBlockingQueue<Integer>(CAPACITY, true), consumers, iters/10);
58 }
59
60 static abstract class Stage implements Runnable {
61 final int iters;
62 final BlockingQueue<Integer> queue;
63 final CyclicBarrier barrier;
64 volatile int result;
65 Stage (BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
66 queue = q;
67 barrier = b;
68 this.iters = iters;
69 }
70 }
71
72 static class Producer extends Stage {
73 Producer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
74 super(q, b, iters);
75 }
76
77 public void run() {
78 try {
79 barrier.await();
80 for (int i = 0; i < iters; ++i) {
81 queue.put(new Integer(i));
82 }
83 barrier.await();
84 result = 432;
85 }
86 catch (Exception ie) {
87 ie.printStackTrace();
88 return;
89 }
90 }
91 }
92
93 static class Consumer extends Stage {
94 Consumer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
95 super(q, b, iters);
96 }
97
98 public void run() {
99 try {
100 barrier.await();
101 int l = 0;
102 int s = 0;
103 int last = -1;
104 for (int i = 0; i < iters; ++i) {
105 Integer item = queue.take();
106 int v = item.intValue();
107 if (v < last)
108 throw new Error("Out-of-Order transfer");
109 last = v;
110 l = LoopHelpers.compute1(v);
111 s += l;
112 }
113 barrier.await();
114 result = s;
115 }
116 catch (Exception ie) {
117 ie.printStackTrace();
118 return;
119 }
120 }
121
122 }
123
124 static void oneRun(BlockingQueue<Integer> q, int nconsumers, int iters) throws Exception {
125 LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
126 CyclicBarrier barrier = new CyclicBarrier(nconsumers + 2, timer);
127 pool.execute(new Producer(q, barrier, iters * nconsumers));
128 for (int i = 0; i < nconsumers; ++i) {
129 pool.execute(new Consumer(q, barrier, iters));
130 }
131 barrier.await();
132 barrier.await();
133 long time = timer.getTime();
134 if (print)
135 System.out.println("\t: " + LoopHelpers.rightJustify(time / (iters * nconsumers)) + " ns per transfer");
136 }
137
138 }