--- jsr166/src/test/loops/ConcurrentQueueLoops.java 2005/05/02 19:19:38 1.1 +++ jsr166/src/test/loops/ConcurrentQueueLoops.java 2010/09/01 07:47:27 1.11 @@ -1,115 +1,129 @@ /* - * @test %I% %E% - * @bug 4486658 - * @compile -source 1.5 ConcurrentQueueLoops.java - * @run main/timeout=230 ConcurrentQueueLoops - * @summary Checks that a set of threads can repeatedly get and modify items - */ -/* * Written by Doug Lea with assistance from members of JCP JSR-166 - * Expert Group and released to the public domain. Use, modify, and - * redistribute this code in any way without acknowledgement. + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain */ import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.locks.*; import java.util.concurrent.atomic.*; public class ConcurrentQueueLoops { static final ExecutorService pool = Executors.newCachedThreadPool(); - static AtomicInteger totalItems; static boolean print = false; + static final Integer zero = new Integer(0); + static final Integer one = new Integer(1); + static int workMask; + static final long RUN_TIME_NANOS = 5 * 1000L * 1000L * 1000L; + static final int BATCH_SIZE = 8; public static void main(String[] args) throws Exception { - int maxStages = 8; - int items = 100000; - + int maxStages = 100; + int work = 1024; Class klass = null; if (args.length > 0) { try { klass = Class.forName(args[0]); - } catch(ClassNotFoundException e) { + } catch (ClassNotFoundException e) { throw new RuntimeException("Class " + args[0] + " not found."); } } - else - klass = java.util.concurrent.ConcurrentLinkedQueue.class; - if (args.length > 1) + if (args.length > 1) maxStages = Integer.parseInt(args[1]); + if (args.length > 2) + work = Integer.parseInt(args[2]); + + workMask = work - 1; System.out.print("Class: " + klass.getName()); - System.out.println(" stages: " + maxStages); + System.out.print(" stages: " + maxStages); + System.out.println(" work: " + work); print = false; System.out.println("Warmup..."); - oneRun(klass, 1, items); + // oneRun(klass, 4); + // Thread.sleep(100); - oneRun(klass, 1, items); + oneRun(klass, 1); Thread.sleep(100); print = true; - for (int i = 1; i <= maxStages; i += (i+1) >>> 1) { - oneRun(klass, i, items); + int k = 1; + for (int i = 1; i <= maxStages;) { + oneRun(klass, i); + if (i == k) { + k = i << 1; + i = i + (i >>> 1); + } + else + i = k; } pool.shutdown(); } - static class Stage implements Callable { + static final class Stage implements Callable { final Queue queue; final CyclicBarrier barrier; - int items; - Stage (Queue q, CyclicBarrier b, int items) { - queue = q; + final int nthreads; + Stage(Queue q, CyclicBarrier b, int nthreads) { + queue = q; barrier = b; - this.items = items; + this.nthreads = nthreads; + } + + static int compute(int l) { + if (l == 0) + return (int) System.nanoTime(); + int nn = (l >>> 7) & workMask; + while (nn-- > 0) + l = LoopHelpers.compute6(l); + return l; } public Integer call() { - // Repeatedly take something from queue if possible, - // transform it, and put back in. try { barrier.await(); - int l = (int)System.nanoTime(); + long now = System.nanoTime(); + long stopTime = now + RUN_TIME_NANOS; + int l = (int) now; int takes = 0; - int seq = l; + int misses = 0; + int lmask = 1; for (;;) { + l = compute(l); Integer item = queue.poll(); if (item != null) { ++takes; - l = LoopHelpers.compute2(item.intValue()); - } - else if (takes != 0) { - totalItems.getAndAdd(-takes); - takes = 0; - } - else if (totalItems.get() <= 0) - break; - l = LoopHelpers.compute1(l); - if (items > 0) { - --items; - while (!queue.offer(new Integer(l^seq++))) ; + if (item == one) + l = LoopHelpers.compute6(l); + } else if ((misses++ & 255) == 0 && + System.nanoTime() >= stopTime) { + return Integer.valueOf(takes); + } else { + for (int i = 0; i < BATCH_SIZE; ++i) { + queue.offer(((l & lmask)== 0) ? zero : one); + if ((lmask <<= 1) == 0) lmask = 1; + if (i != 0) l = compute(l); + } } - else if ( (l & (3 << 5)) == 0) // spinwait - Thread.sleep(1); } - return new Integer(l); } - catch (Exception ie) { + catch (Exception ie) { ie.printStackTrace(); throw new Error("Call loop failed"); } } } - static void oneRun(Class klass, int n, int items) throws Exception { - Queue q = (Queue)klass.newInstance(); + static void oneRun(Class klass, int n) throws Exception { + Queue q = (Queue) klass.newInstance(); LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer(); CyclicBarrier barrier = new CyclicBarrier(n + 1, timer); - totalItems = new AtomicInteger(n * items); ArrayList> results = new ArrayList>(n); - for (int i = 0; i < n; ++i) - results.add(pool.submit(new Stage(q, barrier, items))); + for (int i = 0; i < n; ++i) + results.add(pool.submit(new Stage(q, barrier, n))); if (print) System.out.print("Threads: " + n + "\t:"); @@ -122,10 +136,12 @@ public class ConcurrentQueueLoops { } long endTime = System.nanoTime(); long time = endTime - timer.startTime; + long ips = 1000000000L * total / time; + + if (print) + System.out.print(LoopHelpers.rightJustify(ips) + " items per sec"); if (print) - System.out.println(LoopHelpers.rightJustify(time / (items * n)) + " ns per item"); - if (total == 0) // avoid overoptimization - System.out.println("useless result: " + total); - + System.out.println(); } + }