--- jsr166/src/test/loops/CachedThreadPoolLoops.java 2007/02/19 00:46:06 1.3 +++ jsr166/src/test/loops/CachedThreadPoolLoops.java 2015/08/10 03:13:33 1.9 @@ -1,69 +1,87 @@ /* * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/licenses/publicdomain + * http://creativecommons.org/publicdomain/zero/1.0/ */ +//import jsr166y.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; public class CachedThreadPoolLoops { + static final int NCPUS = Runtime.getRuntime().availableProcessors(); static final AtomicInteger remaining = new AtomicInteger(); static final int maxIters = 1000000; public static void main(String[] args) throws Exception { - int maxThreads = 100; - - if (args.length > 0) + int maxThreads = NCPUS * 3 / 2; // 100; + if (args.length > 0) maxThreads = Integer.parseInt(args[0]); System.out.print("Warmup:"); - for (int j = 0; j < 2; ++j) { - int k = 1; - for (int i = 1; i <= maxThreads;) { + for (int j = 0; j < 1; ++j) { + for (int k = 1, i = 1; i <= maxThreads;) { System.out.print(" " + i); oneTest(i, 10000, false); Thread.sleep(100); if (i == k) { k = i << 1; i = i + (i >>> 1); - } - else + } + else i = k; } } System.out.println(); - int k = 1; - for (int i = 1; i <= maxThreads;) { + for (int k = 1, i = 1; i <= maxThreads;) { System.out.println("Threads:" + i); oneTest(i, maxIters, true); Thread.sleep(100); if (i == k) { k = i << 1; i = i + (i >>> 1); - } - else + } + else i = k; } - } + } static void oneTest(int nThreads, int iters, boolean print) throws Exception { - // if (print) System.out.print("LinkedBlockingQueue "); - // oneRun(new LinkedBlockingQueue(256), nThreads, iters, print); - // if (print) System.out.print("ArrayBlockingQueue "); - // oneRun(new ArrayBlockingQueue(256), nThreads, iters, print); - if (print) System.out.print("SynchronousQueue "); + Thread.sleep(100); // System.gc(); + if (print) System.out.print("LinkedTransferQueue "); + oneRun(new LinkedTransferQueue(), nThreads, iters, print); + + Thread.sleep(100); // System.gc(); + if (print) System.out.print("LinkedBlockingQueue "); + oneRun(new LinkedBlockingQueue(), nThreads, iters, print); + + Thread.sleep(100); // System.gc(); + if (print) System.out.print("SynchronousQueue "); oneRun(new SynchronousQueue(false), nThreads, iters, print); - if (print) System.out.print("SynchronousQueue(fair) "); + + Thread.sleep(100); // System.gc(); + if (print) System.out.print("SynchronousQueue(fair) "); oneRun(new SynchronousQueue(true), nThreads, iters, print); + + Thread.sleep(100); // System.gc(); + if (print) System.out.print("LinkedTransferQueue(xfer)"); + oneRun(new LTQasSQ(), nThreads, iters, print); + + Thread.sleep(100); // System.gc(); + if (print) System.out.print("LinkedTransferQueue(half)"); + oneRun(new HalfSyncLTQ(), nThreads, iters, print); + + Thread.sleep(100); // System.gc(); + if (print) System.out.print("ArrayBlockingQueue(256) "); + oneRun(new ArrayBlockingQueue(256), nThreads, iters, print); } static final class Task implements Runnable { final ThreadPoolExecutor pool; final CountDownLatch done; - Task(ThreadPoolExecutor p, CountDownLatch d) { - pool = p; + Task(ThreadPoolExecutor p, CountDownLatch d) { + pool = p; done = d; } public void run() { @@ -71,7 +89,7 @@ public class CachedThreadPoolLoops { remaining.incrementAndGet(); int n; while (!Thread.interrupted() && - (n = remaining.get()) > 0 && + (n = remaining.get()) > 0 && done.getCount() > 0) { if (remaining.compareAndSet(n, n-1)) { try { @@ -86,10 +104,10 @@ public class CachedThreadPoolLoops { } } } - + static void oneRun(BlockingQueue q, int nThreads, int iters, boolean print) throws Exception { - - ThreadPoolExecutor pool = + + ThreadPoolExecutor pool = new ThreadPoolExecutor(nThreads+1, Integer.MAX_VALUE, 1L, TimeUnit.SECONDS, q); @@ -110,4 +128,27 @@ public class CachedThreadPoolLoops { pool.shutdownNow(); } + static final class LTQasSQ extends LinkedTransferQueue { + LTQasSQ() { super(); } + public void put(T x) { + try { super.transfer(x); + } catch (InterruptedException ex) { throw new Error(); } + } + } + + static final class HalfSyncLTQ extends LinkedTransferQueue { + int calls; + HalfSyncLTQ() { super(); } + public void put(T x) { + if ((++calls & 1) == 0) + super.put(x); + else { + try { super.transfer(x); + } catch (InterruptedException ex) { + throw new Error(); + } + } + } + } + }