79 |
|
* immediately return without updating phaser state or waiting for |
80 |
|
* advance, and indicating (via a negative phase value) that execution |
81 |
|
* is complete. Termination is triggered when an invocation of {@code |
82 |
< |
* onAdvance} returns {@code true}. As illustrated below, when |
82 |
> |
* onAdvance} returns {@code true}. The default implementation returns |
83 |
> |
* {@code true} if a deregistration has caused the number of |
84 |
> |
* registered parties to become zero. As illustrated below, when |
85 |
|
* phasers control actions with a fixed number of iterations, it is |
86 |
|
* often convenient to override this method to cause termination when |
87 |
|
* the current phase number reaches a threshold. Method {@link |
242 |
|
*/ |
243 |
|
private volatile long state; |
244 |
|
|
245 |
< |
private static final int MAX_PARTIES = 0xffff; |
246 |
< |
private static final int MAX_PHASE = 0x7fffffff; |
247 |
< |
private static final int PARTIES_SHIFT = 16; |
248 |
< |
private static final int PHASE_SHIFT = 32; |
249 |
< |
private static final int UNARRIVED_MASK = 0xffff; |
250 |
< |
private static final long PARTIES_MASK = 0xffff0000L; // for masking long |
251 |
< |
private static final long ONE_ARRIVAL = 1L; |
252 |
< |
private static final long ONE_PARTY = 1L << PARTIES_SHIFT; |
253 |
< |
private static final long TERMINATION_PHASE = -1L << PHASE_SHIFT; |
245 |
> |
private static final int MAX_PARTIES = 0xffff; |
246 |
> |
private static final int MAX_PHASE = 0x7fffffff; |
247 |
> |
private static final int PARTIES_SHIFT = 16; |
248 |
> |
private static final int PHASE_SHIFT = 32; |
249 |
> |
private static final int UNARRIVED_MASK = 0xffff; // to mask ints |
250 |
> |
private static final long PARTIES_MASK = 0xffff0000L; // to mask longs |
251 |
> |
private static final long ONE_ARRIVAL = 1L; |
252 |
> |
private static final long ONE_PARTY = 1L << PARTIES_SHIFT; |
253 |
> |
private static final long TERMINATION_BIT = 1L << 63; |
254 |
|
|
255 |
|
// The following unpacking methods are usually manually inlined |
256 |
|
|
295 |
|
} |
296 |
|
|
297 |
|
/** |
298 |
+ |
* Returns message string for bounds exceptions on arrival. |
299 |
+ |
*/ |
300 |
+ |
private String badArrive(long s) { |
301 |
+ |
return "Attempted arrival of unregistered party for " + |
302 |
+ |
stateToString(s); |
303 |
+ |
} |
304 |
+ |
|
305 |
+ |
/** |
306 |
+ |
* Returns message string for bounds exceptions on registration. |
307 |
+ |
*/ |
308 |
+ |
private String badRegister(long s) { |
309 |
+ |
return "Attempt to register more than " + |
310 |
+ |
MAX_PARTIES + " parties for " + stateToString(s); |
311 |
+ |
} |
312 |
+ |
|
313 |
+ |
/** |
314 |
|
* Main implementation for methods arrive and arriveAndDeregister. |
315 |
|
* Manually tuned to speed up and minimize race windows for the |
316 |
|
* common case of just decrementing unarrived field. |
322 |
|
private int doArrive(long adj) { |
323 |
|
for (;;) { |
324 |
|
long s = state; |
325 |
+ |
int unarrived = (int)s & UNARRIVED_MASK; |
326 |
|
int phase = (int)(s >>> PHASE_SHIFT); |
327 |
|
if (phase < 0) |
328 |
|
return phase; |
329 |
< |
int unarrived = (int)s & UNARRIVED_MASK; |
330 |
< |
if (unarrived == 0) |
331 |
< |
checkBadArrive(s); |
329 |
> |
else if (unarrived == 0) { |
330 |
> |
if (reconcileState() == s) // recheck |
331 |
> |
throw new IllegalStateException(badArrive(s)); |
332 |
> |
} |
333 |
|
else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adj)) { |
334 |
|
if (unarrived == 1) { |
335 |
|
long p = s & PARTIES_MASK; // unshifted parties field |
340 |
|
final Phaser parent = this.parent; |
341 |
|
if (parent == null) { |
342 |
|
if (onAdvance(phase, u)) |
343 |
< |
next |= TERMINATION_PHASE; // obliterate phase |
343 |
> |
next |= TERMINATION_BIT; |
344 |
|
UNSAFE.compareAndSwapLong(this, stateOffset, s, next); |
345 |
|
releaseWaiters(phase); |
346 |
|
} |
360 |
|
} |
361 |
|
|
362 |
|
/** |
343 |
– |
* Rechecks state and throws bounds exceptions on arrival -- called |
344 |
– |
* only if unarrived is apparently zero. |
345 |
– |
*/ |
346 |
– |
private void checkBadArrive(long s) { |
347 |
– |
if (reconcileState() == s) |
348 |
– |
throw new IllegalStateException |
349 |
– |
("Attempted arrival of unregistered party for " + |
350 |
– |
stateToString(s)); |
351 |
– |
} |
352 |
– |
|
353 |
– |
/** |
363 |
|
* Implementation of register, bulkRegister |
364 |
|
* |
365 |
|
* @param registrations number to add to both parties and |
375 |
|
int phase = (int)(s >>> PHASE_SHIFT); |
376 |
|
if (phase < 0) |
377 |
|
return phase; |
369 |
– |
else if (parties != 0 && ((int)s & UNARRIVED_MASK) == 0) |
370 |
– |
internalAwaitAdvance(phase, null); // wait for onAdvance |
378 |
|
else if (registrations > MAX_PARTIES - parties) |
379 |
|
throw new IllegalStateException(badRegister(s)); |
380 |
< |
else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s + adj)) |
381 |
< |
return phase; |
380 |
> |
else if ((parties == 0 && parent == null) || // first reg of root |
381 |
> |
((int)s & UNARRIVED_MASK) != 0) { // not advancing |
382 |
> |
if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s + adj)) |
383 |
> |
return phase; |
384 |
> |
} |
385 |
> |
else if (parties != 0) // wait for onAdvance |
386 |
> |
internalAwaitAdvance(phase, null); |
387 |
> |
else { // 1st registration of child |
388 |
> |
synchronized(this) { // register parent first |
389 |
> |
if (reconcileState() == s) { // recheck under lock |
390 |
> |
parent.doRegister(1); // OK if throws IllegalState |
391 |
> |
for (;;) { // simpler form of outer loop |
392 |
> |
s = reconcileState(); |
393 |
> |
phase = (int)(s >>> PHASE_SHIFT); |
394 |
> |
if (phase < 0 || |
395 |
> |
UNSAFE.compareAndSwapLong(this, stateOffset, |
396 |
> |
s, s + adj)) |
397 |
> |
return phase; |
398 |
> |
} |
399 |
> |
} |
400 |
> |
} |
401 |
> |
} |
402 |
|
} |
403 |
|
} |
404 |
|
|
405 |
|
/** |
379 |
– |
* Returns message string for out of bounds exceptions on registration. |
380 |
– |
*/ |
381 |
– |
private String badRegister(long s) { |
382 |
– |
return "Attempt to register more than " + |
383 |
– |
MAX_PARTIES + " parties for " + stateToString(s); |
384 |
– |
} |
385 |
– |
|
386 |
– |
/** |
406 |
|
* Recursively resolves lagged phase propagation from root if necessary. |
407 |
|
*/ |
408 |
|
private long reconcileState() { |
419 |
|
long u = s & PARTIES_MASK; // reset unarrived to parties |
420 |
|
long next = ((((long) rPhase) << PHASE_SHIFT) | u | |
421 |
|
(u >>> PARTIES_SHIFT)); |
422 |
< |
if (state == s && |
404 |
< |
UNSAFE.compareAndSwapLong(this, stateOffset, |
405 |
< |
s, s = next)) |
406 |
< |
break; |
422 |
> |
UNSAFE.compareAndSwapLong(this, stateOffset, s, next); |
423 |
|
} |
424 |
|
s = state; |
425 |
|
} |
449 |
|
} |
450 |
|
|
451 |
|
/** |
452 |
+ |
* Creates a new phaser with the given parent, and without any |
453 |
+ |
* initially registered parties. Any thread using this phaser |
454 |
+ |
* will need to first register for it, at which point, if the |
455 |
+ |
* given parent is non-null, this phaser will also be registered |
456 |
+ |
* with the parent. |
457 |
+ |
* |
458 |
|
* Equivalent to {@link #Phaser(Phaser, int) Phaser(parent, 0)}. |
459 |
|
* |
460 |
|
* @param parent the parent phaser |
465 |
|
|
466 |
|
/** |
467 |
|
* Creates a new phaser with the given parent and number of |
468 |
< |
* registered unarrived parties. If parent is non-null, this |
469 |
< |
* phaser is registered with the parent and its initial phase |
470 |
< |
* number is the same as that of parent phaser. If the number of |
449 |
< |
* parties is zero, the parent phaser will not proceed until this |
450 |
< |
* child phaser registers parties and advances, or this child |
451 |
< |
* phaser deregisters with its parent, or the parent is otherwise |
452 |
< |
* terminated. This child Phaser will be deregistered from its |
453 |
< |
* parent automatically upon any invocation of the child's {@link |
454 |
< |
* #arriveAndDeregister} method that results in the child's number |
455 |
< |
* of registered parties becoming zero. (Although rarely |
456 |
< |
* appropriate, this child may also explicity deregister from its |
457 |
< |
* parent using {@code getParent().arriveAndDeregister()}.) After |
458 |
< |
* deregistration, the child cannot re-register. (Instead, you can |
459 |
< |
* create a new child Phaser.) |
468 |
> |
* registered unarrived parties. If parent is non-null and |
469 |
> |
* the number of parties is non-zero, this phaser is registered |
470 |
> |
* with the parent. |
471 |
|
* |
472 |
|
* @param parent the parent phaser |
473 |
|
* @param parties the number of parties required to trip barrier |
484 |
|
this.root = r; |
485 |
|
this.evenQ = r.evenQ; |
486 |
|
this.oddQ = r.oddQ; |
487 |
< |
phase = parent.doRegister(1); |
487 |
> |
phase = (parties == 0) ? parent.getPhase() : parent.doRegister(1); |
488 |
|
} |
489 |
|
else { |
490 |
|
this.root = this; |
497 |
|
} |
498 |
|
|
499 |
|
/** |
500 |
< |
* Adds a new unarrived party to this phaser. |
501 |
< |
* If an ongoing invocation of {@link #onAdvance} is in progress, |
502 |
< |
* this method may wait until its completion before registering. |
500 |
> |
* Adds a new unarrived party to this phaser. If an ongoing |
501 |
> |
* invocation of {@link #onAdvance} is in progress, this method |
502 |
> |
* may wait until its completion before registering. If this |
503 |
> |
* phaser has a parent, and this phaser previously had no |
504 |
> |
* registered parties, this phaser is also registered with its |
505 |
> |
* parent. |
506 |
|
* |
507 |
|
* @return the arrival phase number to which this registration applied |
508 |
|
* @throws IllegalStateException if attempting to register more |
516 |
|
* Adds the given number of new unarrived parties to this phaser. |
517 |
|
* If an ongoing invocation of {@link #onAdvance} is in progress, |
518 |
|
* this method may wait until its completion before registering. |
519 |
+ |
* If this phaser has a parent, and the given number of parities |
520 |
+ |
* is greater than zero, and this phaser previously had no |
521 |
+ |
* registered parties, this phaser is also registered with its |
522 |
+ |
* parent. |
523 |
|
* |
524 |
|
* @param parties the number of additional parties required to trip barrier |
525 |
|
* @return the arrival phase number to which this registration applied |
530 |
|
public int bulkRegister(int parties) { |
531 |
|
if (parties < 0) |
532 |
|
throw new IllegalArgumentException(); |
533 |
< |
if (parties == 0) |
533 |
> |
else if (parties == 0) |
534 |
|
return getPhase(); |
535 |
|
return doRegister(parties); |
536 |
|
} |
537 |
|
|
538 |
|
/** |
539 |
|
* Arrives at the barrier, but does not wait for others. (You can |
540 |
< |
* in turn wait for others via {@link #awaitAdvance}). It is an |
541 |
< |
* unenforced usage error for an unregistered party to invoke this |
542 |
< |
* method. |
540 |
> |
* in turn wait for others via {@link #awaitAdvance}). It is a |
541 |
> |
* usage error for an unregistered party to invoke this |
542 |
> |
* method. However, it is possible that this error will result in |
543 |
> |
* an {code IllegalStateException} only when some <em>other</em> |
544 |
> |
* party arrives. |
545 |
|
* |
546 |
|
* @return the arrival phase number, or a negative value if terminated |
547 |
|
* @throws IllegalStateException if not terminated and the number |
557 |
|
* required to trip the barrier in future phases. If this phaser |
558 |
|
* has a parent, and deregistration causes this phaser to have |
559 |
|
* zero parties, this phaser also arrives at and is deregistered |
560 |
< |
* from its parent. It is an unenforced usage error for an |
561 |
< |
* unregistered party to invoke this method. |
560 |
> |
* from its parent. It is a usage error for an unregistered party |
561 |
> |
* to invoke this method. However, it is possible that this error |
562 |
> |
* will result in an {code IllegalStateException} only when some |
563 |
> |
* <em>other</em> party arrives. |
564 |
|
* |
565 |
|
* @return the arrival phase number, or a negative value if terminated |
566 |
|
* @throws IllegalStateException if not terminated and the number |
576 |
|
* interruption or timeout, you can arrange this with an analogous |
577 |
|
* construction using one of the other forms of the {@code |
578 |
|
* awaitAdvance} method. If instead you need to deregister upon |
579 |
< |
* arrival, use {@link #arriveAndDeregister}. It is an unenforced |
580 |
< |
* usage error for an unregistered party to invoke this method. |
579 |
> |
* arrival, use {@link #arriveAndDeregister}. It is a usage error |
580 |
> |
* for an unregistered party to invoke this method. However, it is |
581 |
> |
* possible that this error will result in an {code |
582 |
> |
* IllegalStateException} only when some <em>other</em> party |
583 |
> |
* arrives. |
584 |
|
* |
585 |
|
* @return the arrival phase number, or a negative number if terminated |
586 |
|
* @throws IllegalStateException if not terminated and the number |
693 |
|
long s; |
694 |
|
while ((s = root.state) >= 0) { |
695 |
|
if (UNSAFE.compareAndSwapLong(root, stateOffset, |
696 |
< |
s, s | TERMINATION_PHASE)) { |
696 |
> |
s, s | TERMINATION_BIT)) { |
697 |
|
releaseWaiters(0); // signal all threads |
698 |
|
releaseWaiters(1); |
699 |
|
return; |
791 |
|
* {@code onAdvance} is invoked only for its root Phaser on each |
792 |
|
* advance. |
793 |
|
* |
794 |
< |
* <p>The default version returns {@code true} when the number of |
795 |
< |
* registered parties is zero. Normally, overrides that arrange |
796 |
< |
* termination for other reasons should also preserve this |
797 |
< |
* property. |
794 |
> |
* <p>To support the most common use cases, the default |
795 |
> |
* implementation of this method returns {@code true} when the |
796 |
> |
* number of registered parties has become zero as the result of a |
797 |
> |
* party invoking {@code arriveAndDeregister}. You can disable |
798 |
> |
* this behavior, thus enabling continuation upon future |
799 |
> |
* registrations, by overriding this method to always return |
800 |
> |
* {@code false}: |
801 |
> |
* |
802 |
> |
* <pre> {@code |
803 |
> |
* Phaser phaser = new Phaser() { |
804 |
> |
* protected boolean onAdvance(int phase, int parties) { return false; } |
805 |
> |
* }}</pre> |
806 |
|
* |
807 |
|
* @param phase the phase number on entering the barrier |
808 |
|
* @param registeredParties the current number of registered parties |