118 |
|
* |
119 |
|
*/ |
120 |
|
|
121 |
< |
public class Semaphore implements java.io.Serializable { |
122 |
< |
/* |
123 |
< |
* The underlying algorithm here is a simplified adaptation of |
124 |
< |
* that used for ReentrantLock. See the internal documentation of |
125 |
< |
* lock package classes for detailed explanation. |
126 |
< |
*/ |
127 |
< |
|
121 |
> |
public class Semaphore extends AbstractQueuedSynchronizer implements java.io.Serializable { |
122 |
|
private static final long serialVersionUID = -3222578661600680210L; |
129 |
– |
|
130 |
– |
/** Node status value to indicate thread has cancelled */ |
131 |
– |
private static final int CANCELLED = 1; |
132 |
– |
/** Node status value to indicate successor needs unparking */ |
133 |
– |
private static final int SIGNAL = -1; |
134 |
– |
/** Node class for waiting threads */ |
135 |
– |
private static class Node { |
136 |
– |
volatile int status; |
137 |
– |
volatile Node prev; |
138 |
– |
volatile Node next; |
139 |
– |
Thread thread; |
140 |
– |
Node(Thread t) { thread = t; } |
141 |
– |
} |
142 |
– |
|
143 |
– |
/** Number of available permits held in a separate AtomicInteger */ |
144 |
– |
private final AtomicInteger perms; |
145 |
– |
/** Head of the wait queue, lazily initialized. */ |
146 |
– |
private transient volatile Node head; |
147 |
– |
/** Tail of the wait queue, lazily initialized. */ |
148 |
– |
private transient volatile Node tail; |
149 |
– |
/** true if barging disabled */ |
123 |
|
private final boolean fair; |
124 |
|
|
125 |
< |
// Atomic update support |
153 |
< |
|
154 |
< |
private static final |
155 |
< |
AtomicReferenceFieldUpdater<Semaphore, Node> tailUpdater = |
156 |
< |
AtomicReferenceFieldUpdater.<Semaphore, Node>newUpdater |
157 |
< |
(Semaphore.class, Node.class, "tail"); |
158 |
< |
private static final |
159 |
< |
AtomicReferenceFieldUpdater<Semaphore, Node> headUpdater = |
160 |
< |
AtomicReferenceFieldUpdater.<Semaphore, Node>newUpdater |
161 |
< |
(Semaphore.class, Node.class, "head"); |
162 |
< |
private static final |
163 |
< |
AtomicIntegerFieldUpdater<Node> statusUpdater = |
164 |
< |
AtomicIntegerFieldUpdater.<Node>newUpdater |
165 |
< |
(Node.class, "status"); |
166 |
< |
|
167 |
< |
|
168 |
< |
/** |
169 |
< |
* Insert node into queue, initializing head and tail if necessary. |
170 |
< |
* @param node the node to insert |
171 |
< |
*/ |
172 |
< |
private void enq(Node node) { |
173 |
< |
Node t = tail; |
174 |
< |
if (t == null) { // Must initialize first |
175 |
< |
Node h = new Node(null); |
176 |
< |
while ((t = tail) == null) { |
177 |
< |
if (headUpdater.compareAndSet(this, null, h)) |
178 |
< |
tail = h; |
179 |
< |
} |
180 |
< |
} |
125 |
> |
// Implement abstract methods |
126 |
|
|
127 |
< |
for (;;) { |
128 |
< |
node.prev = t; // Prev field must be valid before/upon CAS |
129 |
< |
if (tailUpdater.compareAndSet(this, t, node)) { |
130 |
< |
t.next = node; // Next field assignment lags CAS |
131 |
< |
return; |
187 |
< |
} |
188 |
< |
t = tail; |
189 |
< |
} |
190 |
< |
} |
191 |
< |
|
192 |
< |
/** |
193 |
< |
* Unblock the successor of node |
194 |
< |
* @param node the node |
195 |
< |
*/ |
196 |
< |
private void unparkSuccessor(Node node) { |
197 |
< |
statusUpdater.compareAndSet(node, SIGNAL, 0); |
198 |
< |
Node s = node.next; |
199 |
< |
if (s == null || s.status == CANCELLED) { |
200 |
< |
s = tail; |
201 |
< |
if (s != null && s != node) { |
202 |
< |
Node p = s.prev; |
203 |
< |
while (p != null && p != node) { |
204 |
< |
if (p.status != CANCELLED) |
205 |
< |
s = p; |
206 |
< |
p = p.prev; |
207 |
< |
} |
208 |
< |
} |
209 |
< |
} |
210 |
< |
if (s != null && s != node) |
211 |
< |
LockSupport.unpark(s.thread); |
212 |
< |
} |
213 |
< |
|
214 |
< |
|
215 |
< |
/** |
216 |
< |
* Internal version of tryAcquire returning number of remaining |
217 |
< |
* permits, which is nonnegative only if the acquire succeeded. |
218 |
< |
* @param permits requested number of permits |
219 |
< |
* @return remaining number of permits |
220 |
< |
*/ |
221 |
< |
private int doTryAcquire(int permits) { |
222 |
< |
final AtomicInteger perms = this.perms; |
127 |
> |
protected int acquireSharedState(boolean isQueued, int acquires, |
128 |
> |
Thread current) { |
129 |
> |
final AtomicInteger perms = getState(); |
130 |
> |
if (!isQueued && fair && hasWaiters()) |
131 |
> |
return -1; |
132 |
|
for (;;) { |
133 |
|
int available = perms.get(); |
134 |
< |
int remaining = available - permits; |
134 |
> |
int remaining = available - acquires; |
135 |
|
if (remaining < 0 || |
136 |
|
perms.compareAndSet(available, remaining)) |
137 |
|
return remaining; |
138 |
|
} |
139 |
|
} |
140 |
< |
|
141 |
< |
/** |
142 |
< |
* Main code for untimed acquires. |
234 |
< |
* @param permits number of permits requested |
235 |
< |
* @param interrupts interrupt control: -1 for abort on interrupt, |
236 |
< |
* 0 for continue on interrupt |
237 |
< |
* @return true if lock acquired (can be false only if interruptible) |
238 |
< |
*/ |
239 |
< |
private boolean doAcquire(int permits, int interrupts) { |
240 |
< |
// Fast path bypasses queue |
241 |
< |
if ((!fair || head == tail) && doTryAcquire(permits) >= 0) |
242 |
< |
return true; |
243 |
< |
Thread current = Thread.currentThread(); |
244 |
< |
Node node = new Node(current); |
245 |
< |
// Retry fast path before enqueuing |
246 |
< |
if (!fair && doTryAcquire(permits) >= 0) |
247 |
< |
return true; |
248 |
< |
enq(node); |
249 |
< |
|
140 |
> |
|
141 |
> |
protected boolean releaseSharedState(int releases) { |
142 |
> |
final AtomicInteger perms = getState(); |
143 |
|
for (;;) { |
144 |
< |
Node p = node.prev; |
145 |
< |
if (p == head) { |
146 |
< |
int remaining = doTryAcquire(permits); |
254 |
< |
if (remaining >= 0) { |
255 |
< |
p.next = null; |
256 |
< |
node.thread = null; |
257 |
< |
node.prev = null; |
258 |
< |
head = node; |
259 |
< |
// if still some permits left, wake up successor |
260 |
< |
if (remaining > 0 && node.status < 0) |
261 |
< |
unparkSuccessor(node); |
262 |
< |
if (interrupts > 0) // Re-interrupt on normal exit |
263 |
< |
current.interrupt(); |
264 |
< |
return true; |
265 |
< |
} |
266 |
< |
} |
267 |
< |
int status = p.status; |
268 |
< |
if (status == 0) |
269 |
< |
statusUpdater.compareAndSet(p, 0, SIGNAL); |
270 |
< |
else if (status == CANCELLED) |
271 |
< |
node.prev = p.prev; |
272 |
< |
else { |
273 |
< |
assert (status == SIGNAL); |
274 |
< |
LockSupport.park(); |
275 |
< |
if (Thread.interrupted()) { |
276 |
< |
if (interrupts < 0) { |
277 |
< |
node.thread = null; |
278 |
< |
node.status = CANCELLED; |
279 |
< |
unparkSuccessor(node); |
280 |
< |
return false; |
281 |
< |
} |
282 |
< |
interrupts = 1; // set to re-interrupt on exit |
283 |
< |
} |
284 |
< |
} |
144 |
> |
int p = perms.get(); |
145 |
> |
if (perms.compareAndSet(p, p + releases)) |
146 |
> |
return true; |
147 |
|
} |
148 |
|
} |
149 |
|
|
150 |
< |
/** |
151 |
< |
* Main code for timed acquires. Same as doAcquire but with |
152 |
< |
* interspersed time checks. |
153 |
< |
* @param permits number of permits requested |
292 |
< |
* @param nanos timeout in nanosecs |
293 |
< |
* @return true if lock acquired |
294 |
< |
*/ |
295 |
< |
private boolean doTimedAcquire(int permits, long nanos) throws InterruptedException { |
296 |
< |
if ((!fair || head == tail) && doTryAcquire(permits) >= 0) |
297 |
< |
return true; |
298 |
< |
Thread current = Thread.currentThread(); |
299 |
< |
long lastTime = System.nanoTime(); |
300 |
< |
Node node = new Node(current); |
301 |
< |
// Retry fast path before enqueuing |
302 |
< |
if (!fair && doTryAcquire(permits) >= 0) |
303 |
< |
return true; |
304 |
< |
enq(node); |
150 |
> |
protected int acquireExclusiveState(boolean isQueued, int acquires, |
151 |
> |
Thread current) { |
152 |
> |
throw new UnsupportedOperationException(); |
153 |
> |
} |
154 |
|
|
155 |
< |
for (;;) { |
156 |
< |
Node p = node.prev; |
308 |
< |
if (p == head) { |
309 |
< |
int remaining = doTryAcquire(permits); |
310 |
< |
if (remaining >= 0) { |
311 |
< |
p.next = null; |
312 |
< |
node.thread = null; |
313 |
< |
node.prev = null; |
314 |
< |
head = node; |
315 |
< |
if (remaining > 0 && node.status < 0) |
316 |
< |
unparkSuccessor(node); |
317 |
< |
return true; |
318 |
< |
} |
319 |
< |
} |
320 |
< |
if (nanos <= 0L) { |
321 |
< |
node.thread = null; |
322 |
< |
node.status = CANCELLED; |
323 |
< |
unparkSuccessor(node); |
324 |
< |
return false; |
325 |
< |
} |
326 |
< |
|
327 |
< |
int status = p.status; |
328 |
< |
if (status == 0) |
329 |
< |
statusUpdater.compareAndSet(p, 0, SIGNAL); |
330 |
< |
else if (status == CANCELLED) |
331 |
< |
node.prev = p.prev; |
332 |
< |
else { |
333 |
< |
LockSupport.parkNanos(nanos); |
334 |
< |
if (Thread.interrupted()) { |
335 |
< |
node.thread = null; |
336 |
< |
node.status = CANCELLED; |
337 |
< |
unparkSuccessor(node); |
338 |
< |
throw new InterruptedException(); |
339 |
< |
} |
340 |
< |
long now = System.nanoTime(); |
341 |
< |
nanos -= now - lastTime; |
342 |
< |
lastTime = now; |
343 |
< |
} |
344 |
< |
} |
155 |
> |
protected boolean releaseExclusiveState(int releases) { |
156 |
> |
throw new UnsupportedOperationException(); |
157 |
|
} |
158 |
|
|
159 |
< |
/** |
160 |
< |
* Internal version of release |
349 |
< |
*/ |
350 |
< |
private void doRelease(int permits) { |
351 |
< |
final AtomicInteger perms = this.perms; |
352 |
< |
for (;;) { |
353 |
< |
int p = perms.get(); |
354 |
< |
if (perms.compareAndSet(p, p + permits)) { |
355 |
< |
Node h = head; |
356 |
< |
if (h != null && h.status < 0) |
357 |
< |
unparkSuccessor(h); |
358 |
< |
return; |
359 |
< |
} |
360 |
< |
} |
159 |
> |
protected void checkConditionAccess(Thread thread, boolean waiting) { |
160 |
> |
throw new UnsupportedOperationException(); |
161 |
|
} |
162 |
|
|
163 |
|
/** |
171 |
|
*/ |
172 |
|
public Semaphore(int permits, boolean fair) { |
173 |
|
this.fair = fair; |
174 |
< |
perms = new AtomicInteger(permits); |
174 |
> |
getState().set(permits); |
175 |
|
} |
176 |
|
|
177 |
|
/** |
204 |
|
* @see Thread#interrupt |
205 |
|
*/ |
206 |
|
public void acquire() throws InterruptedException { |
207 |
< |
if (Thread.interrupted() || !doAcquire(1, -1)) |
408 |
< |
throw new InterruptedException(); |
207 |
> |
acquireSharedInterruptibly(1); |
208 |
|
} |
209 |
|
|
210 |
|
/** |
227 |
|
* |
228 |
|
*/ |
229 |
|
public void acquireUninterruptibly() { |
230 |
< |
doAcquire(1, 0); |
230 |
> |
acquireSharedUninterruptibly(1); |
231 |
|
} |
232 |
|
|
233 |
|
/** |
244 |
|
* otherwise. |
245 |
|
*/ |
246 |
|
public boolean tryAcquire() { |
247 |
< |
return doTryAcquire(1) >= 0; |
247 |
> |
return acquireSharedState(false, 1, null) >= 0; |
248 |
|
} |
249 |
|
|
250 |
|
/** |
290 |
|
*/ |
291 |
|
public boolean tryAcquire(long timeout, TimeUnit unit) |
292 |
|
throws InterruptedException { |
293 |
< |
if (unit == null) |
495 |
< |
throw new NullPointerException(); |
496 |
< |
if (Thread.interrupted()) |
497 |
< |
throw new InterruptedException(); |
498 |
< |
return doTimedAcquire(1, unit.toNanos(timeout)); |
293 |
> |
return acquireSharedTimed(1, unit.toNanos(timeout)); |
294 |
|
} |
295 |
|
|
296 |
|
/** |
306 |
|
* in the application. |
307 |
|
*/ |
308 |
|
public void release() { |
309 |
< |
doRelease(1); |
309 |
> |
releaseShared(1); |
310 |
|
} |
311 |
|
|
312 |
|
/** |
350 |
|
*/ |
351 |
|
public void acquire(int permits) throws InterruptedException { |
352 |
|
if (permits < 0) throw new IllegalArgumentException(); |
353 |
< |
if (Thread.interrupted() || !doAcquire(permits, -1)) |
559 |
< |
throw new InterruptedException(); |
353 |
> |
acquireSharedInterruptibly(permits); |
354 |
|
} |
355 |
|
|
356 |
|
/** |
379 |
|
*/ |
380 |
|
public void acquireUninterruptibly(int permits) { |
381 |
|
if (permits < 0) throw new IllegalArgumentException(); |
382 |
< |
doAcquire(permits, 0); |
382 |
> |
acquireSharedUninterruptibly(permits); |
383 |
|
} |
384 |
|
|
385 |
|
/** |
401 |
|
*/ |
402 |
|
public boolean tryAcquire(int permits) { |
403 |
|
if (permits < 0) throw new IllegalArgumentException(); |
404 |
< |
return doTryAcquire(permits) >= 0; |
404 |
> |
return acquireSharedState(false, permits, null) >= 0; |
405 |
|
} |
406 |
|
|
407 |
|
/** |
458 |
|
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) |
459 |
|
throws InterruptedException { |
460 |
|
if (permits < 0) throw new IllegalArgumentException(); |
461 |
< |
if (unit == null) |
668 |
< |
throw new NullPointerException(); |
669 |
< |
if (Thread.interrupted()) |
670 |
< |
throw new InterruptedException(); |
671 |
< |
return doTimedAcquire(permits, unit.toNanos(timeout)); |
461 |
> |
return acquireSharedTimed(permits, unit.toNanos(timeout)); |
462 |
|
} |
463 |
|
|
464 |
|
|
488 |
|
*/ |
489 |
|
public void release(int permits) { |
490 |
|
if (permits < 0) throw new IllegalArgumentException(); |
491 |
< |
doRelease(permits); |
491 |
> |
releaseShared(permits); |
492 |
|
} |
493 |
|
|
494 |
|
/** |
497 |
|
* @return the number of permits available in this semaphore. |
498 |
|
*/ |
499 |
|
public int availablePermits() { |
500 |
< |
return perms.get(); |
500 |
> |
return getState().get(); |
501 |
|
} |
502 |
|
|
503 |
|
/** |
512 |
|
*/ |
513 |
|
protected void reducePermits(int reduction) { |
514 |
|
if (reduction < 0) throw new IllegalArgumentException(); |
515 |
< |
perms.getAndAdd(-reduction); |
515 |
> |
getState().getAndAdd(-reduction); |
516 |
|
} |
517 |
|
|
518 |
|
/** |
523 |
|
return fair; |
524 |
|
} |
525 |
|
|
736 |
– |
/** |
737 |
– |
* Returns an estimate of the number of threads waiting to acquire |
738 |
– |
* a permit. The value is only an estimate because the number of |
739 |
– |
* threads may change dynamically while this method traverses |
740 |
– |
* internal data structures. This method is designed for use in |
741 |
– |
* monitoring of the system state, not for synchronization |
742 |
– |
* control. |
743 |
– |
* @return the estimated number of threads waiting for a permit |
744 |
– |
*/ |
745 |
– |
public int getQueueLength() { |
746 |
– |
int n = 0; |
747 |
– |
for (Node p = tail; p != null && p != head; p = p.prev) |
748 |
– |
++n; |
749 |
– |
return n; |
750 |
– |
} |
751 |
– |
|
752 |
– |
/** |
753 |
– |
* Returns a collection containing threads that may be waiting to |
754 |
– |
* acquire a permit. Because the actual set of threads may |
755 |
– |
* change dynamically while constructing this result, the returned |
756 |
– |
* collection is only a best-effort estimate. The elements of the |
757 |
– |
* returned collection are in no particular order. This method is |
758 |
– |
* designed to facilitate construction of subclasses that provide |
759 |
– |
* more extensive monitoring facilities. |
760 |
– |
* @return the collection of threads |
761 |
– |
*/ |
762 |
– |
protected Collection<Thread> getQueuedThreads() { |
763 |
– |
ArrayList<Thread> list = new ArrayList<Thread>(); |
764 |
– |
for (Node p = tail; p != null; p = p.prev) { |
765 |
– |
Thread t = p.thread; |
766 |
– |
if (t != null) |
767 |
– |
list.add(t); |
768 |
– |
} |
769 |
– |
return list; |
770 |
– |
} |
771 |
– |
|
526 |
|
} |