ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CyclicBarrier.java
Revision: 1.62
Committed: Fri Nov 27 17:41:59 2020 UTC (3 years, 5 months ago) by dl
Branch: MAIN
CVS Tags: HEAD
Changes since 1.61: +3 -1 lines
Log Message:
Incorporate snippets code improvements from Pavel Rappo

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.util.concurrent.locks.Condition;
10 import java.util.concurrent.locks.ReentrantLock;
11
12 /**
13 * A synchronization aid that allows a set of threads to all wait for
14 * each other to reach a common barrier point. CyclicBarriers are
15 * useful in programs involving a fixed sized party of threads that
16 * must occasionally wait for each other. The barrier is called
17 * <em>cyclic</em> because it can be re-used after the waiting threads
18 * are released.
19 *
20 * <p>A {@code CyclicBarrier} supports an optional {@link Runnable} command
21 * that is run once per barrier point, after the last thread in the party
22 * arrives, but before any threads are released.
23 * This <em>barrier action</em> is useful
24 * for updating shared-state before any of the parties continue.
25 *
26 * <p><b>Sample usage:</b> Here is an example of using a barrier in a
27 * parallel decomposition design:
28 *
29 * <pre> {@code
30 * class Solver {
31 * final int N;
32 * final float[][] data;
33 * final CyclicBarrier barrier;
34 *
35 * class Worker implements Runnable {
36 * int myRow;
37 * Worker(int row) { myRow = row; }
38 * public void run() {
39 * while (!done()) {
40 * processRow(myRow);
41 *
42 * try {
43 * barrier.await();
44 * } catch (InterruptedException ex) {
45 * return;
46 * } catch (BrokenBarrierException ex) {
47 * return;
48 * }
49 * }
50 * }
51 * }
52 *
53 * public Solver(float[][] matrix) {
54 * data = matrix;
55 * N = matrix.length;
56 * Runnable barrierAction = () -> mergeRows(...);
57 * barrier = new CyclicBarrier(N, barrierAction);
58 *
59 * List<Thread> threads = new ArrayList<>(N);
60 * for (int i = 0; i < N; i++) {
61 * Thread thread = new Thread(new Worker(i));
62 * threads.add(thread);
63 * thread.start();
64 * }
65 *
66 * // wait until done
67 * for (Thread thread : threads)
68 * try {
69 * thread.join();
70 * } catch (InterruptedException ex) { }
71 * }
72 * }}</pre>
73 *
74 * Here, each worker thread processes a row of the matrix, then waits at the
75 * barrier until all rows have been processed. When all rows are processed the
76 * supplied {@link Runnable} barrier action is executed and merges the rows.
77 * If the merger determines that a solution has been found then {@code done()}
78 * will return {@code true} and each worker will terminate.
79 *
80 * <p>If the barrier action does not rely on the parties being suspended when
81 * it is executed, then any of the threads in the party could execute that
82 * action when it is released. To facilitate this, each invocation of
83 * {@link #await} returns the arrival index of that thread at the barrier.
84 * You can then choose which thread should execute the barrier action, for
85 * example:
86 * <pre> {@code
87 * if (barrier.await() == 0) {
88 * // log the completion of this iteration
89 * }}</pre>
90 *
91 * <p>The {@code CyclicBarrier} uses an all-or-none breakage model
92 * for failed synchronization attempts: If a thread leaves a barrier
93 * point prematurely because of interruption, failure, or timeout, all
94 * other threads waiting at that barrier point will also leave
95 * abnormally via {@link BrokenBarrierException} (or
96 * {@link InterruptedException} if they too were interrupted at about
97 * the same time).
98 *
99 * <p>Memory consistency effects: Actions in a thread prior to calling
100 * {@code await()}
101 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
102 * actions that are part of the barrier action, which in turn
103 * <i>happen-before</i> actions following a successful return from the
104 * corresponding {@code await()} in other threads.
105 *
106 * @see CountDownLatch
107 * @see Phaser
108 *
109 * @author Doug Lea
110 * @since 1.5
111 */
112 public class CyclicBarrier {
113 /**
114 * Each use of the barrier is represented as a generation instance.
115 * The generation changes whenever the barrier is tripped, or
116 * is reset. There can be many generations associated with threads
117 * using the barrier - due to the non-deterministic way the lock
118 * may be allocated to waiting threads - but only one of these
119 * can be active at a time (the one to which {@code count} applies)
120 * and all the rest are either broken or tripped.
121 * There need not be an active generation if there has been a break
122 * but no subsequent reset.
123 */
124 private static class Generation {
125 Generation() {} // prevent access constructor creation
126 boolean broken; // initially false
127 }
128
129 /** The lock for guarding barrier entry */
130 private final ReentrantLock lock = new ReentrantLock();
131 /** Condition to wait on until tripped */
132 private final Condition trip = lock.newCondition();
133 /** The number of parties */
134 private final int parties;
135 /** The command to run when tripped */
136 private final Runnable barrierCommand;
137 /** The current generation */
138 private Generation generation = new Generation();
139
140 /**
141 * Number of parties still waiting. Counts down from parties to 0
142 * on each generation. It is reset to parties on each new
143 * generation or when broken.
144 */
145 private int count;
146
147 /**
148 * Updates state on barrier trip and wakes up everyone.
149 * Called only while holding lock.
150 */
151 private void nextGeneration() {
152 // signal completion of last generation
153 trip.signalAll();
154 // set up next generation
155 count = parties;
156 generation = new Generation();
157 }
158
159 /**
160 * Sets current barrier generation as broken and wakes up everyone.
161 * Called only while holding lock.
162 */
163 private void breakBarrier() {
164 generation.broken = true;
165 count = parties;
166 trip.signalAll();
167 }
168
169 /**
170 * Main barrier code, covering the various policies.
171 */
172 private int dowait(boolean timed, long nanos)
173 throws InterruptedException, BrokenBarrierException,
174 TimeoutException {
175 final ReentrantLock lock = this.lock;
176 lock.lock();
177 try {
178 final Generation g = generation;
179
180 if (g.broken)
181 throw new BrokenBarrierException();
182
183 if (Thread.interrupted()) {
184 breakBarrier();
185 throw new InterruptedException();
186 }
187
188 int index = --count;
189 if (index == 0) { // tripped
190 Runnable command = barrierCommand;
191 if (command != null) {
192 try {
193 command.run();
194 } catch (Throwable ex) {
195 breakBarrier();
196 throw ex;
197 }
198 }
199 nextGeneration();
200 return 0;
201 }
202
203 // loop until tripped, broken, interrupted, or timed out
204 for (;;) {
205 try {
206 if (!timed)
207 trip.await();
208 else if (nanos > 0L)
209 nanos = trip.awaitNanos(nanos);
210 } catch (InterruptedException ie) {
211 if (g == generation && ! g.broken) {
212 breakBarrier();
213 throw ie;
214 } else {
215 // We're about to finish waiting even if we had not
216 // been interrupted, so this interrupt is deemed to
217 // "belong" to subsequent execution.
218 Thread.currentThread().interrupt();
219 }
220 }
221
222 if (g.broken)
223 throw new BrokenBarrierException();
224
225 if (g != generation)
226 return index;
227
228 if (timed && nanos <= 0L) {
229 breakBarrier();
230 throw new TimeoutException();
231 }
232 }
233 } finally {
234 lock.unlock();
235 }
236 }
237
238 /**
239 * Creates a new {@code CyclicBarrier} that will trip when the
240 * given number of parties (threads) are waiting upon it, and which
241 * will execute the given barrier action when the barrier is tripped,
242 * performed by the last thread entering the barrier.
243 *
244 * @param parties the number of threads that must invoke {@link #await}
245 * before the barrier is tripped
246 * @param barrierAction the command to execute when the barrier is
247 * tripped, or {@code null} if there is no action
248 * @throws IllegalArgumentException if {@code parties} is less than 1
249 */
250 public CyclicBarrier(int parties, Runnable barrierAction) {
251 if (parties <= 0) throw new IllegalArgumentException();
252 this.parties = parties;
253 this.count = parties;
254 this.barrierCommand = barrierAction;
255 }
256
257 /**
258 * Creates a new {@code CyclicBarrier} that will trip when the
259 * given number of parties (threads) are waiting upon it, and
260 * does not perform a predefined action when the barrier is tripped.
261 *
262 * @param parties the number of threads that must invoke {@link #await}
263 * before the barrier is tripped
264 * @throws IllegalArgumentException if {@code parties} is less than 1
265 */
266 public CyclicBarrier(int parties) {
267 this(parties, null);
268 }
269
270 /**
271 * Returns the number of parties required to trip this barrier.
272 *
273 * @return the number of parties required to trip this barrier
274 */
275 public int getParties() {
276 return parties;
277 }
278
279 /**
280 * Waits until all {@linkplain #getParties parties} have invoked
281 * {@code await} on this barrier.
282 *
283 * <p>If the current thread is not the last to arrive then it is
284 * disabled for thread scheduling purposes and lies dormant until
285 * one of the following things happens:
286 * <ul>
287 * <li>The last thread arrives; or
288 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
289 * the current thread; or
290 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
291 * one of the other waiting threads; or
292 * <li>Some other thread times out while waiting for barrier; or
293 * <li>Some other thread invokes {@link #reset} on this barrier.
294 * </ul>
295 *
296 * <p>If the current thread:
297 * <ul>
298 * <li>has its interrupted status set on entry to this method; or
299 * <li>is {@linkplain Thread#interrupt interrupted} while waiting
300 * </ul>
301 * then {@link InterruptedException} is thrown and the current thread's
302 * interrupted status is cleared.
303 *
304 * <p>If the barrier is {@link #reset} while any thread is waiting,
305 * or if the barrier {@linkplain #isBroken is broken} when
306 * {@code await} is invoked, or while any thread is waiting, then
307 * {@link BrokenBarrierException} is thrown.
308 *
309 * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting,
310 * then all other waiting threads will throw
311 * {@link BrokenBarrierException} and the barrier is placed in the broken
312 * state.
313 *
314 * <p>If the current thread is the last thread to arrive, and a
315 * non-null barrier action was supplied in the constructor, then the
316 * current thread runs the action before allowing the other threads to
317 * continue.
318 * If an exception occurs during the barrier action then that exception
319 * will be propagated in the current thread and the barrier is placed in
320 * the broken state.
321 *
322 * @return the arrival index of the current thread, where index
323 * {@code getParties() - 1} indicates the first
324 * to arrive and zero indicates the last to arrive
325 * @throws InterruptedException if the current thread was interrupted
326 * while waiting
327 * @throws BrokenBarrierException if <em>another</em> thread was
328 * interrupted or timed out while the current thread was
329 * waiting, or the barrier was reset, or the barrier was
330 * broken when {@code await} was called, or the barrier
331 * action (if present) failed due to an exception
332 */
333 public int await() throws InterruptedException, BrokenBarrierException {
334 try {
335 return dowait(false, 0L);
336 } catch (TimeoutException toe) {
337 throw new Error(toe); // cannot happen
338 }
339 }
340
341 /**
342 * Waits until all {@linkplain #getParties parties} have invoked
343 * {@code await} on this barrier, or the specified waiting time elapses.
344 *
345 * <p>If the current thread is not the last to arrive then it is
346 * disabled for thread scheduling purposes and lies dormant until
347 * one of the following things happens:
348 * <ul>
349 * <li>The last thread arrives; or
350 * <li>The specified timeout elapses; or
351 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
352 * the current thread; or
353 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
354 * one of the other waiting threads; or
355 * <li>Some other thread times out while waiting for barrier; or
356 * <li>Some other thread invokes {@link #reset} on this barrier.
357 * </ul>
358 *
359 * <p>If the current thread:
360 * <ul>
361 * <li>has its interrupted status set on entry to this method; or
362 * <li>is {@linkplain Thread#interrupt interrupted} while waiting
363 * </ul>
364 * then {@link InterruptedException} is thrown and the current thread's
365 * interrupted status is cleared.
366 *
367 * <p>If the specified waiting time elapses then {@link TimeoutException}
368 * is thrown. If the time is less than or equal to zero, the
369 * method will not wait at all.
370 *
371 * <p>If the barrier is {@link #reset} while any thread is waiting,
372 * or if the barrier {@linkplain #isBroken is broken} when
373 * {@code await} is invoked, or while any thread is waiting, then
374 * {@link BrokenBarrierException} is thrown.
375 *
376 * <p>If any thread is {@linkplain Thread#interrupt interrupted} while
377 * waiting, then all other waiting threads will throw {@link
378 * BrokenBarrierException} and the barrier is placed in the broken
379 * state.
380 *
381 * <p>If the current thread is the last thread to arrive, and a
382 * non-null barrier action was supplied in the constructor, then the
383 * current thread runs the action before allowing the other threads to
384 * continue.
385 * If an exception occurs during the barrier action then that exception
386 * will be propagated in the current thread and the barrier is placed in
387 * the broken state.
388 *
389 * @param timeout the time to wait for the barrier
390 * @param unit the time unit of the timeout parameter
391 * @return the arrival index of the current thread, where index
392 * {@code getParties() - 1} indicates the first
393 * to arrive and zero indicates the last to arrive
394 * @throws InterruptedException if the current thread was interrupted
395 * while waiting
396 * @throws TimeoutException if the specified timeout elapses.
397 * In this case the barrier will be broken.
398 * @throws BrokenBarrierException if <em>another</em> thread was
399 * interrupted or timed out while the current thread was
400 * waiting, or the barrier was reset, or the barrier was broken
401 * when {@code await} was called, or the barrier action (if
402 * present) failed due to an exception
403 */
404 public int await(long timeout, TimeUnit unit)
405 throws InterruptedException,
406 BrokenBarrierException,
407 TimeoutException {
408 return dowait(true, unit.toNanos(timeout));
409 }
410
411 /**
412 * Queries if this barrier is in a broken state.
413 *
414 * @return {@code true} if one or more parties broke out of this
415 * barrier due to interruption or timeout since
416 * construction or the last reset, or a barrier action
417 * failed due to an exception; {@code false} otherwise.
418 */
419 public boolean isBroken() {
420 final ReentrantLock lock = this.lock;
421 lock.lock();
422 try {
423 return generation.broken;
424 } finally {
425 lock.unlock();
426 }
427 }
428
429 /**
430 * Resets the barrier to its initial state. If any parties are
431 * currently waiting at the barrier, they will return with a
432 * {@link BrokenBarrierException}. Note that resets <em>after</em>
433 * a breakage has occurred for other reasons can be complicated to
434 * carry out; threads need to re-synchronize in some other way,
435 * and choose one to perform the reset. It may be preferable to
436 * instead create a new barrier for subsequent use.
437 */
438 public void reset() {
439 final ReentrantLock lock = this.lock;
440 lock.lock();
441 try {
442 breakBarrier(); // break the current generation
443 nextGeneration(); // start a new generation
444 } finally {
445 lock.unlock();
446 }
447 }
448
449 /**
450 * Returns the number of parties currently waiting at the barrier.
451 * This method is primarily useful for debugging and assertions.
452 *
453 * @return the number of parties currently blocked in {@link #await}
454 */
455 public int getNumberWaiting() {
456 final ReentrantLock lock = this.lock;
457 lock.lock();
458 try {
459 return parties - count;
460 } finally {
461 lock.unlock();
462 }
463 }
464 }