root/jsr166/jsr166/src/main/java/util/concurrent/CyclicBarrier.java
Revision: 1.60
Committed: Sat Feb 2 04:02:49 2019 UTC (5 years, 4 months ago) by jsr166
Branch: MAIN
Changes since 1.59: +5 -6 lines
Log Message:
add a comma for clarity

File Contents

/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * A synchronization aid that allows a set of threads to all wait for
 * each other to reach a common barrier point. CyclicBarriers are
 * useful in programs involving a fixed-size party of threads that
 * must occasionally wait for each other. The barrier is called
 * <em>cyclic</em> because it can be re-used after the waiting threads
 * are released.
 *
 * <p>A {@code CyclicBarrier} supports an optional {@link Runnable} command
 * that is run once per barrier point, after the last thread in the party
 * arrives, but before any threads are released.
 * This <em>barrier action</em> is useful
 * for updating shared state before any of the parties continue.
 *
 * <p><b>Sample usage:</b> Here is an example of using a barrier in a
 * parallel decomposition design:
 *
 * <pre> {@code
 * class Solver {
 *   final int N;
 *   final float[][] data;
 *   final CyclicBarrier barrier;
 *
 *   class Worker implements Runnable {
 *     int myRow;
 *     Worker(int row) { myRow = row; }
 *     public void run() {
 *       while (!done()) {
 *         processRow(myRow);
 *
 *         try {
 *           barrier.await();
 *         } catch (InterruptedException ex) {
 *           return;
 *         } catch (BrokenBarrierException ex) {
 *           return;
 *         }
 *       }
 *     }
 *   }
 *
 *   public Solver(float[][] matrix) throws InterruptedException {
 *     data = matrix;
 *     N = matrix.length;
 *     Runnable barrierAction = () -> mergeRows(...);
 *     barrier = new CyclicBarrier(N, barrierAction);
 *
 *     List<Thread> threads = new ArrayList<>(N);
 *     for (int i = 0; i < N; i++) {
 *       Thread thread = new Thread(new Worker(i));
 *       threads.add(thread);
 *       thread.start();
 *     }
 *
 *     // wait until done; join() can throw InterruptedException
 *     for (Thread thread : threads)
 *       thread.join();
 *   }
 * }}</pre>
 *
 * Here, each worker thread processes a row of the matrix, then waits at the
 * barrier until all rows have been processed. When all rows are processed the
 * supplied {@link Runnable} barrier action is executed and merges the rows.
 * If the merger determines that a solution has been found then {@code done()}
 * will return {@code true} and each worker will terminate.
 *
 * <p>If the barrier action does not rely on the parties being suspended when
 * it is executed, then any of the threads in the party could execute that
 * action when it is released. To facilitate this, each invocation of
 * {@link #await} returns the arrival index of that thread at the barrier.
 * You can then choose which thread should execute the barrier action, for
 * example:
 * <pre> {@code
 * if (barrier.await() == 0) {
 *   // log the completion of this iteration
 * }}</pre>
 *
 * <p>The {@code CyclicBarrier} uses an all-or-none breakage model
 * for failed synchronization attempts: If a thread leaves a barrier
 * point prematurely because of interruption, failure, or timeout, all
 * other threads waiting at that barrier point will also leave
 * abnormally via {@link BrokenBarrierException} (or
 * {@link InterruptedException} if they too were interrupted at about
 * the same time).
 *
 * <p>Memory consistency effects: Actions in a thread prior to calling
 * {@code await()}
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * actions that are part of the barrier action, which in turn
 * <i>happen-before</i> actions following a successful return from the
 * corresponding {@code await()} in other threads.
 *
 * @see CountDownLatch
 *
 * @author Doug Lea
 * @since 1.5
 */
public class CyclicBarrier {
    /**
     * Each use of the barrier is represented as a generation instance.
     * The generation changes whenever the barrier is tripped, or
     * is reset. There can be many generations associated with threads
     * using the barrier - due to the non-deterministic way the lock
     * may be allocated to waiting threads - but only one of these
     * can be active at a time (the one to which {@code count} applies)
     * and all the rest are either broken or tripped.
     * There need not be an active generation if there has been a break
     * but no subsequent reset.
     */
    private static class Generation {
        Generation() {}                 // prevent access constructor creation
        boolean broken;                 // initially false
    }

    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition();
    /** The number of parties */
    private final int parties;
    /** The command to run when tripped */
    private final Runnable barrierCommand;
    /** The current generation */
    private Generation generation = new Generation();

    /**
     * Number of parties that have yet to arrive in the current
     * generation. Counts down from parties to 0 on each generation.
     * It is reset to parties on each new generation or when broken.
     */
    private int count;

    /**
     * Updates state on barrier trip and wakes up everyone.
     * Called only while holding lock.
     */
    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

    /**
     * Sets current barrier generation as broken and wakes up everyone.
     * Called only while holding lock.
     */
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

    /**
     * Main barrier code, covering the various policies.
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            int index = --count;
            if (index == 0) {  // tripped
                Runnable command = barrierCommand;
                if (command != null) {
                    try {
                        command.run();
                    } catch (Throwable ex) {
                        breakBarrier();
                        throw ex;
                    }
                }
                nextGeneration();
                return 0;
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and which
     * will execute the given barrier action when the barrier is tripped,
     * performed by the last thread entering the barrier.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @param barrierAction the command to execute when the barrier is
     *        tripped, or {@code null} if there is no action
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    /**
     * Creates a new {@code CyclicBarrier} that will trip when the
     * given number of parties (threads) are waiting upon it, and
     * does not perform a predefined action when the barrier is tripped.
     *
     * @param parties the number of threads that must invoke {@link #await}
     *        before the barrier is tripped
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

    /**
     * Returns the number of parties required to trip this barrier.
     *
     * @return the number of parties required to trip this barrier
     */
    public int getParties() {
        return parties;
    }

    /**
     * Waits until all {@linkplain #getParties parties} have invoked
     * {@code await} on this barrier.
     *
     * <p>If the current thread is not the last to arrive then it is
     * disabled for thread scheduling purposes and lies dormant until
     * one of the following things happens:
     * <ul>
     * <li>The last thread arrives; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * one of the other waiting threads; or
     * <li>Some other thread times out while waiting for the barrier; or
     * <li>Some other thread invokes {@link #reset} on this barrier.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the barrier is {@link #reset} while any thread is waiting,
     * or if the barrier {@linkplain #isBroken is broken} when
     * {@code await} is invoked, or while any thread is waiting, then
     * {@link BrokenBarrierException} is thrown.
     *
     * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting,
     * then all other waiting threads will throw
     * {@link BrokenBarrierException} and the barrier is placed in the broken
     * state.
     *
     * <p>If the current thread is the last thread to arrive, and a
     * non-null barrier action was supplied in the constructor, then the
     * current thread runs the action before allowing the other threads to
     * continue.
     * If an exception occurs during the barrier action then that exception
     * will be propagated in the current thread and the barrier is placed in
     * the broken state.
     *
     * @return the arrival index of the current thread, where index
     *         {@code getParties() - 1} indicates the first
     *         to arrive and zero indicates the last to arrive
     * @throws InterruptedException if the current thread was interrupted
     *         while waiting
     * @throws BrokenBarrierException if <em>another</em> thread was
     *         interrupted or timed out while the current thread was
     *         waiting, or the barrier was reset, or the barrier was
     *         broken when {@code await} was called, or the barrier
     *         action (if present) failed due to an exception
     */
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

    /**
     * Waits until all {@linkplain #getParties parties} have invoked
     * {@code await} on this barrier, or the specified waiting time elapses.
     *
     * <p>If the current thread is not the last to arrive then it is
     * disabled for thread scheduling purposes and lies dormant until
     * one of the following things happens:
     * <ul>
     * <li>The last thread arrives; or
     * <li>The specified timeout elapses; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * one of the other waiting threads; or
     * <li>Some other thread times out while waiting for the barrier; or
     * <li>Some other thread invokes {@link #reset} on this barrier.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link TimeoutException}
     * is thrown. If the time is less than or equal to zero, the
     * method will not wait at all.
     *
     * <p>If the barrier is {@link #reset} while any thread is waiting,
     * or if the barrier {@linkplain #isBroken is broken} when
     * {@code await} is invoked, or while any thread is waiting, then
     * {@link BrokenBarrierException} is thrown.
     *
     * <p>If any thread is {@linkplain Thread#interrupt interrupted} while
     * waiting, then all other waiting threads will throw {@link
     * BrokenBarrierException} and the barrier is placed in the broken
     * state.
     *
     * <p>If the current thread is the last thread to arrive, and a
     * non-null barrier action was supplied in the constructor, then the
     * current thread runs the action before allowing the other threads to
     * continue.
     * If an exception occurs during the barrier action then that exception
     * will be propagated in the current thread and the barrier is placed in
     * the broken state.
     *
     * @param timeout the time to wait for the barrier
     * @param unit the time unit of the timeout parameter
     * @return the arrival index of the current thread, where index
     *         {@code getParties() - 1} indicates the first
     *         to arrive and zero indicates the last to arrive
     * @throws InterruptedException if the current thread was interrupted
     *         while waiting
     * @throws TimeoutException if the specified timeout elapses.
     *         In this case the barrier will be broken.
     * @throws BrokenBarrierException if <em>another</em> thread was
     *         interrupted or timed out while the current thread was
     *         waiting, or the barrier was reset, or the barrier was broken
     *         when {@code await} was called, or the barrier action (if
     *         present) failed due to an exception
     */
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

    /**
     * Queries if this barrier is in a broken state.
     *
     * @return {@code true} if one or more parties broke out of this
     *         barrier due to interruption or timeout since
     *         construction or the last reset, or a barrier action
     *         failed due to an exception; {@code false} otherwise.
     */
    public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Resets the barrier to its initial state. If any parties are
     * currently waiting at the barrier, they will return with a
     * {@link BrokenBarrierException}. Note that resets <em>after</em>
     * a breakage has occurred for other reasons can be complicated to
     * carry out; threads need to re-synchronize in some other way,
     * and choose one to perform the reset. It may be preferable to
     * instead create a new barrier for subsequent use.
     */
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of parties currently waiting at the barrier.
     * This method is primarily useful for debugging and assertions.
     *
     * @return the number of parties currently blocked in {@link #await}
     */
    public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }
}
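
Usage sketch (not part of CyclicBarrier.java): the class above documents three behaviours that are easy to check from client code: the barrier action runs once per trip, await() returns arrival indices from getParties() - 1 down to 0, and a timeout leaves the barrier broken until reset() is called. The class name BarrierDemo and its two helper methods are hypothetical names chosen for this illustration, and the 50 ms timeout is an arbitrary value.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; it only uses the public CyclicBarrier API.
public class BarrierDemo {

    // Runs `parties` threads through `rounds` barrier points.
    // Returns { number of barrier-action runs, sum of all arrival indices }.
    public static int[] runRounds(int parties, int rounds) throws InterruptedException {
        AtomicInteger actionRuns = new AtomicInteger();
        AtomicInteger indexSum = new AtomicInteger();
        // The barrier action runs once per trip, in the last thread to arrive.
        CyclicBarrier barrier = new CyclicBarrier(parties, actionRuns::incrementAndGet);

        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        // await() returns parties - 1 for the first arrival, 0 for the last.
                        indexSum.addAndGet(barrier.await());
                    }
                } catch (InterruptedException | BrokenBarrierException ex) {
                    // Not expected in this sketch; the worker simply stops.
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        return new int[] { actionRuns.get(), indexSum.get() };
    }

    // A lone thread waits on a two-party barrier and times out.
    // Returns { isBroken() after the timeout, isBroken() after reset() }.
    public static boolean[] timeoutThenReset()
            throws InterruptedException, BrokenBarrierException {
        CyclicBarrier barrier = new CyclicBarrier(2);
        try {
            barrier.await(50, TimeUnit.MILLISECONDS);
        } catch (TimeoutException ex) {
            // Expected: no second party ever arrives, so the barrier breaks.
        }
        boolean brokenAfterTimeout = barrier.isBroken();
        barrier.reset();
        return new boolean[] { brokenAfterTimeout, barrier.isBroken() };
    }

    public static void main(String[] args) throws Exception {
        int[] r = runRounds(3, 2);
        System.out.println("barrier action runs: " + r[0]);
        System.out.println("sum of arrival indices: " + r[1]);
        boolean[] b = timeoutThenReset();
        System.out.println("broken after timeout: " + b[0] + ", after reset: " + b[1]);
    }
}
```

With 3 parties and 2 rounds, each trip hands out the indices 2, 1 and 0 exactly once, so the index sum is 3 per round regardless of which thread arrives first. The same barrier object serves both rounds, which is the re-use that "cyclic" refers to.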