ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/test/loops/ConcurrentQueueLoops.java
(Generate patch)

Comparing jsr166/src/test/loops/ConcurrentQueueLoops.java (file contents):
Revision 1.1 by dl, Mon May 2 19:19:38 2005 UTC vs.
Revision 1.4 by dl, Mon Nov 28 15:40:56 2005 UTC

# Line 13 | Line 13
13  
14   import java.util.*;
15   import java.util.concurrent.*;
16 + import java.util.concurrent.locks.*;
17   import java.util.concurrent.atomic.*;
18  
19   public class ConcurrentQueueLoops {
20      static final ExecutorService pool = Executors.newCachedThreadPool();
20    static AtomicInteger totalItems;
21      static boolean print = false;
22 +    static final Integer zero = new Integer(0);
23 +    static final Integer one = new Integer(1);
24 +    static int workMask;
25 +    static final long RUN_TIME_NANOS = 5 * 1000L * 1000L * 1000L;
26 +    static final int BATCH_SIZE = 8;
27  
28      public static void main(String[] args) throws Exception {
29 <        int maxStages = 8;
30 <        int items = 100000;
26 <
29 >        int maxStages = 100;
30 >        int work = 1024;
31          Class klass = null;
32          if (args.length > 0) {
33              try {
# Line 32 | Line 36 | public class ConcurrentQueueLoops {
36                  throw new RuntimeException("Class " + args[0] + " not found.");
37              }
38          }
35        else
36            klass = java.util.concurrent.ConcurrentLinkedQueue.class;
39  
40          if (args.length > 1)
41              maxStages = Integer.parseInt(args[1]);
42  
43 +        if (args.length > 2)
44 +            work = Integer.parseInt(args[2]);
45 +
46 +        workMask = work - 1;
47          System.out.print("Class: " + klass.getName());
48 <        System.out.println(" stages: " + maxStages);
48 >        System.out.print(" stages: " + maxStages);
49 >        System.out.println(" work: " + work);
50  
51          print = false;
52          System.out.println("Warmup...");
53 <        oneRun(klass, 1, items);
53 >        //        oneRun(klass, 4);
54 >        //
55          Thread.sleep(100);
56 <        oneRun(klass, 1, items);
56 >        oneRun(klass, 1);
57          Thread.sleep(100);
58          print = true;
59  
60 <        for (int i = 1; i <= maxStages; i += (i+1) >>> 1) {
61 <            oneRun(klass, i, items);
60 >        int k = 1;
61 >        for (int i = 1; i <= maxStages;) {
62 >            oneRun(klass, i);
63 >            if (i == k) {
64 >                k = i << 1;
65 >                i = i + (i >>> 1);
66 >            }
67 >            else
68 >                i = k;
69          }
70          pool.shutdown();
71     }
72  
73 <    static class Stage implements Callable<Integer> {
73 >    static final class Stage implements Callable<Integer> {
74          final Queue<Integer> queue;
75          final CyclicBarrier barrier;
76 <        int items;
77 <        Stage (Queue<Integer> q, CyclicBarrier b, int items) {
76 >        final int nthreads;
77 >        Stage (Queue<Integer> q, CyclicBarrier b, int nthreads) {
78              queue = q;
79              barrier = b;
80 <            this.items = items;
80 >            this.nthreads = nthreads;
81 >        }
82 >
83 >        static int compute(int l) {
84 >            if (l == 0)
85 >                return (int)System.nanoTime();
86 >            int nn =  (l >>> 7) & workMask;
87 >            while (nn-- > 0)
88 >                l = LoopHelpers.compute6(l);
89 >            return l;
90          }
91  
92          public Integer call() {
69            // Repeatedly take something from queue if possible,
70            // transform it, and put back in.
93              try {
94                  barrier.await();
95 <                int l = (int)System.nanoTime();
95 >                long now = System.nanoTime();
96 >                long stopTime = now + RUN_TIME_NANOS;
97 >                int l = (int)now;
98                  int takes = 0;
99 <                int seq = l;
99 >                int misses = 0;
100 >                int lmask = 1;
101                  for (;;) {
102 +                    l = compute(l);
103                      Integer item = queue.poll();
104                      if (item != null) {
105                          ++takes;
106 <                        l = LoopHelpers.compute2(item.intValue());
107 <                    }
108 <                    else if (takes != 0) {
109 <                        totalItems.getAndAdd(-takes);
110 <                        takes = 0;
106 >                        if (item == one)
107 >                            l = LoopHelpers.compute6(l);
108 >                    } else if ((misses++ & 255) == 0 &&
109 >                               System.nanoTime() >= stopTime) {
110 >                        return new Integer(takes);
111 >                    } else {
112 >                        for (int i = 0; i < BATCH_SIZE; ++i) {
113 >                            queue.offer(((l & lmask)== 0)? zero : one);
114 >                            if ((lmask <<= 1) == 0) lmask = 1;
115 >                            if (i != 0) l = compute(l);
116 >                        }
117                      }
86                    else if (totalItems.get() <= 0)
87                        break;
88                    l = LoopHelpers.compute1(l);
89                    if (items > 0) {
90                        --items;
91                        while (!queue.offer(new Integer(l^seq++))) ;
92                    }
93                    else if ( (l & (3 << 5)) == 0) // spinwait
94                        Thread.sleep(1);
118                  }
96                return new Integer(l);
119              }
120              catch (Exception ie) {
121                  ie.printStackTrace();
# Line 102 | Line 124 | public class ConcurrentQueueLoops {
124          }
125      }
126  
127 <    static void oneRun(Class klass, int n, int items) throws Exception {
127 >    static void oneRun(Class klass, int n) throws Exception {
128          Queue<Integer> q = (Queue<Integer>)klass.newInstance();
129          LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
130          CyclicBarrier barrier = new CyclicBarrier(n + 1, timer);
109        totalItems = new AtomicInteger(n * items);
131          ArrayList<Future<Integer>> results = new ArrayList<Future<Integer>>(n);
132          for (int i = 0; i < n; ++i)
133 <            results.add(pool.submit(new Stage(q, barrier, items)));
133 >            results.add(pool.submit(new Stage(q, barrier, n)));
134  
135          if (print)
136              System.out.print("Threads: " + n + "\t:");
# Line 122 | Line 143 | public class ConcurrentQueueLoops {
143          }
144          long endTime = System.nanoTime();
145          long time = endTime - timer.startTime;
146 <        if (print)
126 <            System.out.println(LoopHelpers.rightJustify(time / (items * n)) + " ns per item");
127 <        if (total == 0) // avoid overoptimization
128 <            System.out.println("useless result: " + total);
146 >        long ips = 1000000000L * total / time;
147          
148 +        if (print)
149 +            System.out.print(LoopHelpers.rightJustify(ips) + " items per sec");
150 +        if (print)
151 +            System.out.println();
152      }
153 +
154   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines