ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/CancelledProducerConsumerLoops.java
Revision: 1.16
Committed: Wed Oct 28 06:30:09 2015 UTC (8 years, 6 months ago) by jsr166
Branch: MAIN
Changes since 1.15: +1 -1 lines
Log Message:
whitespace

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.4 * Expert Group and released to the public domain, as explained at
4 jsr166 1.11 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.1 */
6     import java.util.concurrent.*;
7    
8     public class CancelledProducerConsumerLoops {
9 jsr166 1.16 static final int CAPACITY = 100;
10 jsr166 1.5 static final long TIMEOUT = 100;
11 dl 1.1
12     static final ExecutorService pool = Executors.newCachedThreadPool();
13     static boolean print = false;
14    
15     public static void main(String[] args) throws Exception {
16     int maxPairs = 8;
17     int iters = 1000000;
18    
19 jsr166 1.5 if (args.length > 0)
20 dl 1.1 maxPairs = Integer.parseInt(args[0]);
21    
22     print = true;
23 jsr166 1.5
24 dl 1.1 for (int i = 1; i <= maxPairs; i += (i+1) >>> 1) {
25     System.out.println("Pairs:" + i);
26     try {
27     oneTest(i, iters);
28     }
29 jsr166 1.7 catch (BrokenBarrierException bb) {
30 dl 1.1 // OK, ignore
31     }
32     Thread.sleep(100);
33     }
34     pool.shutdown();
35 jsr166 1.12 }
36 dl 1.1
37     static void oneRun(BlockingQueue<Integer> q, int npairs, int iters) throws Exception {
38     LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
39     CyclicBarrier barrier = new CyclicBarrier(npairs * 2 + 1, timer);
40 jsr166 1.13 Future<?>[] prods = new Future<?>[npairs];
41     Future<?>[] cons = new Future<?>[npairs];
42 jsr166 1.5
43 dl 1.1 for (int i = 0; i < npairs; ++i) {
44     prods[i] = pool.submit(new Producer(q, barrier, iters));
45     cons[i] = pool.submit(new Consumer(q, barrier, iters));
46     }
47     barrier.await();
48     Thread.sleep(TIMEOUT);
49     boolean tooLate = false;
50    
51     for (int i = 1; i < npairs; ++i) {
52     if (!prods[i].cancel(true))
53     tooLate = true;
54     if (!cons[i].cancel(true))
55     tooLate = true;
56     }
57    
58     Object p0 = prods[0].get();
59     Object c0 = cons[0].get();
60    
61     if (!tooLate) {
62     for (int i = 1; i < npairs; ++i) {
63     if (!prods[i].isDone() || !prods[i].isCancelled())
64     throw new Error("Only one producer thread should complete");
65     if (!cons[i].isDone() || !cons[i].isCancelled())
66     throw new Error("Only one consumer thread should complete");
67     }
68     }
69     else
70     System.out.print("(cancelled too late) ");
71    
72     long endTime = System.nanoTime();
73     long time = endTime - timer.startTime;
74     if (print) {
75 jsr166 1.8 double secs = (double) time / 1000000000.0;
76 dl 1.1 System.out.println("\t " + secs + "s run time");
77     }
78     }
79    
80     static void oneTest(int pairs, int iters) throws Exception {
81    
82     if (print)
83     System.out.print("ArrayBlockingQueue ");
84     oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), pairs, iters);
85    
86     if (print)
87     System.out.print("LinkedBlockingQueue ");
88     oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), pairs, iters);
89    
90 dl 1.6 if (print)
91     System.out.print("LinkedTransferQueue ");
92     oneRun(new LinkedTransferQueue<Integer>(), pairs, iters);
93 dl 1.1
94     if (print)
95     System.out.print("SynchronousQueue ");
96     oneRun(new SynchronousQueue<Integer>(), pairs, iters / 8);
97    
98     if (print)
99     System.out.print("SynchronousQueue(fair) ");
100     oneRun(new SynchronousQueue<Integer>(true), pairs, iters / 8);
101    
102 dl 1.3 /* Can legitimately run out of memory before cancellation
103 dl 1.1 if (print)
104     System.out.print("PriorityBlockingQueue ");
105     oneRun(new PriorityBlockingQueue<Integer>(ITERS / 2 * pairs), pairs, iters / 4);
106 dl 1.3 */
107 dl 1.1 }
108 jsr166 1.5
109 jsr166 1.10 abstract static class Stage implements Callable {
110 dl 1.1 final BlockingQueue<Integer> queue;
111     final CyclicBarrier barrier;
112     final int iters;
113 jsr166 1.9 Stage(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
114 jsr166 1.5 queue = q;
115 dl 1.1 barrier = b;
116     this.iters = iters;
117     }
118     }
119    
120     static class Producer extends Stage {
121     Producer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
122     super(q, b, iters);
123     }
124    
125     public Object call() throws Exception {
126     barrier.await();
127     int s = 0;
128     int l = 4321;
129     for (int i = 0; i < iters; ++i) {
130     l = LoopHelpers.compute1(l);
131     s += LoopHelpers.compute2(l);
132     if (!queue.offer(new Integer(l), 1, TimeUnit.SECONDS))
133     break;
134     }
135     return new Integer(s);
136     }
137     }
138    
139     static class Consumer extends Stage {
140 jsr166 1.5 Consumer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
141 dl 1.1 super(q, b, iters);
142     }
143    
144     public Object call() throws Exception {
145     barrier.await();
146     int l = 0;
147     int s = 0;
148     for (int i = 0; i < iters; ++i) {
149     Integer x = queue.poll(1, TimeUnit.SECONDS);
150     if (x == null)
151     break;
152     l = LoopHelpers.compute1(x.intValue());
153     s += l;
154     }
155     return new Integer(s);
156     }
157     }
158     }