ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/CancelledProducerConsumerLoops.java
Revision: 1.17
Committed: Sat Dec 31 19:16:42 2016 UTC (7 years, 2 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.16: +13 -1 lines
Log Message:
organize imports

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 import java.util.concurrent.BrokenBarrierException;
8 import java.util.concurrent.Callable;
9 import java.util.concurrent.CyclicBarrier;
10 import java.util.concurrent.ExecutorService;
11 import java.util.concurrent.Executors;
12 import java.util.concurrent.ArrayBlockingQueue;
13 import java.util.concurrent.BlockingQueue;
14 import java.util.concurrent.Future;
15 import java.util.concurrent.LinkedBlockingQueue;
16 import java.util.concurrent.LinkedTransferQueue;
17 import java.util.concurrent.SynchronousQueue;
18 import java.util.concurrent.TimeUnit;
19
20 public class CancelledProducerConsumerLoops {
21 static final int CAPACITY = 100;
22 static final long TIMEOUT = 100;
23
24 static final ExecutorService pool = Executors.newCachedThreadPool();
25 static boolean print = false;
26
27 public static void main(String[] args) throws Exception {
28 int maxPairs = 8;
29 int iters = 1000000;
30
31 if (args.length > 0)
32 maxPairs = Integer.parseInt(args[0]);
33
34 print = true;
35
36 for (int i = 1; i <= maxPairs; i += (i+1) >>> 1) {
37 System.out.println("Pairs:" + i);
38 try {
39 oneTest(i, iters);
40 }
41 catch (BrokenBarrierException bb) {
42 // OK, ignore
43 }
44 Thread.sleep(100);
45 }
46 pool.shutdown();
47 }
48
49 static void oneRun(BlockingQueue<Integer> q, int npairs, int iters) throws Exception {
50 LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
51 CyclicBarrier barrier = new CyclicBarrier(npairs * 2 + 1, timer);
52 Future<?>[] prods = new Future<?>[npairs];
53 Future<?>[] cons = new Future<?>[npairs];
54
55 for (int i = 0; i < npairs; ++i) {
56 prods[i] = pool.submit(new Producer(q, barrier, iters));
57 cons[i] = pool.submit(new Consumer(q, barrier, iters));
58 }
59 barrier.await();
60 Thread.sleep(TIMEOUT);
61 boolean tooLate = false;
62
63 for (int i = 1; i < npairs; ++i) {
64 if (!prods[i].cancel(true))
65 tooLate = true;
66 if (!cons[i].cancel(true))
67 tooLate = true;
68 }
69
70 Object p0 = prods[0].get();
71 Object c0 = cons[0].get();
72
73 if (!tooLate) {
74 for (int i = 1; i < npairs; ++i) {
75 if (!prods[i].isDone() || !prods[i].isCancelled())
76 throw new Error("Only one producer thread should complete");
77 if (!cons[i].isDone() || !cons[i].isCancelled())
78 throw new Error("Only one consumer thread should complete");
79 }
80 }
81 else
82 System.out.print("(cancelled too late) ");
83
84 long endTime = System.nanoTime();
85 long time = endTime - timer.startTime;
86 if (print) {
87 double secs = (double) time / 1000000000.0;
88 System.out.println("\t " + secs + "s run time");
89 }
90 }
91
92 static void oneTest(int pairs, int iters) throws Exception {
93
94 if (print)
95 System.out.print("ArrayBlockingQueue ");
96 oneRun(new ArrayBlockingQueue<Integer>(CAPACITY), pairs, iters);
97
98 if (print)
99 System.out.print("LinkedBlockingQueue ");
100 oneRun(new LinkedBlockingQueue<Integer>(CAPACITY), pairs, iters);
101
102 if (print)
103 System.out.print("LinkedTransferQueue ");
104 oneRun(new LinkedTransferQueue<Integer>(), pairs, iters);
105
106 if (print)
107 System.out.print("SynchronousQueue ");
108 oneRun(new SynchronousQueue<Integer>(), pairs, iters / 8);
109
110 if (print)
111 System.out.print("SynchronousQueue(fair) ");
112 oneRun(new SynchronousQueue<Integer>(true), pairs, iters / 8);
113
114 /* Can legitimately run out of memory before cancellation
115 if (print)
116 System.out.print("PriorityBlockingQueue ");
117 oneRun(new PriorityBlockingQueue<Integer>(ITERS / 2 * pairs), pairs, iters / 4);
118 */
119 }
120
121 abstract static class Stage implements Callable {
122 final BlockingQueue<Integer> queue;
123 final CyclicBarrier barrier;
124 final int iters;
125 Stage(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
126 queue = q;
127 barrier = b;
128 this.iters = iters;
129 }
130 }
131
132 static class Producer extends Stage {
133 Producer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
134 super(q, b, iters);
135 }
136
137 public Object call() throws Exception {
138 barrier.await();
139 int s = 0;
140 int l = 4321;
141 for (int i = 0; i < iters; ++i) {
142 l = LoopHelpers.compute1(l);
143 s += LoopHelpers.compute2(l);
144 if (!queue.offer(new Integer(l), 1, TimeUnit.SECONDS))
145 break;
146 }
147 return new Integer(s);
148 }
149 }
150
151 static class Consumer extends Stage {
152 Consumer(BlockingQueue<Integer> q, CyclicBarrier b, int iters) {
153 super(q, b, iters);
154 }
155
156 public Object call() throws Exception {
157 barrier.await();
158 int l = 0;
159 int s = 0;
160 for (int i = 0; i < iters; ++i) {
161 Integer x = queue.poll(1, TimeUnit.SECONDS);
162 if (x == null)
163 break;
164 l = LoopHelpers.compute1(x.intValue());
165 s += l;
166 }
167 return new Integer(s);
168 }
169 }
170 }