--- jsr166/src/test/loops/ConcurrentQueueLoops.java 2005/10/04 20:09:41 1.2 +++ jsr166/src/test/loops/ConcurrentQueueLoops.java 2009/11/02 23:51:32 1.9 @@ -1,14 +1,7 @@ /* - * @test %I% %E% - * @bug 4486658 - * @compile -source 1.5 ConcurrentQueueLoops.java - * @run main/timeout=230 ConcurrentQueueLoops - * @summary Checks that a set of threads can repeatedly get and modify items - */ -/* * Written by Doug Lea with assistance from members of JCP JSR-166 - * Expert Group and released to the public domain. Use, modify, and - * redistribute this code in any way without acknowledgement. + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain */ import java.util.*; @@ -19,27 +12,28 @@ import java.util.concurrent.atomic.*; public class ConcurrentQueueLoops { static final ExecutorService pool = Executors.newCachedThreadPool(); static boolean print = false; - static final Integer even = new Integer(42); - static final Integer odd = new Integer(17); + static final Integer zero = new Integer(0); + static final Integer one = new Integer(1); static int workMask; static final long RUN_TIME_NANOS = 5 * 1000L * 1000L * 1000L; + static final int BATCH_SIZE = 8; public static void main(String[] args) throws Exception { - int maxStages = 48; - int work = 32; + int maxStages = 100; + int work = 1024; Class klass = null; if (args.length > 0) { try { klass = Class.forName(args[0]); - } catch(ClassNotFoundException e) { + } catch (ClassNotFoundException e) { throw new RuntimeException("Class " + args[0] + " not found."); } } - if (args.length > 1) + if (args.length > 1) maxStages = Integer.parseInt(args[1]); - if (args.length > 2) + if (args.length > 2) work = Integer.parseInt(args[2]); workMask = work - 1; @@ -49,7 +43,8 @@ public class ConcurrentQueueLoops { print = false; System.out.println("Warmup..."); - oneRun(klass, 4); + // oneRun(klass, 4); + // Thread.sleep(100); oneRun(klass, 1); Thread.sleep(100); @@ -61,8 +56,8 @@ public class ConcurrentQueueLoops { if (i == k) { k = i << 1; i = i + (i >>> 1); - } - else + } + else i = k; } pool.shutdown(); @@ -71,27 +66,19 @@ public class ConcurrentQueueLoops { static final class Stage implements Callable { final Queue queue; final CyclicBarrier barrier; - Stage (Queue q, CyclicBarrier b) { - queue = q; + final int nthreads; + Stage (Queue q, CyclicBarrier b, int nthreads) { + queue = q; barrier = b; - } - - static int compute127(int l) { - l = LoopHelpers.compute1(l); - l = LoopHelpers.compute2(l); - l = LoopHelpers.compute3(l); - l = LoopHelpers.compute4(l); - l = LoopHelpers.compute5(l); - l = LoopHelpers.compute6(l); - return l; + this.nthreads = nthreads; } static int compute(int l) { - if (l == 0) + if (l == 0) return (int)System.nanoTime(); - int nn = l & workMask; - while (nn-- > 0) - l = compute127(l); + int nn = (l >>> 7) & workMask; + while (nn-- > 0) + l = LoopHelpers.compute6(l); return l; } @@ -103,23 +90,27 @@ public class ConcurrentQueueLoops { int l = (int)now; int takes = 0; int misses = 0; + int lmask = 1; for (;;) { l = compute(l); Integer item = queue.poll(); if (item != null) { - l += item.intValue(); ++takes; + if (item == one) + l = LoopHelpers.compute6(l); } else if ((misses++ & 255) == 0 && System.nanoTime() >= stopTime) { - break; + return Integer.valueOf(takes); } else { - Integer a = ((l++ & 4)== 0)? even : odd; - queue.add(a); + for (int i = 0; i < BATCH_SIZE; ++i) { + queue.offer(((l & lmask)== 0) ? zero : one); + if ((lmask <<= 1) == 0) lmask = 1; + if (i != 0) l = compute(l); + } } } - return new Integer(takes); } - catch (Exception ie) { + catch (Exception ie) { ie.printStackTrace(); throw new Error("Call loop failed"); } @@ -127,12 +118,12 @@ public class ConcurrentQueueLoops { } static void oneRun(Class klass, int n) throws Exception { - Queue q = (Queue)klass.newInstance(); + Queue q = (Queue) klass.newInstance(); LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer(); CyclicBarrier barrier = new CyclicBarrier(n + 1, timer); ArrayList> results = new ArrayList>(n); - for (int i = 0; i < n; ++i) - results.add(pool.submit(new Stage(q, barrier))); + for (int i = 0; i < n; ++i) + results.add(pool.submit(new Stage(q, barrier, n))); if (print) System.out.print("Threads: " + n + "\t:"); @@ -145,10 +136,12 @@ public class ConcurrentQueueLoops { } long endTime = System.nanoTime(); long time = endTime - timer.startTime; - long tpi = time / total; + long ips = 1000000000L * total / time; + + if (print) + System.out.print(LoopHelpers.rightJustify(ips) + " items per sec"); if (print) - System.out.println(LoopHelpers.rightJustify(tpi) + " ns per item"); - + System.out.println(); } }