ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/CachedThreadPoolLoops.java
Revision: 1.10
Committed: Sat Dec 31 21:34:47 2016 UTC (7 years, 4 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.9: +4 -6 lines
Log Message:
better error handling

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 //import jsr166y.*;
8 import java.util.concurrent.*;
9 import java.util.concurrent.atomic.*;
10
11 public class CachedThreadPoolLoops {
12 static final int NCPUS = Runtime.getRuntime().availableProcessors();
13 static final AtomicInteger remaining = new AtomicInteger();
14 static final int maxIters = 1000000;
15
16 public static void main(String[] args) throws Exception {
17 int maxThreads = NCPUS * 3 / 2; // 100;
18 if (args.length > 0)
19 maxThreads = Integer.parseInt(args[0]);
20
21 System.out.print("Warmup:");
22 for (int j = 0; j < 1; ++j) {
23 for (int k = 1, i = 1; i <= maxThreads;) {
24 System.out.print(" " + i);
25 oneTest(i, 10000, false);
26 Thread.sleep(100);
27 if (i == k) {
28 k = i << 1;
29 i = i + (i >>> 1);
30 }
31 else
32 i = k;
33 }
34 }
35 System.out.println();
36
37 for (int k = 1, i = 1; i <= maxThreads;) {
38 System.out.println("Threads:" + i);
39 oneTest(i, maxIters, true);
40 Thread.sleep(100);
41 if (i == k) {
42 k = i << 1;
43 i = i + (i >>> 1);
44 }
45 else
46 i = k;
47 }
48 }
49
50 static void oneTest(int nThreads, int iters, boolean print) throws Exception {
51 Thread.sleep(100); // System.gc();
52 if (print) System.out.print("LinkedTransferQueue ");
53 oneRun(new LinkedTransferQueue<Runnable>(), nThreads, iters, print);
54
55 Thread.sleep(100); // System.gc();
56 if (print) System.out.print("LinkedBlockingQueue ");
57 oneRun(new LinkedBlockingQueue<Runnable>(), nThreads, iters, print);
58
59 Thread.sleep(100); // System.gc();
60 if (print) System.out.print("SynchronousQueue ");
61 oneRun(new SynchronousQueue<Runnable>(false), nThreads, iters, print);
62
63 Thread.sleep(100); // System.gc();
64 if (print) System.out.print("SynchronousQueue(fair) ");
65 oneRun(new SynchronousQueue<Runnable>(true), nThreads, iters, print);
66
67 Thread.sleep(100); // System.gc();
68 if (print) System.out.print("LinkedTransferQueue(xfer)");
69 oneRun(new LTQasSQ<Runnable>(), nThreads, iters, print);
70
71 Thread.sleep(100); // System.gc();
72 if (print) System.out.print("LinkedTransferQueue(half)");
73 oneRun(new HalfSyncLTQ<Runnable>(), nThreads, iters, print);
74
75 Thread.sleep(100); // System.gc();
76 if (print) System.out.print("ArrayBlockingQueue(256) ");
77 oneRun(new ArrayBlockingQueue<Runnable>(256), nThreads, iters, print);
78 }
79
80 static final class Task implements Runnable {
81 final ThreadPoolExecutor pool;
82 final CountDownLatch done;
83 Task(ThreadPoolExecutor p, CountDownLatch d) {
84 pool = p;
85 done = d;
86 }
87 public void run() {
88 done.countDown();
89 remaining.incrementAndGet();
90 int n;
91 while (!Thread.interrupted() &&
92 (n = remaining.get()) > 0 &&
93 done.getCount() > 0) {
94 if (remaining.compareAndSet(n, n-1)) {
95 try {
96 pool.execute(this);
97 }
98 catch (RuntimeException ex) {
99 System.out.print("*");
100 while (done.getCount() > 0) done.countDown();
101 return;
102 }
103 }
104 }
105 }
106 }
107
108 static void oneRun(BlockingQueue<Runnable> q, int nThreads, int iters, boolean print) throws Exception {
109
110 ThreadPoolExecutor pool =
111 new ThreadPoolExecutor(nThreads+1, Integer.MAX_VALUE,
112 1L, TimeUnit.SECONDS, q);
113
114 CountDownLatch done = new CountDownLatch(iters);
115 remaining.set(nThreads-1);
116 pool.prestartAllCoreThreads();
117 Task t = new Task(pool, done);
118 long start = System.nanoTime();
119 pool.execute(t);
120 done.await();
121 long time = System.nanoTime() - start;
122 if (print)
123 System.out.println("\t: " + LoopHelpers.rightJustify(time / iters) + " ns per task");
124 q.clear();
125 Thread.sleep(100);
126 pool.shutdown();
127 Thread.sleep(100);
128 pool.shutdownNow();
129 }
130
131 static final class LTQasSQ<T> extends LinkedTransferQueue<T> {
132 LTQasSQ() { super(); }
133 public void put(T x) {
134 try { super.transfer(x); }
135 catch (InterruptedException ex) { throw new Error(ex); }
136 }
137 }
138
139 static final class HalfSyncLTQ<T> extends LinkedTransferQueue<T> {
140 int calls;
141 HalfSyncLTQ() { super(); }
142 public void put(T x) {
143 if ((++calls & 1) == 0)
144 super.put(x);
145 else {
146 try { super.transfer(x); }
147 catch (InterruptedException ex) { throw new Error(ex); }
148 }
149 }
150 }
151
152 }