ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CyclicBarrier.java
Revision: 1.59
Committed: Fri Feb 1 18:26:06 2019 UTC (5 years, 4 months ago) by jsr166
Branch: MAIN
Changes since 1.58: +8 -9 lines
Log Message:
dowait: use catch + rethrow instead of a boolean + finally

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.util.concurrent.locks.Condition;
10 import java.util.concurrent.locks.ReentrantLock;
11
12 /**
13 * A synchronization aid that allows a set of threads to all wait for
14 * each other to reach a common barrier point. CyclicBarriers are
15 * useful in programs involving a fixed sized party of threads that
16 * must occasionally wait for each other. The barrier is called
17 * <em>cyclic</em> because it can be re-used after the waiting threads
18 * are released.
19 *
20 * <p>A {@code CyclicBarrier} supports an optional {@link Runnable} command
21 * that is run once per barrier point, after the last thread in the party
22 * arrives, but before any threads are released.
23 * This <em>barrier action</em> is useful
24 * for updating shared-state before any of the parties continue.
25 *
26 * <p><b>Sample usage:</b> Here is an example of using a barrier in a
27 * parallel decomposition design:
28 *
29 * <pre> {@code
30 * class Solver {
31 * final int N;
32 * final float[][] data;
33 * final CyclicBarrier barrier;
34 *
35 * class Worker implements Runnable {
36 * int myRow;
37 * Worker(int row) { myRow = row; }
38 * public void run() {
39 * while (!done()) {
40 * processRow(myRow);
41 *
42 * try {
43 * barrier.await();
44 * } catch (InterruptedException ex) {
45 * return;
46 * } catch (BrokenBarrierException ex) {
47 * return;
48 * }
49 * }
50 * }
51 * }
52 *
53 * public Solver(float[][] matrix) {
54 * data = matrix;
55 * N = matrix.length;
56 * Runnable barrierAction = () -> mergeRows(...);
57 * barrier = new CyclicBarrier(N, barrierAction);
58 *
59 * List<Thread> threads = new ArrayList<>(N);
60 * for (int i = 0; i < N; i++) {
61 * Thread thread = new Thread(new Worker(i));
62 * threads.add(thread);
63 * thread.start();
64 * }
65 *
66 * // wait until done
67 * for (Thread thread : threads)
68 * thread.join();
69 * }
70 * }}</pre>
71 *
72 * Here, each worker thread processes a row of the matrix then waits at the
73 * barrier until all rows have been processed. When all rows are processed
74 * the supplied {@link Runnable} barrier action is executed and merges the
75 * rows. If the merger
76 * determines that a solution has been found then {@code done()} will return
77 * {@code true} and each worker will terminate.
78 *
79 * <p>If the barrier action does not rely on the parties being suspended when
80 * it is executed, then any of the threads in the party could execute that
81 * action when it is released. To facilitate this, each invocation of
82 * {@link #await} returns the arrival index of that thread at the barrier.
83 * You can then choose which thread should execute the barrier action, for
84 * example:
85 * <pre> {@code
86 * if (barrier.await() == 0) {
87 * // log the completion of this iteration
88 * }}</pre>
89 *
90 * <p>The {@code CyclicBarrier} uses an all-or-none breakage model
91 * for failed synchronization attempts: If a thread leaves a barrier
92 * point prematurely because of interruption, failure, or timeout, all
93 * other threads waiting at that barrier point will also leave
94 * abnormally via {@link BrokenBarrierException} (or
95 * {@link InterruptedException} if they too were interrupted at about
96 * the same time).
97 *
98 * <p>Memory consistency effects: Actions in a thread prior to calling
99 * {@code await()}
100 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
101 * actions that are part of the barrier action, which in turn
102 * <i>happen-before</i> actions following a successful return from the
103 * corresponding {@code await()} in other threads.
104 *
105 * @see CountDownLatch
106 *
107 * @author Doug Lea
108 * @since 1.5
109 */
110 public class CyclicBarrier {
111 /**
112 * Each use of the barrier is represented as a generation instance.
113 * The generation changes whenever the barrier is tripped, or
114 * is reset. There can be many generations associated with threads
115 * using the barrier - due to the non-deterministic way the lock
116 * may be allocated to waiting threads - but only one of these
117 * can be active at a time (the one to which {@code count} applies)
118 * and all the rest are either broken or tripped.
119 * There need not be an active generation if there has been a break
120 * but no subsequent reset.
121 */
122 private static class Generation {
123 Generation() {} // prevent access constructor creation
124 boolean broken; // initially false
125 }
126
127 /** The lock for guarding barrier entry */
128 private final ReentrantLock lock = new ReentrantLock();
129 /** Condition to wait on until tripped */
130 private final Condition trip = lock.newCondition();
131 /** The number of parties */
132 private final int parties;
133 /** The command to run when tripped */
134 private final Runnable barrierCommand;
135 /** The current generation */
136 private Generation generation = new Generation();
137
138 /**
139 * Number of parties still waiting. Counts down from parties to 0
140 * on each generation. It is reset to parties on each new
141 * generation or when broken.
142 */
143 private int count;
144
145 /**
146 * Updates state on barrier trip and wakes up everyone.
147 * Called only while holding lock.
148 */
149 private void nextGeneration() {
150 // signal completion of last generation
151 trip.signalAll();
152 // set up next generation
153 count = parties;
154 generation = new Generation();
155 }
156
157 /**
158 * Sets current barrier generation as broken and wakes up everyone.
159 * Called only while holding lock.
160 */
161 private void breakBarrier() {
162 generation.broken = true;
163 count = parties;
164 trip.signalAll();
165 }
166
167 /**
168 * Main barrier code, covering the various policies.
169 */
170 private int dowait(boolean timed, long nanos)
171 throws InterruptedException, BrokenBarrierException,
172 TimeoutException {
173 final ReentrantLock lock = this.lock;
174 lock.lock();
175 try {
176 final Generation g = generation;
177
178 if (g.broken)
179 throw new BrokenBarrierException();
180
181 if (Thread.interrupted()) {
182 breakBarrier();
183 throw new InterruptedException();
184 }
185
186 int index = --count;
187 if (index == 0) { // tripped
188 Runnable command = barrierCommand;
189 if (command != null) {
190 try {
191 command.run();
192 } catch (Throwable ex) {
193 breakBarrier();
194 throw ex;
195 }
196 }
197 nextGeneration();
198 return 0;
199 }
200
201 // loop until tripped, broken, interrupted, or timed out
202 for (;;) {
203 try {
204 if (!timed)
205 trip.await();
206 else if (nanos > 0L)
207 nanos = trip.awaitNanos(nanos);
208 } catch (InterruptedException ie) {
209 if (g == generation && ! g.broken) {
210 breakBarrier();
211 throw ie;
212 } else {
213 // We're about to finish waiting even if we had not
214 // been interrupted, so this interrupt is deemed to
215 // "belong" to subsequent execution.
216 Thread.currentThread().interrupt();
217 }
218 }
219
220 if (g.broken)
221 throw new BrokenBarrierException();
222
223 if (g != generation)
224 return index;
225
226 if (timed && nanos <= 0L) {
227 breakBarrier();
228 throw new TimeoutException();
229 }
230 }
231 } finally {
232 lock.unlock();
233 }
234 }
235
236 /**
237 * Creates a new {@code CyclicBarrier} that will trip when the
238 * given number of parties (threads) are waiting upon it, and which
239 * will execute the given barrier action when the barrier is tripped,
240 * performed by the last thread entering the barrier.
241 *
242 * @param parties the number of threads that must invoke {@link #await}
243 * before the barrier is tripped
244 * @param barrierAction the command to execute when the barrier is
245 * tripped, or {@code null} if there is no action
246 * @throws IllegalArgumentException if {@code parties} is less than 1
247 */
248 public CyclicBarrier(int parties, Runnable barrierAction) {
249 if (parties <= 0) throw new IllegalArgumentException();
250 this.parties = parties;
251 this.count = parties;
252 this.barrierCommand = barrierAction;
253 }
254
255 /**
256 * Creates a new {@code CyclicBarrier} that will trip when the
257 * given number of parties (threads) are waiting upon it, and
258 * does not perform a predefined action when the barrier is tripped.
259 *
260 * @param parties the number of threads that must invoke {@link #await}
261 * before the barrier is tripped
262 * @throws IllegalArgumentException if {@code parties} is less than 1
263 */
264 public CyclicBarrier(int parties) {
265 this(parties, null);
266 }
267
268 /**
269 * Returns the number of parties required to trip this barrier.
270 *
271 * @return the number of parties required to trip this barrier
272 */
273 public int getParties() {
274 return parties;
275 }
276
277 /**
278 * Waits until all {@linkplain #getParties parties} have invoked
279 * {@code await} on this barrier.
280 *
281 * <p>If the current thread is not the last to arrive then it is
282 * disabled for thread scheduling purposes and lies dormant until
283 * one of the following things happens:
284 * <ul>
285 * <li>The last thread arrives; or
286 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
287 * the current thread; or
288 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
289 * one of the other waiting threads; or
290 * <li>Some other thread times out while waiting for barrier; or
291 * <li>Some other thread invokes {@link #reset} on this barrier.
292 * </ul>
293 *
294 * <p>If the current thread:
295 * <ul>
296 * <li>has its interrupted status set on entry to this method; or
297 * <li>is {@linkplain Thread#interrupt interrupted} while waiting
298 * </ul>
299 * then {@link InterruptedException} is thrown and the current thread's
300 * interrupted status is cleared.
301 *
302 * <p>If the barrier is {@link #reset} while any thread is waiting,
303 * or if the barrier {@linkplain #isBroken is broken} when
304 * {@code await} is invoked, or while any thread is waiting, then
305 * {@link BrokenBarrierException} is thrown.
306 *
307 * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting,
308 * then all other waiting threads will throw
309 * {@link BrokenBarrierException} and the barrier is placed in the broken
310 * state.
311 *
312 * <p>If the current thread is the last thread to arrive, and a
313 * non-null barrier action was supplied in the constructor, then the
314 * current thread runs the action before allowing the other threads to
315 * continue.
316 * If an exception occurs during the barrier action then that exception
317 * will be propagated in the current thread and the barrier is placed in
318 * the broken state.
319 *
320 * @return the arrival index of the current thread, where index
321 * {@code getParties() - 1} indicates the first
322 * to arrive and zero indicates the last to arrive
323 * @throws InterruptedException if the current thread was interrupted
324 * while waiting
325 * @throws BrokenBarrierException if <em>another</em> thread was
326 * interrupted or timed out while the current thread was
327 * waiting, or the barrier was reset, or the barrier was
328 * broken when {@code await} was called, or the barrier
329 * action (if present) failed due to an exception
330 */
331 public int await() throws InterruptedException, BrokenBarrierException {
332 try {
333 return dowait(false, 0L);
334 } catch (TimeoutException toe) {
335 throw new Error(toe); // cannot happen
336 }
337 }
338
339 /**
340 * Waits until all {@linkplain #getParties parties} have invoked
341 * {@code await} on this barrier, or the specified waiting time elapses.
342 *
343 * <p>If the current thread is not the last to arrive then it is
344 * disabled for thread scheduling purposes and lies dormant until
345 * one of the following things happens:
346 * <ul>
347 * <li>The last thread arrives; or
348 * <li>The specified timeout elapses; or
349 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
350 * the current thread; or
351 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
352 * one of the other waiting threads; or
353 * <li>Some other thread times out while waiting for barrier; or
354 * <li>Some other thread invokes {@link #reset} on this barrier.
355 * </ul>
356 *
357 * <p>If the current thread:
358 * <ul>
359 * <li>has its interrupted status set on entry to this method; or
360 * <li>is {@linkplain Thread#interrupt interrupted} while waiting
361 * </ul>
362 * then {@link InterruptedException} is thrown and the current thread's
363 * interrupted status is cleared.
364 *
365 * <p>If the specified waiting time elapses then {@link TimeoutException}
366 * is thrown. If the time is less than or equal to zero, the
367 * method will not wait at all.
368 *
369 * <p>If the barrier is {@link #reset} while any thread is waiting,
370 * or if the barrier {@linkplain #isBroken is broken} when
371 * {@code await} is invoked, or while any thread is waiting, then
372 * {@link BrokenBarrierException} is thrown.
373 *
374 * <p>If any thread is {@linkplain Thread#interrupt interrupted} while
375 * waiting, then all other waiting threads will throw {@link
376 * BrokenBarrierException} and the barrier is placed in the broken
377 * state.
378 *
379 * <p>If the current thread is the last thread to arrive, and a
380 * non-null barrier action was supplied in the constructor, then the
381 * current thread runs the action before allowing the other threads to
382 * continue.
383 * If an exception occurs during the barrier action then that exception
384 * will be propagated in the current thread and the barrier is placed in
385 * the broken state.
386 *
387 * @param timeout the time to wait for the barrier
388 * @param unit the time unit of the timeout parameter
389 * @return the arrival index of the current thread, where index
390 * {@code getParties() - 1} indicates the first
391 * to arrive and zero indicates the last to arrive
392 * @throws InterruptedException if the current thread was interrupted
393 * while waiting
394 * @throws TimeoutException if the specified timeout elapses.
395 * In this case the barrier will be broken.
396 * @throws BrokenBarrierException if <em>another</em> thread was
397 * interrupted or timed out while the current thread was
398 * waiting, or the barrier was reset, or the barrier was broken
399 * when {@code await} was called, or the barrier action (if
400 * present) failed due to an exception
401 */
402 public int await(long timeout, TimeUnit unit)
403 throws InterruptedException,
404 BrokenBarrierException,
405 TimeoutException {
406 return dowait(true, unit.toNanos(timeout));
407 }
408
409 /**
410 * Queries if this barrier is in a broken state.
411 *
412 * @return {@code true} if one or more parties broke out of this
413 * barrier due to interruption or timeout since
414 * construction or the last reset, or a barrier action
415 * failed due to an exception; {@code false} otherwise.
416 */
417 public boolean isBroken() {
418 final ReentrantLock lock = this.lock;
419 lock.lock();
420 try {
421 return generation.broken;
422 } finally {
423 lock.unlock();
424 }
425 }
426
427 /**
428 * Resets the barrier to its initial state. If any parties are
429 * currently waiting at the barrier, they will return with a
430 * {@link BrokenBarrierException}. Note that resets <em>after</em>
431 * a breakage has occurred for other reasons can be complicated to
432 * carry out; threads need to re-synchronize in some other way,
433 * and choose one to perform the reset. It may be preferable to
434 * instead create a new barrier for subsequent use.
435 */
436 public void reset() {
437 final ReentrantLock lock = this.lock;
438 lock.lock();
439 try {
440 breakBarrier(); // break the current generation
441 nextGeneration(); // start a new generation
442 } finally {
443 lock.unlock();
444 }
445 }
446
447 /**
448 * Returns the number of parties currently waiting at the barrier.
449 * This method is primarily useful for debugging and assertions.
450 *
451 * @return the number of parties currently blocked in {@link #await}
452 */
453 public int getNumberWaiting() {
454 final ReentrantLock lock = this.lock;
455 lock.lock();
456 try {
457 return parties - count;
458 } finally {
459 lock.unlock();
460 }
461 }
462 }