ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/CancelledProducerConsumerLoops.java
Revision: 1.2
Committed: Mon Nov 28 15:40:56 2005 UTC (18 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.1: +0 -2 lines
Log Message:
Update and add misc tests

File Contents

# User Rev Content
1 dl 1.1 /*
2     * @test %I% %E%
3     * @bug 4486658
4     * @compile -source 1.5 CancelledProducerConsumerLoops.java
5     * @run main/timeout=7000 CancelledProducerConsumerLoops
6     * @summary Checks for responsiveness of blocking queues to cancellation.
7     * Runs under the assumption that ITERS computations require more than
8     * TIMEOUT msecs to complete.
9     */
10     /*
11     * Written by Doug Lea with assistance from members of JCP JSR-166
12     * Expert Group and released to the public domain. Use, modify, and
13     * redistribute this code in any way without acknowledgement.
14     */
15    
16     import java.util.concurrent.*;
17    
18     public class CancelledProducerConsumerLoops {
19     static final int CAPACITY = 100;
20     static final long TIMEOUT = 100;
21    
22     static final ExecutorService pool = Executors.newCachedThreadPool();
23     static boolean print = false;
24    
25     public static void main(String[] args) throws Exception {
26     int maxPairs = 8;
27     int iters = 1000000;
28    
29     if (args.length > 0)
30     maxPairs = Integer.parseInt(args[0]);
31    
32     print = true;
33    
34     for (int i = 1; i <= maxPairs; i += (i+1) >>> 1) {
35     System.out.println("Pairs:" + i);
36     try {
37     oneTest(i, iters);
38     }
39     catch(BrokenBarrierException bb) {
40     // OK, ignore
41     }
42     Thread.sleep(100);
43     }
44     pool.shutdown();
45     }
46    
47     static void oneRun(BlockingQueue<Integer> q, int npairs, int iters) throws Exception {
48     LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
49     CyclicBarrier barrier = new CyclicBarrier(npairs * 2 + 1, timer);
50     Future[] prods = new Future[npairs];
51     Future[] cons = new Future[npairs];
52    
53     for (int i = 0; i < npairs; ++i) {
54     prods[i] = pool.submit(new Producer(q, barrier, iters));
55     cons[i] = pool.submit(new Consumer(q, barrier, iters));
56     }
57     barrier.await();
58     Thread.sleep(TIMEOUT);
59     boolean tooLate = false;
60    
61     for (int i = 1; i < npairs; ++i) {
62     if (!prods[i].cancel(true))
63     tooLate = true;
64     if (!cons[i].cancel(true))
65     tooLate = true;
66     }
67    
68     Object p0 = prods[0].get();
69     Object c0 = cons[0].get();
70    
71     if (!tooLate) {
72     for (int i = 1; i < npairs; ++i) {
73     if (!prods[i].isDone() || !prods[i].isCancelled())
74     throw new Error("Only one producer thread should complete");
75     if (!cons[i].isDone() || !cons[i].isCancelled())
76     throw new Error("Only one consumer thread should complete");
77     }
78     }
79     else
80     System.out.print("(cancelled too late) ");
81    
82     long endTime = System.nanoTime();
83     long time = endTime - timer.startTime;
84     if (print) {
85     double secs = (double)(time) / 1000000000.0;
86     System.out.println("\t " + secs + "s run time");
87     }
88     }
89    
90     static void oneTest(int pairs, int iters) throws Exception {
91    
92     if (print)
93     System.out.print("ArrayBlockingQueue ");
94     oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), pairs, iters);
95    
96     if (print)
97     System.out.print("LinkedBlockingQueue ");
98     oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), pairs, iters);
99    
100    
101     if (print)
102     System.out.print("SynchronousQueue ");
103     oneRun(new SynchronousQueue<Integer>(), pairs, iters / 8);
104    
105    
106     if (print)
107     System.out.print("SynchronousQueue(fair) ");
108     oneRun(new SynchronousQueue<Integer>(true), pairs, iters / 8);
109    
110     if (print)
111     System.out.print("PriorityBlockingQueue ");
112     oneRun(new PriorityBlockingQueue<Integer>(ITERS / 2 * pairs), pairs, iters / 4);
113     }
114    
115     static abstract class Stage implements Callable {
116     final BlockingQueue<Integer> queue;
117     final CyclicBarrier barrier;
118     final int iters;
119     Stage (BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
120     queue = q;
121     barrier = b;
122     this.iters = iters;
123     }
124     }
125    
126     static class Producer extends Stage {
127     Producer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
128     super(q, b, iters);
129     }
130    
131     public Object call() throws Exception {
132     barrier.await();
133     int s = 0;
134     int l = 4321;
135     for (int i = 0; i < iters; ++i) {
136     l = LoopHelpers.compute1(l);
137     s += LoopHelpers.compute2(l);
138     if (!queue.offer(new Integer(l), 1, TimeUnit.SECONDS))
139     break;
140     }
141     return new Integer(s);
142     }
143     }
144    
145     static class Consumer extends Stage {
146     Consumer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
147     super(q, b, iters);
148     }
149    
150     public Object call() throws Exception {
151     barrier.await();
152     int l = 0;
153     int s = 0;
154     for (int i = 0; i < iters; ++i) {
155     Integer x = queue.poll(1, TimeUnit.SECONDS);
156     if (x == null)
157     break;
158     l = LoopHelpers.compute1(x.intValue());
159     s += l;
160     }
161     return new Integer(s);
162     }
163     }
164    
165    
166     }