15 |
|
|
16 |
|
public class RLIBar { |
17 |
|
|
18 |
< |
static int batchLimit ; |
19 |
< |
static int mseq ; |
20 |
< |
static int nReady ; |
21 |
< |
static int ExThreads ; |
22 |
< |
static int ASum ; |
23 |
< |
static final ReentrantLock Gate = new ReentrantLock () ; |
24 |
< |
static final Condition GateCond = Gate.newCondition () ; |
25 |
< |
|
26 |
< |
static final ReentrantLock HoldQ = new ReentrantLock () ; |
27 |
< |
static final Condition HoldQCond = HoldQ.newCondition() ; |
28 |
< |
static boolean Hold = false ; |
29 |
< |
static int HoldPop ; |
30 |
< |
static int HoldLimit ; |
18 |
> |
static int batchLimit; |
19 |
> |
static int mseq; |
20 |
> |
static int nReady; |
21 |
> |
static int ExThreads; |
22 |
> |
static int ASum; |
23 |
> |
static final ReentrantLock Gate = new ReentrantLock(); |
24 |
> |
static final Condition GateCond = Gate.newCondition(); |
25 |
> |
|
26 |
> |
static final ReentrantLock HoldQ = new ReentrantLock(); |
27 |
> |
static final Condition HoldQCond = HoldQ.newCondition(); |
28 |
> |
static boolean Hold = false; |
29 |
> |
static int HoldPop; |
30 |
> |
static int HoldLimit; |
31 |
|
|
32 |
< |
static private boolean HoldCheck () { |
32 |
> |
private static boolean HoldCheck() { |
33 |
|
try { |
34 |
|
HoldQ.lock(); |
35 |
|
try { |
36 |
|
if (!Hold) return false; |
37 |
|
else { |
38 |
< |
++HoldPop ; |
38 |
> |
++HoldPop; |
39 |
|
if (HoldPop >= HoldLimit) { |
40 |
< |
System.out.print ("Holding ") ; |
41 |
< |
Thread.sleep (1000) ; |
42 |
< |
System.out.println () ; |
43 |
< |
Hold = false ; |
44 |
< |
HoldQCond.signalAll () ; |
40 |
> |
System.out.print("Holding "); |
41 |
> |
Thread.sleep(1000); |
42 |
> |
System.out.println(); |
43 |
> |
Hold = false; |
44 |
> |
HoldQCond.signalAll(); |
45 |
|
} |
46 |
|
else |
47 |
|
while (Hold) |
48 |
< |
HoldQCond.await() ; |
48 |
> |
HoldQCond.await(); |
49 |
|
|
50 |
< |
if (--HoldPop == 0) HoldQCond.signalAll () ; |
50 |
> |
if (--HoldPop == 0) HoldQCond.signalAll(); |
51 |
|
return true; |
52 |
|
} |
53 |
|
} |
55 |
|
HoldQ.unlock(); |
56 |
|
} |
57 |
|
} catch (Exception Ex) { |
58 |
< |
System.out.println ("Unexpected exception in Hold: " + Ex) ; |
58 |
> |
System.out.println("Unexpected exception in Hold: " + Ex); |
59 |
|
return false; |
60 |
|
} |
61 |
|
} |
65 |
|
final ReentrantLock thisLock = new ReentrantLock(); |
66 |
|
final Condition thisCond = thisLock.newCondition(); |
67 |
|
|
68 |
< |
Server (int nClients) { |
68 |
> |
Server(int nClients) { |
69 |
|
this.nClients = nClients; |
70 |
|
try { |
71 |
|
for (int i = 0; i < nClients; ++i) { |
72 |
< |
final int fix = i ; |
73 |
< |
new Thread() { public void run () { runServer(fix); }}.start(); |
72 |
> |
final int fix = i; |
73 |
> |
new Thread() { public void run() { runServer(fix); }}.start(); |
74 |
|
} |
75 |
|
} catch (Exception e) { |
76 |
< |
System.err.println(e) ; |
76 |
> |
System.err.println(e); |
77 |
|
} |
78 |
|
} |
79 |
|
|
88 |
|
// was incremented |
89 |
|
private int currentBatchSize = 0; |
90 |
|
|
91 |
< |
private void runServer (int id) { |
92 |
< |
int msg ; |
91 |
> |
private void runServer(int id) { |
92 |
> |
int msg; |
93 |
|
boolean held = false; |
94 |
|
final ReentrantLock thisLock = this.thisLock; |
95 |
|
final Condition thisCond = this.thisCond; |
101 |
|
// proper provisioning on T1. |
102 |
|
// Alternately, use THR_BOUND threads |
103 |
|
Gate.lock(); try { |
104 |
< |
++nReady ; |
105 |
< |
if (nReady == ExThreads ) { |
106 |
< |
GateCond.signalAll () ; |
104 |
> |
++nReady; |
105 |
> |
if (nReady == ExThreads) { |
106 |
> |
GateCond.signalAll(); |
107 |
|
} |
108 |
< |
while (nReady != ExThreads ) |
109 |
< |
GateCond.await() ; |
108 |
> |
while (nReady != ExThreads) |
109 |
> |
GateCond.await(); |
110 |
|
} finally { Gate.unlock(); } |
111 |
|
|
112 |
|
for (;;) { |
113 |
< |
// if (!held && currentBatchSize == 0) held = HoldCheck () ; |
114 |
< |
msg = (++ mseq) ^ id ; |
113 |
> |
// if (!held && currentBatchSize == 0) held = HoldCheck (); |
114 |
> |
msg = (++ mseq) ^ id; |
115 |
|
thisLock.lock(); |
116 |
|
try { |
117 |
< |
ASum += msg ; |
117 |
> |
ASum += msg; |
118 |
|
++msgsReceived; |
119 |
|
int myBatch = currentBatch; |
120 |
|
if (++currentBatchSize >= batchLimit) { |
122 |
|
++currentBatch; |
123 |
|
currentBatchSize = 0; |
124 |
|
// and wake up everyone in this one |
125 |
< |
thisCond.signalAll () ; |
125 |
> |
thisCond.signalAll(); |
126 |
|
} |
127 |
|
// Wait until our batch is complete |
128 |
|
while (myBatch == currentBatch) |
133 |
|
} |
134 |
|
} |
135 |
|
} catch (Exception e) { |
136 |
< |
System.err.println("Server thread: exception " + e) ; |
136 |
> |
System.err.println("Server thread: exception " + e); |
137 |
|
e.printStackTrace(); |
138 |
|
} |
139 |
|
} |
141 |
|
|
142 |
|
} |
143 |
|
|
144 |
< |
public static void main (String[] args) throws Exception { |
145 |
< |
int nServers = 10 ; |
146 |
< |
int nClients = 10 ; |
144 |
> |
public static void main(String[] args) throws Exception { |
145 |
> |
int nServers = 10; |
146 |
> |
int nClients = 10; |
147 |
|
int samplePeriod = 10000; |
148 |
|
int nSamples = 5; |
149 |
|
|
161 |
|
else if (arg.equals("-np")) |
162 |
|
nSamples = Integer.parseInt(args[nextArg++]); |
163 |
|
else { |
164 |
< |
System.err.println ("Argument error:" + arg) ; |
165 |
< |
System.exit (1) ; |
164 |
> |
System.err.println("Argument error:" + arg); |
165 |
> |
System.exit(1); |
166 |
|
} |
167 |
|
} |
168 |
|
if (nClients <= 0 || nServers <= 0 || samplePeriod <= 0 || batchLimit > nClients) { |
169 |
< |
System.err.println ("Argument error") ; |
170 |
< |
System.exit (1) ; |
169 |
> |
System.err.println("Argument error"); |
170 |
> |
System.exit(1); |
171 |
|
} |
172 |
|
|
173 |
|
// default batch size is 2/3 the number of clients |
175 |
|
if (false && batchLimit <= 0) |
176 |
|
batchLimit = (2 * nClients + 1) / 3; |
177 |
|
|
178 |
< |
ExThreads = nServers * nClients ; // expected # of threads |
179 |
< |
HoldLimit = ExThreads ; |
178 |
> |
ExThreads = nServers * nClients; // expected # of threads |
179 |
> |
HoldLimit = ExThreads; |
180 |
|
|
181 |
|
// start up all threads |
182 |
|
Server[] servers = new Server[nServers]; |
187 |
|
// Wait for consensus |
188 |
|
try { |
189 |
|
Gate.lock(); try { |
190 |
< |
while (nReady != ExThreads ) GateCond.await() ; |
190 |
> |
while (nReady != ExThreads ) GateCond.await(); |
191 |
|
} finally { Gate.unlock(); } |
192 |
|
} catch (Exception ex) { |
193 |
< |
System.out.println (ex); |
193 |
> |
System.out.println(ex); |
194 |
|
} |
195 |
< |
System.out.println ( |
196 |
< |
nReady + " Ready: nc=" + nClients + " ns=" + nServers + " batch=" + batchLimit) ; |
195 |
> |
System.out.println( |
196 |
> |
nReady + " Ready: nc=" + nClients + " ns=" + nServers + " batch=" + batchLimit); |
197 |
|
|
198 |
|
// Start sampling ... |
199 |
|
// Methodological problem: all the mutator threads |
219 |
|
} |
220 |
|
|
221 |
|
if (false && j == 2) { |
222 |
< |
System.out.print ("Hold activated ...") ; |
222 |
> |
System.out.print("Hold activated ..."); |
223 |
|
HoldQ.lock(); |
224 |
|
try { |
225 |
< |
Hold = true ; |
226 |
< |
while (Hold) HoldQCond.await() ; |
225 |
> |
Hold = true; |
226 |
> |
while (Hold) HoldQCond.await(); |
227 |
|
} |
228 |
|
finally { |
229 |
|
HoldQ.unlock(); |