ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CyclicBarrier.java
Revision: 1.46
Committed: Tue Feb 5 19:00:18 2013 UTC (11 years, 3 months ago) by jsr166
Branch: MAIN
Changes since 1.45: +2 -2 lines
Log Message:
typo

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.Condition;
9 import java.util.concurrent.locks.ReentrantLock;
10
11 /**
12 * A synchronization aid that allows a set of threads to all wait for
13 * each other to reach a common barrier point. CyclicBarriers are
14 * useful in programs involving a fixed sized party of threads that
15 * must occasionally wait for each other. The barrier is called
16 * <em>cyclic</em> because it can be re-used after the waiting threads
17 * are released.
18 *
19 * <p>A {@code CyclicBarrier} supports an optional {@link Runnable} command
20 * that is run once per barrier point, after the last thread in the party
21 * arrives, but before any threads are released.
22 * This <em>barrier action</em> is useful
23 * for updating shared-state before any of the parties continue.
24 *
25 * <p><b>Sample usage:</b> Here is an example of
26 * using a barrier in a parallel decomposition design:
27 *
28 * <pre> {@code
29 * class Solver {
30 * final int N;
31 * final float[][] data;
32 * final CyclicBarrier barrier;
33 *
34 * class Worker implements Runnable {
35 * int myRow;
36 * Worker(int row) { myRow = row; }
37 * public void run() {
38 * while (!done()) {
39 * processRow(myRow);
40 *
41 * try {
42 * barrier.await();
43 * } catch (InterruptedException ex) {
44 * return;
45 * } catch (BrokenBarrierException ex) {
46 * return;
47 * }
48 * }
49 * }
50 * }
51 *
52 * public Solver(float[][] matrix) {
53 * data = matrix;
54 * N = matrix.length;
55 * Runnable barrierAction =
56 * new Runnable() { public void run() { mergeRows(...); }};
57 * barrier = new CyclicBarrier(N, barrierAction);
58 *
59 * List<Thread> threads = new ArrayList<Thread>(N);
60 * for (int i = 0; i < N; i++) {
61 * Thread thread = new Thread(new Worker(i));
62 * threads.add(thread);
63 * thread.start();
64 * }
65 *
66 * // wait until done
67 * for (Thread thread : threads)
68 * thread.join();
69 * }
70 * }}</pre>
71 *
72 * Here, each worker thread processes a row of the matrix then waits at the
73 * barrier until all rows have been processed. When all rows are processed
74 * the supplied {@link Runnable} barrier action is executed and merges the
75 * rows. If the merger
76 * determines that a solution has been found then {@code done()} will return
77 * {@code true} and each worker will terminate.
78 *
79 * <p>If the barrier action does not rely on the parties being suspended when
80 * it is executed, then any of the threads in the party could execute that
81 * action when it is released. To facilitate this, each invocation of
82 * {@link #await} returns the arrival index of that thread at the barrier.
83 * You can then choose which thread should execute the barrier action, for
84 * example:
85 * <pre> {@code
86 * if (barrier.await() == 0) {
87 * // log the completion of this iteration
88 * }}</pre>
89 *
90 * <p>The {@code CyclicBarrier} uses an all-or-none breakage model
91 * for failed synchronization attempts: If a thread leaves a barrier
92 * point prematurely because of interruption, failure, or timeout, all
93 * other threads waiting at that barrier point will also leave
94 * abnormally via {@link BrokenBarrierException} (or
95 * {@link InterruptedException} if they too were interrupted at about
96 * the same time).
97 *
98 * <p>Memory consistency effects: Actions in a thread prior to calling
99 * {@code await()}
100 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
101 * actions that are part of the barrier action, which in turn
102 * <i>happen-before</i> actions following a successful return from the
103 * corresponding {@code await()} in other threads.
104 *
105 * @since 1.5
106 * @see CountDownLatch
107 *
108 * @author Doug Lea
109 */
110 public class CyclicBarrier {
111 /**
112 * Each use of the barrier is represented as a generation instance.
113 * The generation changes whenever the barrier is tripped, or
114 * is reset. There can be many generations associated with threads
115 * using the barrier - due to the non-deterministic way the lock
116 * may be allocated to waiting threads - but only one of these
117 * can be active at a time (the one to which {@code count} applies)
118 * and all the rest are either broken or tripped.
119 * There need not be an active generation if there has been a break
120 * but no subsequent reset.
121 */
122 private static class Generation {
123 boolean broken = false;
124 }
125
126 /** The lock for guarding barrier entry */
127 private final ReentrantLock lock = new ReentrantLock();
128 /** Condition to wait on until tripped */
129 private final Condition trip = lock.newCondition();
130 /** The number of parties */
131 private final int parties;
132 /* The command to run when tripped */
133 private final Runnable barrierCommand;
134 /** The current generation */
135 private Generation generation = new Generation();
136
137 /**
138 * Number of parties still waiting. Counts down from parties to 0
139 * on each generation. It is reset to parties on each new
140 * generation or when broken.
141 */
142 private int count;
143
144 /**
145 * Updates state on barrier trip and wakes up everyone.
146 * Called only while holding lock.
147 */
148 private void nextGeneration() {
149 // signal completion of last generation
150 trip.signalAll();
151 // set up next generation
152 count = parties;
153 generation = new Generation();
154 }
155
156 /**
157 * Sets current barrier generation as broken and wakes up everyone.
158 * Called only while holding lock.
159 */
160 private void breakBarrier() {
161 generation.broken = true;
162 count = parties;
163 trip.signalAll();
164 }
165
166 /**
167 * Main barrier code, covering the various policies.
168 */
169 private int dowait(boolean timed, long nanos)
170 throws InterruptedException, BrokenBarrierException,
171 TimeoutException {
172 final ReentrantLock lock = this.lock;
173 lock.lock();
174 try {
175 final Generation g = generation;
176
177 if (g.broken)
178 throw new BrokenBarrierException();
179
180 if (Thread.interrupted()) {
181 breakBarrier();
182 throw new InterruptedException();
183 }
184
185 int index = --count;
186 if (index == 0) { // tripped
187 boolean ranAction = false;
188 try {
189 final Runnable command = barrierCommand;
190 if (command != null)
191 command.run();
192 ranAction = true;
193 nextGeneration();
194 return 0;
195 } finally {
196 if (!ranAction)
197 breakBarrier();
198 }
199 }
200
201 // loop until tripped, broken, interrupted, or timed out
202 for (;;) {
203 try {
204 if (!timed)
205 trip.await();
206 else if (nanos > 0L)
207 nanos = trip.awaitNanos(nanos);
208 } catch (InterruptedException ie) {
209 if (g == generation && ! g.broken) {
210 breakBarrier();
211 throw ie;
212 } else {
213 // We're about to finish waiting even if we had not
214 // been interrupted, so this interrupt is deemed to
215 // "belong" to subsequent execution.
216 Thread.currentThread().interrupt();
217 }
218 }
219
220 if (g.broken)
221 throw new BrokenBarrierException();
222
223 if (g != generation)
224 return index;
225
226 if (timed && nanos <= 0L) {
227 breakBarrier();
228 throw new TimeoutException();
229 }
230 }
231 } finally {
232 lock.unlock();
233 }
234 }
235
236 /**
237 * Creates a new {@code CyclicBarrier} that will trip when the
238 * given number of parties (threads) are waiting upon it, and which
239 * will execute the given barrier action when the barrier is tripped,
240 * performed by the last thread entering the barrier.
241 *
242 * @param parties the number of threads that must invoke {@link #await}
243 * before the barrier is tripped
244 * @param barrierAction the command to execute when the barrier is
245 * tripped, or {@code null} if there is no action
246 * @throws IllegalArgumentException if {@code parties} is less than 1
247 */
248 public CyclicBarrier(int parties, Runnable barrierAction) {
249 if (parties <= 0) throw new IllegalArgumentException();
250 this.parties = parties;
251 this.count = parties;
252 this.barrierCommand = barrierAction;
253 }
254
255 /**
256 * Creates a new {@code CyclicBarrier} that will trip when the
257 * given number of parties (threads) are waiting upon it, and
258 * does not perform a predefined action when the barrier is tripped.
259 *
260 * @param parties the number of threads that must invoke {@link #await}
261 * before the barrier is tripped
262 * @throws IllegalArgumentException if {@code parties} is less than 1
263 */
264 public CyclicBarrier(int parties) {
265 this(parties, null);
266 }
267
268 /**
269 * Returns the number of parties required to trip this barrier.
270 *
271 * @return the number of parties required to trip this barrier
272 */
273 public int getParties() {
274 return parties;
275 }
276
277 /**
278 * Waits until all {@linkplain #getParties parties} have invoked
279 * {@code await} on this barrier.
280 *
281 * <p>If the current thread is not the last to arrive then it is
282 * disabled for thread scheduling purposes and lies dormant until
283 * one of the following things happens:
284 * <ul>
285 * <li>The last thread arrives; or
286 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
287 * the current thread; or
288 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
289 * one of the other waiting threads; or
290 * <li>Some other thread times out while waiting for barrier; or
291 * <li>Some other thread invokes {@link #reset} on this barrier.
292 * </ul>
293 *
294 * <p>If the current thread:
295 * <ul>
296 * <li>has its interrupted status set on entry to this method; or
297 * <li>is {@linkplain Thread#interrupt interrupted} while waiting
298 * </ul>
299 * then {@link InterruptedException} is thrown and the current thread's
300 * interrupted status is cleared.
301 *
302 * <p>If the barrier is {@link #reset} while any thread is waiting,
303 * or if the barrier {@linkplain #isBroken is broken} when
304 * {@code await} is invoked, or while any thread is waiting, then
305 * {@link BrokenBarrierException} is thrown.
306 *
307 * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting,
308 * then all other waiting threads will throw
309 * {@link BrokenBarrierException} and the barrier is placed in the broken
310 * state.
311 *
312 * <p>If the current thread is the last thread to arrive, and a
313 * non-null barrier action was supplied in the constructor, then the
314 * current thread runs the action before allowing the other threads to
315 * continue.
316 * If an exception occurs during the barrier action then that exception
317 * will be propagated in the current thread and the barrier is placed in
318 * the broken state.
319 *
320 * @return the arrival index of the current thread, where index
321 * <tt>{@link #getParties()} - 1</tt> indicates the first
322 * to arrive and zero indicates the last to arrive
323 * @throws InterruptedException if the current thread was interrupted
324 * while waiting
325 * @throws BrokenBarrierException if <em>another</em> thread was
326 * interrupted or timed out while the current thread was
327 * waiting, or the barrier was reset, or the barrier was
328 * broken when {@code await} was called, or the barrier
329 * action (if present) failed due to an exception.
330 */
331 public int await() throws InterruptedException, BrokenBarrierException {
332 try {
333 return dowait(false, 0L);
334 } catch (TimeoutException toe) {
335 throw new Error(toe); // cannot happen
336 }
337 }
338
339 /**
340 * Waits until all {@linkplain #getParties parties} have invoked
341 * {@code await} on this barrier, or the specified waiting time elapses.
342 *
343 * <p>If the current thread is not the last to arrive then it is
344 * disabled for thread scheduling purposes and lies dormant until
345 * one of the following things happens:
346 * <ul>
347 * <li>The last thread arrives; or
348 * <li>The specified timeout elapses; or
349 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
350 * the current thread; or
351 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
352 * one of the other waiting threads; or
353 * <li>Some other thread times out while waiting for barrier; or
354 * <li>Some other thread invokes {@link #reset} on this barrier.
355 * </ul>
356 *
357 * <p>If the current thread:
358 * <ul>
359 * <li>has its interrupted status set on entry to this method; or
360 * <li>is {@linkplain Thread#interrupt interrupted} while waiting
361 * </ul>
362 * then {@link InterruptedException} is thrown and the current thread's
363 * interrupted status is cleared.
364 *
365 * <p>If the specified waiting time elapses then {@link TimeoutException}
366 * is thrown. If the time is less than or equal to zero, the
367 * method will not wait at all.
368 *
369 * <p>If the barrier is {@link #reset} while any thread is waiting,
370 * or if the barrier {@linkplain #isBroken is broken} when
371 * {@code await} is invoked, or while any thread is waiting, then
372 * {@link BrokenBarrierException} is thrown.
373 *
374 * <p>If any thread is {@linkplain Thread#interrupt interrupted} while
375 * waiting, then all other waiting threads will throw {@link
376 * BrokenBarrierException} and the barrier is placed in the broken
377 * state.
378 *
379 * <p>If the current thread is the last thread to arrive, and a
380 * non-null barrier action was supplied in the constructor, then the
381 * current thread runs the action before allowing the other threads to
382 * continue.
383 * If an exception occurs during the barrier action then that exception
384 * will be propagated in the current thread and the barrier is placed in
385 * the broken state.
386 *
387 * @param timeout the time to wait for the barrier
388 * @param unit the time unit of the timeout parameter
389 * @return the arrival index of the current thread, where index
390 * <tt>{@link #getParties()} - 1</tt> indicates the first
391 * to arrive and zero indicates the last to arrive
392 * @throws InterruptedException if the current thread was interrupted
393 * while waiting
394 * @throws TimeoutException if the specified timeout elapses
395 * @throws BrokenBarrierException if <em>another</em> thread was
396 * interrupted or timed out while the current thread was
397 * waiting, or the barrier was reset, or the barrier was broken
398 * when {@code await} was called, or the barrier action (if
399 * present) failed due to an exception
400 */
401 public int await(long timeout, TimeUnit unit)
402 throws InterruptedException,
403 BrokenBarrierException,
404 TimeoutException {
405 return dowait(true, unit.toNanos(timeout));
406 }
407
408 /**
409 * Queries if this barrier is in a broken state.
410 *
411 * @return {@code true} if one or more parties broke out of this
412 * barrier due to interruption or timeout since
413 * construction or the last reset, or a barrier action
414 * failed due to an exception; {@code false} otherwise.
415 */
416 public boolean isBroken() {
417 final ReentrantLock lock = this.lock;
418 lock.lock();
419 try {
420 return generation.broken;
421 } finally {
422 lock.unlock();
423 }
424 }
425
426 /**
427 * Resets the barrier to its initial state. If any parties are
428 * currently waiting at the barrier, they will return with a
429 * {@link BrokenBarrierException}. Note that resets <em>after</em>
430 * a breakage has occurred for other reasons can be complicated to
431 * carry out; threads need to re-synchronize in some other way,
432 * and choose one to perform the reset. It may be preferable to
433 * instead create a new barrier for subsequent use.
434 */
435 public void reset() {
436 final ReentrantLock lock = this.lock;
437 lock.lock();
438 try {
439 breakBarrier(); // break the current generation
440 nextGeneration(); // start a new generation
441 } finally {
442 lock.unlock();
443 }
444 }
445
446 /**
447 * Returns the number of parties currently waiting at the barrier.
448 * This method is primarily useful for debugging and assertions.
449 *
450 * @return the number of parties currently blocked in {@link #await}
451 */
452 public int getNumberWaiting() {
453 final ReentrantLock lock = this.lock;
454 lock.lock();
455 try {
456 return parties - count;
457 } finally {
458 lock.unlock();
459 }
460 }
461 }