root/jsr166/jsr166/src/main/java/util/concurrent/CyclicBarrier.java
Revision: 1.51
Committed: Sat May 3 21:05:50 2014 UTC (10 years, 1 month ago) by jsr166
Branch: MAIN
Changes since 1.50: +1 -1 lines
Log Message:
/* => /** for javadoc

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.22 * Expert Group and released to the public domain, as explained at
4 jsr166 1.39 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 jsr166 1.43 import java.util.concurrent.locks.Condition;
9     import java.util.concurrent.locks.ReentrantLock;
10 tim 1.1
11     /**
12 tim 1.10 * A synchronization aid that allows a set of threads to all wait for
13 brian 1.4 * each other to reach a common barrier point. CyclicBarriers are
14 dl 1.3 * useful in programs involving a fixed-size party of threads that
15 brian 1.4 * must occasionally wait for each other. The barrier is called
16 dl 1.3 * <em>cyclic</em> because it can be re-used after the waiting threads
17     * are released.
18 tim 1.1 *
19 jsr166 1.45 * <p>A {@code CyclicBarrier} supports an optional {@link Runnable} command
20 tim 1.1 * that is run once per barrier point, after the last thread in the party
21 dl 1.28 * arrives, but before any threads are released.
22 tim 1.1 * This <em>barrier action</em> is useful
23     * for updating shared state before any of the parties continue.
24 dl 1.28 *
25 jsr166 1.48 * <p><b>Sample usage:</b> Here is an example of using a barrier in a
26     * parallel decomposition design:
27 jsr166 1.41 *
28     * <pre> {@code
29 tim 1.1 * class Solver {
30     * final int N;
31     * final float[][] data;
32     * final CyclicBarrier barrier;
33 dl 1.28 *
34 tim 1.1 * class Worker implements Runnable {
35     * int myRow;
36     * Worker(int row) { myRow = row; }
37     * public void run() {
38     * while (!done()) {
39     * processRow(myRow);
40     *
41     * try {
42 dl 1.28 * barrier.await();
43     * } catch (InterruptedException ex) {
44     * return;
45     * } catch (BrokenBarrierException ex) {
46     * return;
47 tim 1.1 * }
48     * }
49     * }
50     * }
51     *
52     * public Solver(float[][] matrix) throws InterruptedException {
53     * data = matrix;
54     * N = matrix.length;
55 jsr166 1.44 * Runnable barrierAction =
56     * new Runnable() { public void run() { mergeRows(...); }};
57     * barrier = new CyclicBarrier(N, barrierAction);
58 tim 1.1 *
59 jsr166 1.44 * List<Thread> threads = new ArrayList<Thread>(N);
60     * for (int i = 0; i < N; i++) {
61     * Thread thread = new Thread(new Worker(i));
62     * threads.add(thread);
63     * thread.start();
64     * }
65     *
66     * // wait until done
67     * for (Thread thread : threads)
68     * thread.join();
69 tim 1.1 * }
70 jsr166 1.41 * }}</pre>
71     *
72 dl 1.28 * Here, each worker thread processes a row of the matrix and then waits at the
73 tim 1.1 * barrier until all rows have been processed. When all rows are processed,
74 dl 1.28 * the supplied {@link Runnable} barrier action is executed and merges the
75 tim 1.1 * rows. If the merger
76 jsr166 1.45 * determines that a solution has been found then {@code done()} will return
77     * {@code true} and each worker will terminate.
78 tim 1.1 *
79     * <p>If the barrier action does not rely on the parties being suspended when
80     * it is executed, then any of the threads in the party could execute that
81     * action when it is released. To facilitate this, each invocation of
82     * {@link #await} returns the arrival index of that thread at the barrier.
83 dl 1.28 * You can then choose which thread should execute the barrier action, for
84 tim 1.1 * example:
85 jsr166 1.41 * <pre> {@code
86     * if (barrier.await() == 0) {
87     * // log the completion of this iteration
88     * }}</pre>
89 tim 1.1 *
90 jsr166 1.45 * <p>The {@code CyclicBarrier} uses an all-or-none breakage model
91 dl 1.28 * for failed synchronization attempts: If a thread leaves a barrier
92     * point prematurely because of interruption, failure, or timeout, all
93     * other threads waiting at that barrier point will also leave
94     * abnormally via {@link BrokenBarrierException} (or
95 dl 1.31 * {@link InterruptedException} if they too were interrupted at about
96 dl 1.28 * the same time).
97 tim 1.1 *
98 jsr166 1.35 * <p>Memory consistency effects: Actions in a thread prior to calling
99     * {@code await()}
100     * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
101 brian 1.33 * actions that are part of the barrier action, which in turn
102 jsr166 1.35 * <i>happen-before</i> actions following a successful return from the
103     * corresponding {@code await()} in other threads.
104 brian 1.33 *
105 tim 1.1 * @since 1.5
106 brian 1.4 * @see CountDownLatch
107 tim 1.1 *
108 dl 1.5 * @author Doug Lea
109 tim 1.1 */
110     public class CyclicBarrier {
111 dl 1.28 /**
112     * Each use of the barrier is represented as a generation instance.
113     * The generation changes whenever the barrier is tripped, or
114     * is reset. There can be many generations associated with threads
115     * using the barrier - due to the non-deterministic way the lock
116     * may be allocated to waiting threads - but only one of these
117 jsr166 1.45 * can be active at a time (the one to which {@code count} applies)
118 dl 1.28 * and all the rest are either broken or tripped.
119     * There need not be an active generation if there has been a break
120     * but no subsequent reset.
121     */
122     private static class Generation {
123     boolean broken = false;
124     }
125    
126 dl 1.5 /** The lock for guarding barrier entry */
127 dl 1.2 private final ReentrantLock lock = new ReentrantLock();
128 dl 1.5 /** Condition to wait on until tripped */
129 dl 1.21 private final Condition trip = lock.newCondition();
130 dl 1.5 /** The number of parties */
131 dl 1.2 private final int parties;
132 jsr166 1.51 /** The command to run when tripped */
133 dl 1.12 private final Runnable barrierCommand;
134 dl 1.28 /** The current generation */
135     private Generation generation = new Generation();
136 dl 1.2
137     /**
138     * Number of parties that have not yet arrived. Counts down from parties to 0
139 dl 1.31 * on each generation. It is reset to parties on each new
140     * generation or when broken.
141 dl 1.2 */
142 dl 1.28 private int count;
143 dl 1.2
144     /**
145 jsr166 1.29 * Updates state on barrier trip and wakes up everyone.
146 dl 1.28 * Called only while holding lock.
147     */
148 dl 1.2 private void nextGeneration() {
149 dl 1.28 // signal completion of last generation
150     trip.signalAll();
151     // set up next generation
152 dl 1.2 count = parties;
153 dl 1.28 generation = new Generation();
154 dl 1.12 }
155    
156     /**
157 jsr166 1.29 * Sets current barrier generation as broken and wakes up everyone.
158 dl 1.28 * Called only while holding lock.
159 dl 1.12 */
160     private void breakBarrier() {
161 dl 1.28 generation.broken = true;
162 jsr166 1.38 count = parties;
163 dl 1.12 trip.signalAll();
164 dl 1.2 }
165    
166 dl 1.5 /**
167 dl 1.7 * Main barrier code, covering the various policies.
168 dl 1.5 */
169 dl 1.28 private int dowait(boolean timed, long nanos)
170     throws InterruptedException, BrokenBarrierException,
171     TimeoutException {
172 dl 1.20 final ReentrantLock lock = this.lock;
173 dl 1.2 lock.lock();
174     try {
175 dl 1.30 final Generation g = generation;
176 dl 1.2
177 dl 1.28 if (g.broken)
178 dl 1.2 throw new BrokenBarrierException();
179    
180     if (Thread.interrupted()) {
181 dl 1.12 breakBarrier();
182 dl 1.2 throw new InterruptedException();
183     }
184    
185 jsr166 1.40 int index = --count;
186     if (index == 0) { // tripped
187     boolean ranAction = false;
188     try {
189     final Runnable command = barrierCommand;
190     if (command != null)
191     command.run();
192     ranAction = true;
193     nextGeneration();
194     return 0;
195     } finally {
196     if (!ranAction)
197     breakBarrier();
198     }
199     }
200 dl 1.2
201 dl 1.28 // loop until tripped, broken, interrupted, or timed out
202 dl 1.12 for (;;) {
203 dl 1.2 try {
204 dl 1.28 if (!timed)
205 dl 1.2 trip.await();
206 dl 1.23 else if (nanos > 0L)
207 dl 1.2 nanos = trip.awaitNanos(nanos);
208 dl 1.12 } catch (InterruptedException ie) {
209 jsr166 1.36 if (g == generation && ! g.broken) {
210     breakBarrier();
211 jsr166 1.38 throw ie;
212     } else {
213     // We're about to finish waiting even if we had not
214     // been interrupted, so this interrupt is deemed to
215     // "belong" to subsequent execution.
216     Thread.currentThread().interrupt();
217     }
218 dl 1.2 }
219 dl 1.28
220 dl 1.30 if (g.broken)
221 dl 1.12 throw new BrokenBarrierException();
222    
223 jsr166 1.36 if (g != generation)
224 dl 1.12 return index;
225    
226 dl 1.23 if (timed && nanos <= 0L) {
227 dl 1.12 breakBarrier();
228 dl 1.2 throw new TimeoutException();
229     }
230     }
231 tim 1.9 } finally {
232 dl 1.2 lock.unlock();
233     }
234     }
235 tim 1.1
236     /**
237 jsr166 1.45 * Creates a new {@code CyclicBarrier} that will trip when the
238 tim 1.1 * given number of parties (threads) are waiting upon it, and which
239 dl 1.17 * will execute the given barrier action when the barrier is tripped,
240 dl 1.19 * performed by the last thread entering the barrier.
241 tim 1.1 *
242     * @param parties the number of threads that must invoke {@link #await}
243 jsr166 1.37 * before the barrier is tripped
244 tim 1.1 * @param barrierAction the command to execute when the barrier is
245 jsr166 1.37 * tripped, or {@code null} if there is no action
246     * @throws IllegalArgumentException if {@code parties} is less than 1
247 tim 1.1 */
248     public CyclicBarrier(int parties, Runnable barrierAction) {
249 dl 1.2 if (parties <= 0) throw new IllegalArgumentException();
250 dl 1.28 this.parties = parties;
251 dl 1.2 this.count = parties;
252     this.barrierCommand = barrierAction;
253 tim 1.1 }
254    
255     /**
256 jsr166 1.45 * Creates a new {@code CyclicBarrier} that will trip when the
257 dl 1.14 * given number of parties (threads) are waiting upon it, and
258 dl 1.31 * does not perform a predefined action when the barrier is tripped.
259 tim 1.1 *
260     * @param parties the number of threads that must invoke {@link #await}
261 jsr166 1.37 * before the barrier is tripped
262     * @throws IllegalArgumentException if {@code parties} is less than 1
263 tim 1.1 */
264     public CyclicBarrier(int parties) {
265 dl 1.2 this(parties, null);
266 tim 1.1 }
267    
268     /**
269 dl 1.25 * Returns the number of parties required to trip this barrier.
270 jsr166 1.37 *
271     * @return the number of parties required to trip this barrier
272 dl 1.31 */
273 tim 1.1 public int getParties() {
274 dl 1.2 return parties;
275 tim 1.1 }
276    
277     /**
278 jsr166 1.37 * Waits until all {@linkplain #getParties parties} have invoked
279 jsr166 1.45 * {@code await} on this barrier.
280 tim 1.1 *
281     * <p>If the current thread is not the last to arrive then it is
282     * disabled for thread scheduling purposes and lies dormant until
283 jsr166 1.36 * one of the following things happens:
284 tim 1.1 * <ul>
285     * <li>The last thread arrives; or
286 jsr166 1.37 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
287     * the current thread; or
288     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
289     * one of the other waiting threads; or
290 dl 1.24 * <li>Some other thread times out while waiting for the barrier; or
291 tim 1.1 * <li>Some other thread invokes {@link #reset} on this barrier.
292     * </ul>
293 jsr166 1.37 *
294 tim 1.1 * <p>If the current thread:
295     * <ul>
296     * <li>has its interrupted status set on entry to this method; or
297 jsr166 1.37 * <li>is {@linkplain Thread#interrupt interrupted} while waiting
298 tim 1.1 * </ul>
299     * then {@link InterruptedException} is thrown and the current thread's
300     * interrupted status is cleared.
301     *
302 jsr166 1.37 * <p>If the barrier is {@link #reset} while any thread is waiting,
303     * or if the barrier {@linkplain #isBroken is broken} when
304 jsr166 1.45 * {@code await} is invoked, or while any thread is waiting, then
305 jsr166 1.37 * {@link BrokenBarrierException} is thrown.
306 tim 1.1 *
307 jsr166 1.37 * <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting,
308 dl 1.28 * then all other waiting threads will throw
309 tim 1.1 * {@link BrokenBarrierException} and the barrier is placed in the broken
310     * state.
311     *
312     * <p>If the current thread is the last thread to arrive, and a
313     * non-null barrier action was supplied in the constructor, then the
314 dl 1.28 * current thread runs the action before allowing the other threads to
315 tim 1.1 * continue.
316     * If an exception occurs during the barrier action then that exception
317 dholmes 1.13 * will be propagated in the current thread and the barrier is placed in
318     * the broken state.
319 tim 1.1 *
320     * @return the arrival index of the current thread, where index
321 jsr166 1.49 * {@code getParties() - 1} indicates the first
322 jsr166 1.37 * to arrive and zero indicates the last to arrive
323 dl 1.28 * @throws InterruptedException if the current thread was interrupted
324 jsr166 1.37 * while waiting
325 tim 1.1 * @throws BrokenBarrierException if <em>another</em> thread was
326 jsr166 1.37 * interrupted or timed out while the current thread was
327     * waiting, or the barrier was reset, or the barrier was
328     * broken when {@code await} was called, or the barrier
329 jsr166 1.47 * action (if present) failed due to an exception
330 tim 1.1 */
331     public int await() throws InterruptedException, BrokenBarrierException {
332 dl 1.2 try {
333 dl 1.23 return dowait(false, 0L);
334 tim 1.9 } catch (TimeoutException toe) {
335 jsr166 1.42 throw new Error(toe); // cannot happen
336 dl 1.2 }
337     }
338    
339     /**
340 jsr166 1.37 * Waits until all {@linkplain #getParties parties} have invoked
341 jsr166 1.45 * {@code await} on this barrier, or the specified waiting time elapses.
342 dl 1.2 *
343     * <p>If the current thread is not the last to arrive then it is
344     * disabled for thread scheduling purposes and lies dormant until
345     * one of the following things happens:
346     * <ul>
347     * <li>The last thread arrives; or
348 dholmes 1.13 * <li>The specified timeout elapses; or
349 jsr166 1.37 * <li>Some other thread {@linkplain Thread#interrupt interrupts}
350     * the current thread; or
351     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
352     * one of the other waiting threads; or
353 dl 1.24 * <li>Some other thread times out while waiting for the barrier; or
354 dl 1.2 * <li>Some other thread invokes {@link #reset} on this barrier.
355     * </ul>
356 jsr166 1.37 *
357 dl 1.2 * <p>If the current thread:
358     * <ul>
359     * <li>has its interrupted status set on entry to this method; or
360 jsr166 1.37 * <li>is {@linkplain Thread#interrupt interrupted} while waiting
361 dl 1.2 * </ul>
362     * then {@link InterruptedException} is thrown and the current thread's
363     * interrupted status is cleared.
364     *
365 dl 1.26 * <p>If the specified waiting time elapses then {@link TimeoutException}
366     * is thrown. If the time is less than or equal to zero, the
367     * method will not wait at all.
368     *
369 jsr166 1.37 * <p>If the barrier is {@link #reset} while any thread is waiting,
370     * or if the barrier {@linkplain #isBroken is broken} when
371 jsr166 1.45 * {@code await} is invoked, or while any thread is waiting, then
372 jsr166 1.37 * {@link BrokenBarrierException} is thrown.
373     *
374     * <p>If any thread is {@linkplain Thread#interrupt interrupted} while
375     * waiting, then all other waiting threads will throw {@link
376     * BrokenBarrierException} and the barrier is placed in the broken
377 dl 1.2 * state.
378     *
379     * <p>If the current thread is the last thread to arrive, and a
380     * non-null barrier action was supplied in the constructor, then the
381 dl 1.28 * current thread runs the action before allowing the other threads to
382 dl 1.2 * continue.
383     * If an exception occurs during the barrier action then that exception
384 dholmes 1.13 * will be propagated in the current thread and the barrier is placed in
385     * the broken state.
386 dl 1.2 *
387 dl 1.5 * @param timeout the time to wait for the barrier
388     * @param unit the time unit of the timeout parameter
389 dl 1.2 * @return the arrival index of the current thread, where index
390 jsr166 1.49 * {@code getParties() - 1} indicates the first
391 jsr166 1.37 * to arrive and zero indicates the last to arrive
392 dl 1.28 * @throws InterruptedException if the current thread was interrupted
393 jsr166 1.37 * while waiting
394 jsr166 1.50 * @throws TimeoutException if the specified timeout elapses.
395     * In this case the barrier will be broken.
396 dl 1.2 * @throws BrokenBarrierException if <em>another</em> thread was
397 jsr166 1.37 * interrupted or timed out while the current thread was
398     * waiting, or the barrier was reset, or the barrier was broken
399     * when {@code await} was called, or the barrier action (if
400 jsr166 1.46 * present) failed due to an exception
401 dl 1.28 */
402     public int await(long timeout, TimeUnit unit)
403     throws InterruptedException,
404     BrokenBarrierException,
405     TimeoutException {
406 dl 1.2 return dowait(true, unit.toNanos(timeout));
407 tim 1.1 }
408    
409     /**
410 dl 1.25 * Queries if this barrier is in a broken state.
411 jsr166 1.37 *
412     * @return {@code true} if one or more parties broke out of this
413     * barrier due to interruption or timeout since
414     * construction or the last reset, or a barrier action
415     * failed due to an exception; {@code false} otherwise.
416 tim 1.1 */
417     public boolean isBroken() {
418 dl 1.20 final ReentrantLock lock = this.lock;
419 dl 1.2 lock.lock();
420     try {
421 dl 1.28 return generation.broken;
422 tim 1.9 } finally {
423 dl 1.2 lock.unlock();
424     }
425 tim 1.1 }
426    
427     /**
428 dl 1.25 * Resets the barrier to its initial state. If any parties are
429 tim 1.1 * currently waiting at the barrier, they will return with a
430 dl 1.8 * {@link BrokenBarrierException}. Note that resets <em>after</em>
431 dl 1.12 * a breakage has occurred for other reasons can be complicated to
432     * carry out; threads need to re-synchronize in some other way,
433     * and choose one to perform the reset. It may be preferable to
434     * instead create a new barrier for subsequent use.
435 tim 1.1 */
436     public void reset() {
437 dl 1.20 final ReentrantLock lock = this.lock;
438 dl 1.2 lock.lock();
439     try {
440 dl 1.28 breakBarrier(); // break the current generation
441     nextGeneration(); // start a new generation
442 tim 1.9 } finally {
443 dl 1.2 lock.unlock();
444     }
445 tim 1.1 }
446    
447     /**
448 dl 1.25 * Returns the number of parties currently waiting at the barrier.
449 tim 1.1 * This method is primarily useful for debugging and assertions.
450     *
451 jsr166 1.37 * @return the number of parties currently blocked in {@link #await}
452 jsr166 1.32 */
453 tim 1.1 public int getNumberWaiting() {
454 dl 1.20 final ReentrantLock lock = this.lock;
455 dl 1.2 lock.lock();
456     try {
457 dl 1.31 return parties - count;
458 tim 1.9 } finally {
459 dl 1.2 lock.unlock();
460     }
461 tim 1.1 }
462     }
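
Usage sketch (not part of the committed file): the following is a minimal, hedged illustration of the behavior documented above, namely that the barrier action runs once per trip, that exactly one thread per trip sees arrival index 0, and that a timed-out await breaks the barrier until reset is called. The class BarrierDemo and its methods runRounds and timeoutBreaksBarrier are hypothetical names introduced only for this example; only the public CyclicBarrier methods shown in the listing are relied upon.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of CyclicBarrier.java.
class BarrierDemo {

    /**
     * Runs {@code parties} threads through {@code rounds} barrier cycles.
     * Returns {barrier action runs, number of await() calls that returned 0}.
     */
    static int[] runRounds(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        AtomicInteger lastArrivals = new AtomicInteger();
        // The barrier action runs once per trip, in the last thread to arrive.
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);

        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        // Arrival index 0 identifies the last thread to arrive.
                        if (barrier.await() == 0)
                            lastArrivals.incrementAndGet();
                    }
                } catch (InterruptedException | BrokenBarrierException ex) {
                    // Barrier broken or thread interrupted: stop this worker.
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException ex) {
                throw new IllegalStateException(ex);
            }
        }
        return new int[] { trips.get(), lastArrivals.get() };
    }

    /**
     * Lets a single thread time out on a two-party barrier, then checks that
     * the barrier is reported broken and that reset() clears the broken state.
     */
    static boolean timeoutBreaksBarrier() {
        CyclicBarrier barrier = new CyclicBarrier(2); // the second party never arrives
        boolean timedOut = false;
        try {
            barrier.await(10, TimeUnit.MILLISECONDS);
        } catch (TimeoutException ex) {
            timedOut = true; // the timed-out waiter breaks the barrier
        } catch (InterruptedException | BrokenBarrierException ex) {
            return false; // not expected in this single-threaded scenario
        }
        boolean brokenAfterTimeout = barrier.isBroken();
        barrier.reset(); // starts a fresh, unbroken generation
        return timedOut && brokenAfterTimeout && !barrier.isBroken();
    }

    public static void main(String[] args) {
        int[] counts = runRounds(3, 4);
        System.out.println("barrier action runs: " + counts[0]);
        System.out.println("await() == 0 returns: " + counts[1]);
        System.out.println("timeout breaks, reset clears: " + timeoutBreaksBarrier());
    }
}
```

The same barrier instance is reused across all rounds, which is the "cyclic" property described in the class comment. The sketch uses a short timeout purely to trigger the breakage path quickly.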