ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/CancelledProducerConsumerLoops.java
Revision: 1.5
Committed: Thu Oct 29 23:09:07 2009 UTC (14 years, 6 months ago) by jsr166
Branch: MAIN
Changes since 1.4: +7 -7 lines
Log Message:
whitespace

File Contents

# User Rev Content
/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */
6     import java.util.concurrent.*;
7    
8     public class CancelledProducerConsumerLoops {
9     static final int CAPACITY = 100;
10 jsr166 1.5 static final long TIMEOUT = 100;
11 dl 1.1
12     static final ExecutorService pool = Executors.newCachedThreadPool();
13     static boolean print = false;
14    
15     public static void main(String[] args) throws Exception {
16     int maxPairs = 8;
17     int iters = 1000000;
18    
19 jsr166 1.5 if (args.length > 0)
20 dl 1.1 maxPairs = Integer.parseInt(args[0]);
21    
22     print = true;
23 jsr166 1.5
24 dl 1.1 for (int i = 1; i <= maxPairs; i += (i+1) >>> 1) {
25     System.out.println("Pairs:" + i);
26     try {
27     oneTest(i, iters);
28     }
29     catch(BrokenBarrierException bb) {
30     // OK, ignore
31     }
32     Thread.sleep(100);
33     }
34     pool.shutdown();
35     }
36    
37     static void oneRun(BlockingQueue<Integer> q, int npairs, int iters) throws Exception {
38     LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
39     CyclicBarrier barrier = new CyclicBarrier(npairs * 2 + 1, timer);
40     Future[] prods = new Future[npairs];
41     Future[] cons = new Future[npairs];
42 jsr166 1.5
43 dl 1.1 for (int i = 0; i < npairs; ++i) {
44     prods[i] = pool.submit(new Producer(q, barrier, iters));
45     cons[i] = pool.submit(new Consumer(q, barrier, iters));
46     }
47     barrier.await();
48     Thread.sleep(TIMEOUT);
49     boolean tooLate = false;
50    
51     for (int i = 1; i < npairs; ++i) {
52     if (!prods[i].cancel(true))
53     tooLate = true;
54     if (!cons[i].cancel(true))
55     tooLate = true;
56     }
57    
58     Object p0 = prods[0].get();
59     Object c0 = cons[0].get();
60    
61     if (!tooLate) {
62     for (int i = 1; i < npairs; ++i) {
63     if (!prods[i].isDone() || !prods[i].isCancelled())
64     throw new Error("Only one producer thread should complete");
65     if (!cons[i].isDone() || !cons[i].isCancelled())
66     throw new Error("Only one consumer thread should complete");
67     }
68     }
69     else
70     System.out.print("(cancelled too late) ");
71    
72     long endTime = System.nanoTime();
73     long time = endTime - timer.startTime;
74     if (print) {
75     double secs = (double)(time) / 1000000000.0;
76     System.out.println("\t " + secs + "s run time");
77     }
78     }
79    
80     static void oneTest(int pairs, int iters) throws Exception {
81    
82     if (print)
83     System.out.print("ArrayBlockingQueue ");
84     oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), pairs, iters);
85    
86     if (print)
87     System.out.print("LinkedBlockingQueue ");
88     oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), pairs, iters);
89    
90    
91     if (print)
92     System.out.print("SynchronousQueue ");
93     oneRun(new SynchronousQueue<Integer>(), pairs, iters / 8);
94    
95    
96     if (print)
97     System.out.print("SynchronousQueue(fair) ");
98     oneRun(new SynchronousQueue<Integer>(true), pairs, iters / 8);
99    
100 dl 1.3 /* Can legitimately run out of memory before cancellation
101 dl 1.1 if (print)
102     System.out.print("PriorityBlockingQueue ");
103     oneRun(new PriorityBlockingQueue<Integer>(ITERS / 2 * pairs), pairs, iters / 4);
104 dl 1.3 */
105 dl 1.1 }
106 jsr166 1.5
107 dl 1.1 static abstract class Stage implements Callable {
108     final BlockingQueue<Integer> queue;
109     final CyclicBarrier barrier;
110     final int iters;
111     Stage (BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
112 jsr166 1.5 queue = q;
113 dl 1.1 barrier = b;
114     this.iters = iters;
115     }
116     }
117    
118     static class Producer extends Stage {
119     Producer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
120     super(q, b, iters);
121     }
122    
123     public Object call() throws Exception {
124     barrier.await();
125     int s = 0;
126     int l = 4321;
127     for (int i = 0; i < iters; ++i) {
128     l = LoopHelpers.compute1(l);
129     s += LoopHelpers.compute2(l);
130     if (!queue.offer(new Integer(l), 1, TimeUnit.SECONDS))
131     break;
132     }
133     return new Integer(s);
134     }
135     }
136    
137     static class Consumer extends Stage {
138 jsr166 1.5 Consumer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
139 dl 1.1 super(q, b, iters);
140     }
141    
142     public Object call() throws Exception {
143     barrier.await();
144     int l = 0;
145     int s = 0;
146     for (int i = 0; i < iters; ++i) {
147     Integer x = queue.poll(1, TimeUnit.SECONDS);
148     if (x == null)
149     break;
150     l = LoopHelpers.compute1(x.intValue());
151     s += l;
152     }
153     return new Integer(s);
154     }
155     }
156    
157    
158     }