ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/CancelledProducerConsumerLoops.java
Revision: 1.9
Committed: Wed Sep 1 07:47:27 2010 UTC (13 years, 8 months ago) by jsr166
Branch: MAIN
Changes since 1.8: +1 -1 lines
Log Message:
whitespace

File Contents

# User Rev Content
/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */
6     import java.util.concurrent.*;
7    
8     public class CancelledProducerConsumerLoops {
9     static final int CAPACITY = 100;
10 jsr166 1.5 static final long TIMEOUT = 100;
11 dl 1.1
12     static final ExecutorService pool = Executors.newCachedThreadPool();
13     static boolean print = false;
14    
15     public static void main(String[] args) throws Exception {
16     int maxPairs = 8;
17     int iters = 1000000;
18    
19 jsr166 1.5 if (args.length > 0)
20 dl 1.1 maxPairs = Integer.parseInt(args[0]);
21    
22     print = true;
23 jsr166 1.5
24 dl 1.1 for (int i = 1; i <= maxPairs; i += (i+1) >>> 1) {
25     System.out.println("Pairs:" + i);
26     try {
27     oneTest(i, iters);
28     }
29 jsr166 1.7 catch (BrokenBarrierException bb) {
30 dl 1.1 // OK, ignore
31     }
32     Thread.sleep(100);
33     }
34     pool.shutdown();
35     }
36    
37     static void oneRun(BlockingQueue<Integer> q, int npairs, int iters) throws Exception {
38     LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
39     CyclicBarrier barrier = new CyclicBarrier(npairs * 2 + 1, timer);
40     Future[] prods = new Future[npairs];
41     Future[] cons = new Future[npairs];
42 jsr166 1.5
43 dl 1.1 for (int i = 0; i < npairs; ++i) {
44     prods[i] = pool.submit(new Producer(q, barrier, iters));
45     cons[i] = pool.submit(new Consumer(q, barrier, iters));
46     }
47     barrier.await();
48     Thread.sleep(TIMEOUT);
49     boolean tooLate = false;
50    
51     for (int i = 1; i < npairs; ++i) {
52     if (!prods[i].cancel(true))
53     tooLate = true;
54     if (!cons[i].cancel(true))
55     tooLate = true;
56     }
57    
58     Object p0 = prods[0].get();
59     Object c0 = cons[0].get();
60    
61     if (!tooLate) {
62     for (int i = 1; i < npairs; ++i) {
63     if (!prods[i].isDone() || !prods[i].isCancelled())
64     throw new Error("Only one producer thread should complete");
65     if (!cons[i].isDone() || !cons[i].isCancelled())
66     throw new Error("Only one consumer thread should complete");
67     }
68     }
69     else
70     System.out.print("(cancelled too late) ");
71    
72     long endTime = System.nanoTime();
73     long time = endTime - timer.startTime;
74     if (print) {
75 jsr166 1.8 double secs = (double) time / 1000000000.0;
76 dl 1.1 System.out.println("\t " + secs + "s run time");
77     }
78     }
79    
80     static void oneTest(int pairs, int iters) throws Exception {
81    
82     if (print)
83     System.out.print("ArrayBlockingQueue ");
84     oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), pairs, iters);
85    
86     if (print)
87     System.out.print("LinkedBlockingQueue ");
88     oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), pairs, iters);
89    
90 dl 1.6 if (print)
91     System.out.print("LinkedTransferQueue ");
92     oneRun(new LinkedTransferQueue<Integer>(), pairs, iters);
93 dl 1.1
94     if (print)
95     System.out.print("SynchronousQueue ");
96     oneRun(new SynchronousQueue<Integer>(), pairs, iters / 8);
97    
98    
99     if (print)
100     System.out.print("SynchronousQueue(fair) ");
101     oneRun(new SynchronousQueue<Integer>(true), pairs, iters / 8);
102    
103 dl 1.3 /* Can legitimately run out of memory before cancellation
104 dl 1.1 if (print)
105     System.out.print("PriorityBlockingQueue ");
106     oneRun(new PriorityBlockingQueue<Integer>(ITERS / 2 * pairs), pairs, iters / 4);
107 dl 1.3 */
108 dl 1.1 }
109 jsr166 1.5
110 dl 1.1 static abstract class Stage implements Callable {
111     final BlockingQueue<Integer> queue;
112     final CyclicBarrier barrier;
113     final int iters;
114 jsr166 1.9 Stage(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
115 jsr166 1.5 queue = q;
116 dl 1.1 barrier = b;
117     this.iters = iters;
118     }
119     }
120    
121     static class Producer extends Stage {
122     Producer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
123     super(q, b, iters);
124     }
125    
126     public Object call() throws Exception {
127     barrier.await();
128     int s = 0;
129     int l = 4321;
130     for (int i = 0; i < iters; ++i) {
131     l = LoopHelpers.compute1(l);
132     s += LoopHelpers.compute2(l);
133     if (!queue.offer(new Integer(l), 1, TimeUnit.SECONDS))
134     break;
135     }
136     return new Integer(s);
137     }
138     }
139    
140     static class Consumer extends Stage {
141 jsr166 1.5 Consumer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
142 dl 1.1 super(q, b, iters);
143     }
144    
145     public Object call() throws Exception {
146     barrier.await();
147     int l = 0;
148     int s = 0;
149     for (int i = 0; i < iters; ++i) {
150     Integer x = queue.poll(1, TimeUnit.SECONDS);
151     if (x == null)
152     break;
153     l = LoopHelpers.compute1(x.intValue());
154     s += l;
155     }
156     return new Integer(s);
157     }
158     }
159    
160    
161     }