80 |
|
* // log the completion of this iteration |
81 |
|
* }</pre> |
82 |
|
* |
83 |
< |
* <p>The <tt>CyclicBarrier</tt> uses an all-or-none breakage model |
84 |
< |
* for failed synchronization attempts: If a thread leaves a barrier |
85 |
< |
* point prematurely because of interruption or timeout, all others |
86 |
< |
* will also leave abnormally (via {@link BrokenBarrierException}), |
87 |
< |
* until the barrier is {@link #reset}. This is usually the simplest |
88 |
< |
* and best strategy for sharing knowledge about failures among |
89 |
< |
* cooperating threads in the most common usage contexts of barriers. |
83 |
> |
* <p>The <tt>CyclicBarrier</tt> uses a fast-fail all-or-none breakage |
84 |
> |
* model for failed synchronization attempts: If a thread leaves a |
85 |
> |
* barrier point prematurely because of interruption, failure, or |
86 |
> |
* timeout, all other threads, even those that have not yet resumed |
87 |
> |
* from a previous {@link await}. will also leave abnormally via |
88 |
> |
* {@link BrokenBarrierException} (or <tt>InterruptedException</tt> if |
89 |
> |
* they too were interrupted at about the same time). |
90 |
|
* |
91 |
|
* @since 1.5 |
92 |
|
* @spec JSR-166 |
104 |
|
/** The number of parties */ |
105 |
|
private final int parties; |
106 |
|
/* The command to run when tripped */ |
107 |
< |
private Runnable barrierCommand; |
107 |
> |
private final Runnable barrierCommand; |
108 |
|
|
109 |
|
/** |
110 |
< |
* The generation number. Incremented mod Integer.MAX_VALUE every |
111 |
< |
* time barrier tripped. Starts at 1 to simplify handling of |
112 |
< |
* breakage indicator |
110 |
> |
* The generation number. Incremented upon barrier trip. |
111 |
> |
* Retracted upon reset. |
112 |
|
*/ |
113 |
< |
private int generation = 1; |
113 |
> |
private long generation; |
114 |
|
|
115 |
|
/** |
116 |
< |
* Breakage indicator: last generation of breakage, propagated |
118 |
< |
* across barrier generations until reset. |
116 |
> |
* Breakage indicator. |
117 |
|
*/ |
118 |
< |
private int broken = 0; |
118 |
> |
private boolean broken; |
119 |
|
|
120 |
|
/** |
121 |
|
* Number of parties still waiting. Counts down from parties to 0 |
124 |
|
private int count; |
125 |
|
|
126 |
|
/** |
127 |
< |
* Update state on barrier trip. |
127 |
> |
* Update state on barrier trip and wake up everyone. |
128 |
|
*/ |
129 |
|
private void nextGeneration() { |
130 |
|
count = parties; |
131 |
< |
int g = generation; |
132 |
< |
// avoid generation == 0 |
133 |
< |
if (++generation < 0) generation = 1; |
134 |
< |
// propagate breakage |
135 |
< |
if (broken == g) broken = generation; |
131 |
> |
++generation; |
132 |
> |
trip.signalAll(); |
133 |
> |
} |
134 |
> |
|
135 |
> |
/** |
136 |
> |
* Set barrier as broken and wake up everyone |
137 |
> |
*/ |
138 |
> |
private void breakBarrier() { |
139 |
> |
broken = true; |
140 |
> |
trip.signalAll(); |
141 |
|
} |
142 |
|
|
143 |
|
/** |
148 |
|
lock.lock(); |
149 |
|
try { |
150 |
|
int index = --count; |
151 |
< |
int g = generation; |
151 |
> |
long g = generation; |
152 |
|
|
153 |
< |
if (broken == g) |
153 |
> |
if (broken) |
154 |
|
throw new BrokenBarrierException(); |
155 |
|
|
156 |
|
if (Thread.interrupted()) { |
157 |
< |
broken = g; |
155 |
< |
trip.signalAll(); |
157 |
> |
breakBarrier(); |
158 |
|
throw new InterruptedException(); |
159 |
|
} |
160 |
|
|
161 |
|
if (index == 0) { // tripped |
162 |
|
nextGeneration(); |
163 |
< |
trip.signalAll(); |
163 |
> |
boolean ranAction = false; |
164 |
|
try { |
165 |
|
if (barrierCommand != null) |
166 |
|
barrierCommand.run(); |
167 |
+ |
ranAction = true; |
168 |
|
return 0; |
169 |
< |
} catch (RuntimeException ex) { |
170 |
< |
broken = generation; // next generation is broken |
171 |
< |
throw ex; |
169 |
< |
} |
170 |
< |
catch (Error ex) { |
171 |
< |
broken = generation; // next generation is broken |
172 |
< |
throw ex; |
169 |
> |
} finally { |
170 |
> |
if (!ranAction) |
171 |
> |
breakBarrier(); |
172 |
|
} |
173 |
|
} |
174 |
|
|
175 |
< |
while (generation == g) { |
175 |
> |
for (;;) { |
176 |
|
try { |
177 |
|
if (!timed) |
178 |
|
trip.await(); |
179 |
|
else if (nanos > 0) |
180 |
|
nanos = trip.awaitNanos(nanos); |
181 |
< |
} catch (InterruptedException ex) { |
182 |
< |
// Only claim that broken if interrupted before reset |
183 |
< |
if (generation == g) { |
185 |
< |
broken = g; |
186 |
< |
trip.signalAll(); |
187 |
< |
throw ex; |
188 |
< |
} else { |
189 |
< |
Thread.currentThread().interrupt(); // propagate |
190 |
< |
break; |
191 |
< |
} |
181 |
> |
} catch (InterruptedException ie) { |
182 |
> |
breakBarrier(); |
183 |
> |
throw ie; |
184 |
|
} |
185 |
|
|
186 |
+ |
if (broken || |
187 |
+ |
g > generation) // true if a reset occurred while waiting |
188 |
+ |
throw new BrokenBarrierException(); |
189 |
+ |
|
190 |
+ |
if (g < generation) |
191 |
+ |
return index; |
192 |
+ |
|
193 |
|
if (timed && nanos <= 0) { |
194 |
< |
broken = g; |
196 |
< |
trip.signalAll(); |
194 |
> |
breakBarrier(); |
195 |
|
throw new TimeoutException(); |
196 |
|
} |
199 |
– |
|
200 |
– |
if (broken == g) |
201 |
– |
throw new BrokenBarrierException(); |
202 |
– |
|
197 |
|
} |
204 |
– |
return index; |
198 |
|
|
199 |
|
} finally { |
200 |
|
lock.unlock(); |
291 |
|
* while waiting |
292 |
|
* @throws BrokenBarrierException if <em>another</em> thread was |
293 |
|
* interrupted while the current thread was waiting, or the barrier was |
294 |
< |
* reset, or the barrier was broken when <tt>await</tt> was called. |
294 |
> |
* reset, or the barrier was broken when <tt>await</tt> was called, |
295 |
> |
* or the barrier action (if present) failed due an exception. |
296 |
|
*/ |
297 |
|
public int await() throws InterruptedException, BrokenBarrierException { |
298 |
|
try { |
354 |
|
* @throws TimeoutException if the specified timeout elapses. |
355 |
|
* @throws BrokenBarrierException if <em>another</em> thread was |
356 |
|
* interrupted while the current thread was waiting, or the barrier was |
357 |
< |
* reset, or the barrier was broken when <tt>await</tt> was called. |
357 |
> |
* reset, or the barrier was broken when <tt>await</tt> was called, |
358 |
> |
* or the barrier action (if present) failed due an exception. |
359 |
|
*/ |
360 |
|
public int await(long timeout, TimeUnit unit) |
361 |
|
throws InterruptedException, |
373 |
|
public boolean isBroken() { |
374 |
|
lock.lock(); |
375 |
|
try { |
376 |
< |
return broken >= generation; |
376 |
> |
return broken; |
377 |
|
} finally { |
378 |
|
lock.unlock(); |
379 |
|
} |
383 |
|
* Reset the barrier to its initial state. If any parties are |
384 |
|
* currently waiting at the barrier, they will return with a |
385 |
|
* {@link BrokenBarrierException}. Note that resets <em>after</em> |
386 |
< |
* a breakage can be complicated to carry out; threads need to |
387 |
< |
* re-synchronize in some other way, and choose one to perform the |
388 |
< |
* reset. It may be preferable to instead create a new barrier |
389 |
< |
* for subsequent use. |
386 |
> |
* a breakage has occurred for other reasons can be complicated to |
387 |
> |
* carry out; threads need to re-synchronize in some other way, |
388 |
> |
* and choose one to perform the reset. It may be preferable to |
389 |
> |
* instead create a new barrier for subsequent use. |
390 |
|
*/ |
391 |
|
public void reset() { |
392 |
|
lock.lock(); |
393 |
|
try { |
394 |
< |
int g = generation; |
395 |
< |
nextGeneration(); |
396 |
< |
broken = g; // cause brokenness setting to stop at previous gen. |
394 |
> |
/* |
395 |
> |
* Retract generation number enough to cover threads |
396 |
> |
* currently waiting on current and still resuming from |
397 |
> |
* previous generation, plus similarly accommodating spans |
398 |
> |
* after the reset. |
399 |
> |
*/ |
400 |
> |
generation -= 4; |
401 |
> |
broken = false; |
402 |
|
trip.signalAll(); |
403 |
|
} finally { |
404 |
|
lock.unlock(); |