ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/ConcurrentQueueLoops.java
Revision: 1.4
Committed: Mon Nov 28 15:40:56 2005 UTC (18 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.3: +2 -1 lines
Log Message:
Update and add misc tests

File Contents

# User Rev Content
1 dl 1.1 /*
2     * @test %I% %E%
3     * @bug 4486658
4     * @compile -source 1.5 ConcurrentQueueLoops.java
5     * @run main/timeout=230 ConcurrentQueueLoops
6     * @summary Checks that a set of threads can repeatedly get and modify items
7     */
8     /*
9     * Written by Doug Lea with assistance from members of JCP JSR-166
10     * Expert Group and released to the public domain. Use, modify, and
11     * redistribute this code in any way without acknowledgement.
12     */
13    
14     import java.util.*;
15     import java.util.concurrent.*;
16 dl 1.2 import java.util.concurrent.locks.*;
17 dl 1.1 import java.util.concurrent.atomic.*;
18    
19     public class ConcurrentQueueLoops {
20     static final ExecutorService pool = Executors.newCachedThreadPool();
21     static boolean print = false;
22 dl 1.3 static final Integer zero = new Integer(0);
23     static final Integer one = new Integer(1);
24 dl 1.2 static int workMask;
25     static final long RUN_TIME_NANOS = 5 * 1000L * 1000L * 1000L;
26 dl 1.3 static final int BATCH_SIZE = 8;
27 dl 1.1
28     public static void main(String[] args) throws Exception {
29 dl 1.3 int maxStages = 100;
30     int work = 1024;
31 dl 1.1 Class klass = null;
32     if (args.length > 0) {
33     try {
34     klass = Class.forName(args[0]);
35     } catch(ClassNotFoundException e) {
36     throw new RuntimeException("Class " + args[0] + " not found.");
37     }
38     }
39    
40     if (args.length > 1)
41     maxStages = Integer.parseInt(args[1]);
42    
43 dl 1.2 if (args.length > 2)
44     work = Integer.parseInt(args[2]);
45    
46     workMask = work - 1;
47 dl 1.1 System.out.print("Class: " + klass.getName());
48 dl 1.2 System.out.print(" stages: " + maxStages);
49     System.out.println(" work: " + work);
50 dl 1.1
51     print = false;
52     System.out.println("Warmup...");
53 dl 1.4 // oneRun(klass, 4);
54     //
55 dl 1.1 Thread.sleep(100);
56 dl 1.2 oneRun(klass, 1);
57 dl 1.1 Thread.sleep(100);
58     print = true;
59    
60 dl 1.2 int k = 1;
61     for (int i = 1; i <= maxStages;) {
62     oneRun(klass, i);
63     if (i == k) {
64     k = i << 1;
65     i = i + (i >>> 1);
66     }
67     else
68     i = k;
69 dl 1.1 }
70     pool.shutdown();
71     }
72    
73 dl 1.2 static final class Stage implements Callable<Integer> {
74 dl 1.1 final Queue<Integer> queue;
75     final CyclicBarrier barrier;
76 dl 1.3 final int nthreads;
77     Stage (Queue<Integer> q, CyclicBarrier b, int nthreads) {
78 dl 1.1 queue = q;
79     barrier = b;
80 dl 1.3 this.nthreads = nthreads;
81 dl 1.2 }
82    
83     static int compute(int l) {
84     if (l == 0)
85     return (int)System.nanoTime();
86 dl 1.3 int nn = (l >>> 7) & workMask;
87 dl 1.2 while (nn-- > 0)
88 dl 1.3 l = LoopHelpers.compute6(l);
89 dl 1.2 return l;
90 dl 1.1 }
91    
92     public Integer call() {
93     try {
94     barrier.await();
95 dl 1.2 long now = System.nanoTime();
96     long stopTime = now + RUN_TIME_NANOS;
97     int l = (int)now;
98 dl 1.1 int takes = 0;
99 dl 1.2 int misses = 0;
100 dl 1.3 int lmask = 1;
101 dl 1.1 for (;;) {
102 dl 1.2 l = compute(l);
103 dl 1.1 Integer item = queue.poll();
104     if (item != null) {
105     ++takes;
106 dl 1.3 if (item == one)
107     l = LoopHelpers.compute6(l);
108 dl 1.2 } else if ((misses++ & 255) == 0 &&
109     System.nanoTime() >= stopTime) {
110 dl 1.3 return new Integer(takes);
111 dl 1.2 } else {
112 dl 1.3 for (int i = 0; i < BATCH_SIZE; ++i) {
113     queue.offer(((l & lmask)== 0)? zero : one);
114     if ((lmask <<= 1) == 0) lmask = 1;
115     if (i != 0) l = compute(l);
116     }
117 dl 1.1 }
118     }
119     }
120     catch (Exception ie) {
121     ie.printStackTrace();
122     throw new Error("Call loop failed");
123     }
124     }
125     }
126    
127 dl 1.2 static void oneRun(Class klass, int n) throws Exception {
128 dl 1.1 Queue<Integer> q = (Queue<Integer>)klass.newInstance();
129     LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
130     CyclicBarrier barrier = new CyclicBarrier(n + 1, timer);
131     ArrayList<Future<Integer>> results = new ArrayList<Future<Integer>>(n);
132     for (int i = 0; i < n; ++i)
133 dl 1.3 results.add(pool.submit(new Stage(q, barrier, n)));
134 dl 1.1
135     if (print)
136     System.out.print("Threads: " + n + "\t:");
137     barrier.await();
138     int total = 0;
139     for (int i = 0; i < n; ++i) {
140     Future<Integer> f = results.get(i);
141     Integer r = f.get();
142     total += r.intValue();
143     }
144     long endTime = System.nanoTime();
145     long time = endTime - timer.startTime;
146 dl 1.3 long ips = 1000000000L * total / time;
147    
148     if (print)
149     System.out.print(LoopHelpers.rightJustify(ips) + " items per sec");
150 dl 1.1 if (print)
151 dl 1.3 System.out.println();
152 dl 1.1 }
153 dl 1.2
154 dl 1.1 }