ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/CancelledProducerConsumerLoops.java
Revision: 1.17
Committed: Sat Dec 31 19:16:42 2016 UTC (7 years, 3 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.16: +13 -1 lines
Log Message:
organize imports

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.4 * Expert Group and released to the public domain, as explained at
4 jsr166 1.11 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.1 */
6 jsr166 1.17
7     import java.util.concurrent.BrokenBarrierException;
8     import java.util.concurrent.Callable;
9     import java.util.concurrent.CyclicBarrier;
10     import java.util.concurrent.ExecutorService;
11     import java.util.concurrent.Executors;
12     import java.util.concurrent.ArrayBlockingQueue;
13     import java.util.concurrent.BlockingQueue;
14     import java.util.concurrent.Future;
15     import java.util.concurrent.LinkedBlockingQueue;
16     import java.util.concurrent.LinkedTransferQueue;
17     import java.util.concurrent.SynchronousQueue;
18     import java.util.concurrent.TimeUnit;
19 dl 1.1
20     public class CancelledProducerConsumerLoops {
21 jsr166 1.16 static final int CAPACITY = 100;
22 jsr166 1.5 static final long TIMEOUT = 100;
23 dl 1.1
24     static final ExecutorService pool = Executors.newCachedThreadPool();
25     static boolean print = false;
26    
27     public static void main(String[] args) throws Exception {
28     int maxPairs = 8;
29     int iters = 1000000;
30    
31 jsr166 1.5 if (args.length > 0)
32 dl 1.1 maxPairs = Integer.parseInt(args[0]);
33    
34     print = true;
35 jsr166 1.5
36 dl 1.1 for (int i = 1; i <= maxPairs; i += (i+1) >>> 1) {
37     System.out.println("Pairs:" + i);
38     try {
39     oneTest(i, iters);
40     }
41 jsr166 1.7 catch (BrokenBarrierException bb) {
42 dl 1.1 // OK, ignore
43     }
44     Thread.sleep(100);
45     }
46     pool.shutdown();
47 jsr166 1.12 }
48 dl 1.1
49     static void oneRun(BlockingQueue<Integer> q, int npairs, int iters) throws Exception {
50     LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
51     CyclicBarrier barrier = new CyclicBarrier(npairs * 2 + 1, timer);
52 jsr166 1.13 Future<?>[] prods = new Future<?>[npairs];
53     Future<?>[] cons = new Future<?>[npairs];
54 jsr166 1.5
55 dl 1.1 for (int i = 0; i < npairs; ++i) {
56     prods[i] = pool.submit(new Producer(q, barrier, iters));
57     cons[i] = pool.submit(new Consumer(q, barrier, iters));
58     }
59     barrier.await();
60     Thread.sleep(TIMEOUT);
61     boolean tooLate = false;
62    
63     for (int i = 1; i < npairs; ++i) {
64     if (!prods[i].cancel(true))
65     tooLate = true;
66     if (!cons[i].cancel(true))
67     tooLate = true;
68     }
69    
70     Object p0 = prods[0].get();
71     Object c0 = cons[0].get();
72    
73     if (!tooLate) {
74     for (int i = 1; i < npairs; ++i) {
75     if (!prods[i].isDone() || !prods[i].isCancelled())
76     throw new Error("Only one producer thread should complete");
77     if (!cons[i].isDone() || !cons[i].isCancelled())
78     throw new Error("Only one consumer thread should complete");
79     }
80     }
81     else
82     System.out.print("(cancelled too late) ");
83    
84     long endTime = System.nanoTime();
85     long time = endTime - timer.startTime;
86     if (print) {
87 jsr166 1.8 double secs = (double) time / 1000000000.0;
88 dl 1.1 System.out.println("\t " + secs + "s run time");
89     }
90     }
91    
92     static void oneTest(int pairs, int iters) throws Exception {
93    
94     if (print)
95     System.out.print("ArrayBlockingQueue ");
96     oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), pairs, iters);
97    
98     if (print)
99     System.out.print("LinkedBlockingQueue ");
100     oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), pairs, iters);
101    
102 dl 1.6 if (print)
103     System.out.print("LinkedTransferQueue ");
104     oneRun(new LinkedTransferQueue<Integer>(), pairs, iters);
105 dl 1.1
106     if (print)
107     System.out.print("SynchronousQueue ");
108     oneRun(new SynchronousQueue<Integer>(), pairs, iters / 8);
109    
110     if (print)
111     System.out.print("SynchronousQueue(fair) ");
112     oneRun(new SynchronousQueue<Integer>(true), pairs, iters / 8);
113    
114 dl 1.3 /* Can legitimately run out of memory before cancellation
115 dl 1.1 if (print)
116     System.out.print("PriorityBlockingQueue ");
117     oneRun(new PriorityBlockingQueue<Integer>(ITERS / 2 * pairs), pairs, iters / 4);
118 dl 1.3 */
119 dl 1.1 }
120 jsr166 1.5
121 jsr166 1.10 abstract static class Stage implements Callable {
122 dl 1.1 final BlockingQueue<Integer> queue;
123     final CyclicBarrier barrier;
124     final int iters;
125 jsr166 1.9 Stage(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
126 jsr166 1.5 queue = q;
127 dl 1.1 barrier = b;
128     this.iters = iters;
129     }
130     }
131    
132     static class Producer extends Stage {
133     Producer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
134     super(q, b, iters);
135     }
136    
137     public Object call() throws Exception {
138     barrier.await();
139     int s = 0;
140     int l = 4321;
141     for (int i = 0; i < iters; ++i) {
142     l = LoopHelpers.compute1(l);
143     s += LoopHelpers.compute2(l);
144     if (!queue.offer(new Integer(l), 1, TimeUnit.SECONDS))
145     break;
146     }
147     return new Integer(s);
148     }
149     }
150    
151     static class Consumer extends Stage {
152 jsr166 1.5 Consumer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
153 dl 1.1 super(q, b, iters);
154     }
155    
156     public Object call() throws Exception {
157     barrier.await();
158     int l = 0;
159     int s = 0;
160     for (int i = 0; i < iters; ++i) {
161     Integer x = queue.poll(1, TimeUnit.SECONDS);
162     if (x == null)
163     break;
164     l = LoopHelpers.compute1(x.intValue());
165     s += l;
166     }
167     return new Integer(s);
168     }
169     }
170     }