ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/FairSemaphore.java
Revision: 1.6
Committed: Fri Jul 11 13:12:06 2003 UTC (20 years, 11 months ago) by dl
Branch: MAIN
CVS Tags: JSR166_PRELIMINARY_TEST_RELEASE_2, JSR166_CR1
Changes since 1.5: +298 -46 lines
Log Message:
Fair semaphores actually fair now.

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9
10 /**
11 * A semaphore guaranteeing that threads invoking any of the {@link
12 * #acquire() acquire} methods are allocated permits in the order in
13 * which their invocation of those methods was processed
14 * (first-in-first-out; FIFO). Note that FIFO ordering necessarily
15 * applies to specific internal points of execution within these
16 * methods. So, it is possible for one thread to invoke
17 * <tt>acquire</tt> before another, but reach the ordering point after
18 * the other, and similarly upon return from the method.
19 *
20 * <p>Because permits are allocated in-order, this class also provides
21 * convenience methods to {@link #acquire(long) acquire} and {@link
22 * #release(long) release} multiple permits at a time.
23 *
24 * @since 1.5
25 * @spec JSR-166
26 * @revised $Date: 2003/07/09 23:23:17 $
27 * @editor $Author: dl $
28 * @author Doug Lea
29 *
30 */
31 public class FairSemaphore extends Semaphore {
32
33 /*
34 * Basic algorithm is a variant of the one described
35 * in CPJ book 2nd edition, section 3.7.3
36 */
37
38 /**
39 * Nodes of the wait queue. Opportunistically subclasses ReentrantLock.
40 */
41 private static class Node extends ReentrantLock {
42 /** The condition to wait on */
43 Condition done = newCondition();
44 /** True if interrupted before released */
45 boolean cancelled;
46 /** Number of permits remaining until can release */
47 long permitsNeeded;
48 /** Next node of queue */
49 Node next;
50
51 Node(long n) { permitsNeeded = n; }
52 }
53
54 /** Head of the wait queue */
55 private Node head;
56
57 /** Pointer to last node of the wait queue */
58 private Node last;
59
60 /** Add a new node to wait queue with given number of permits needed */
61 private Node enq(long n) {
62 Node p = new Node(n);
63 if (last == null)
64 last = head = p;
65 else
66 last = last.next = p;
67 return p;
68 }
69
70 /** Remove first node from wait queue */
71 private void removeFirst() {
72 Node p = head;
73 assert p != null;
74 if (p != null && (head = p.next) == null)
75 last = null;
76 }
77
/**
 * Creates a <tt>FairSemaphore</tt> holding the given number of permits.
 * The semaphore's internal lock is a {@link ReentrantLock} constructed
 * in fair mode.
 *
 * @param permits the number of permits initially available
 */
public FairSemaphore(long permits) {
    super(permits, new ReentrantLock(/* fair = */ true));
}
86
87 /**
88 * Acquires a permit from this semaphore, blocking until one is
89 * available, or the thread is {@link Thread#interrupt interrupted}.
90 *
91 * <p>Acquires a permit, if one is available and returns immediately,
92 * reducing the number of available permits by one.
93 * <p>If no permit is available then the current thread becomes
94 * disabled for thread scheduling purposes and lies dormant until
95 * one of two things happens:
96 * <ul>
97 * <li>Some other thread invokes the {@link #release} method for this
98 * semaphore and the current thread happens to be chosen as the
99 * thread to receive the permit; or
100 * <li>Some other thread {@link Thread#interrupt interrupts} the current
101 * thread.
102 * </ul>
103 *
104 * <p>If the current thread:
105 * <ul>
106 * <li>has its interrupted status set on entry to this method; or
107 * <li>is {@link Thread#interrupt interrupted} while waiting
108 * for a permit,
109 * </ul>
110 * then {@link InterruptedException} is thrown and the current thread's
111 * interrupted status is cleared.
112 *
113 * @throws InterruptedException if the current thread is interrupted
114 *
115 * @see Thread#interrupt
116 */
117 public void acquire() throws InterruptedException {
118 acquire(1L);
119 }
120
121 /**
122 * Acquires a permit from this semaphore, blocking until one is
123 * available.
124 *
125 * <p>Acquires a permit, if one is available and returns immediately,
126 * reducing the number of available permits by one.
127 * <p>If no permit is available then the current thread becomes
128 * disabled for thread scheduling purposes and lies dormant until
129 * some other thread invokes the {@link #release} method for this
130 * semaphore and the current thread happens to be chosen as the
131 * thread to receive the permit.
132 *
133 * <p>If the current thread
134 * is {@link Thread#interrupt interrupted} while waiting
135 * for a permit then it will continue to wait, but the time at which
136 * the thread is assigned a permit may change compared to the time it
137 * would have received the permit had no interruption occurred. When the
138 * thread does return from this method its interrupt status will be set.
139 *
140 */
141 public void acquireUninterruptibly() {
142 acquireUninterruptibly(1L);
143 }
144
145 /**
146 * Acquires a permit from this semaphore, only if one is available at the
147 * time of invocation.
148 * <p>Acquires a permit, if one is available and returns immediately,
149 * with the value <tt>true</tt>,
150 * reducing the number of available permits by one.
151 *
152 * <p>If no permit is available then this method will return
153 * immediately with the value <tt>false</tt>.
154 *
155 * @return <tt>true</tt> if a permit was acquired and <tt>false</tt>
156 * otherwise.
157 */
158 public boolean tryAcquire() {
159 return tryAcquire(1L);
160 }
161
162 /**
163 * Acquires a permit from this semaphore, if one becomes available
164 * within the given waiting time and the
165 * current thread has not been {@link Thread#interrupt interrupted}.
166 * <p>Acquires a permit, if one is available and returns immediately,
167 * with the value <tt>true</tt>,
168 * reducing the number of available permits by one.
169 * <p>If no permit is available then
170 * the current thread becomes disabled for thread scheduling
171 * purposes and lies dormant until one of three things happens:
172 * <ul>
173 * <li>Some other thread invokes the {@link #release} method for this
174 * semaphore and the current thread happens to be chosen as the
175 * thread to receive the permit; or
176 * <li>Some other thread {@link Thread#interrupt interrupts} the current
177 * thread; or
178 * <li>The specified waiting time elapses.
179 * </ul>
180 * <p>If a permit is acquired then the value <tt>true</tt> is returned.
181 * <p>If the current thread:
182 * <ul>
183 * <li>has its interrupted status set on entry to this method; or
184 * <li>is {@link Thread#interrupt interrupted} while waiting to acquire
185 * a permit,
186 * </ul>
187 * then {@link InterruptedException} is thrown and the current thread's
188 * interrupted status is cleared.
189 * <p>If the specified waiting time elapses then the value <tt>false</tt>
190 * is returned.
191 * The given waiting time is a best-effort lower bound. If the time is
192 * less than or equal to zero, the method will not wait at all.
193 *
194 * @param timeout the maximum time to wait for a permit
195 * @param unit the time unit of the <tt>timeout</tt> argument.
196 * @return <tt>true</tt> if a permit was acquired and <tt>false</tt>
197 * if the waiting time elapsed before a permit was acquired.
198 *
199 * @throws InterruptedException if the current thread is interrupted
200 *
201 * @see Thread#interrupt
202 *
203 */
204 public boolean tryAcquire(long timeout, TimeUnit unit)
205 throws InterruptedException {
206 return tryAcquire(1L, timeout, unit);
207 }
208
209 /**
210 * Releases a permit, returning it to the semaphore.
211 * <p>Releases a permit, increasing the number of available permits
212 * by one.
213 * If any threads are blocking trying to acquire a permit, then one
214 * is selected and given the permit that was just released.
215 * That thread is re-enabled for thread scheduling purposes.
216 * <p>There is no requirement that a thread that releases a permit must
217 * have acquired that permit by calling {@link #acquire}.
218 * Correct usage of a semaphore is established by programming convention
219 * in the application.
220 */
221 public void release() {
222 release(1L);
223 }
224
225
226 /**
227 * Acquires the given number of permits from this semaphore,
228 * blocking until all are available,
229 * or the thread is {@link Thread#interrupt interrupted}.
230 *
231 * <p>Acquires the given number of permits, if they are available,
232 * and returns immediately,
233 * reducing the number of available permits by the given amount.
234 *
235 * <p>If insufficient permits are available then the current thread becomes
236 * disabled for thread scheduling purposes and lies dormant until
237 * one of two things happens:
238 * <ul>
239 * <li>Some other thread invokes one of the {@link #release() release}
240 * methods for this semaphore, the current thread is next to be assigned
241 * permits and the number of available permits satisfies this request; or
242 * <li>Some other thread {@link Thread#interrupt interrupts} the current
243 * thread.
244 * </ul>
245 *
246 * <p>If the current thread:
247 * <ul>
248 * <li>has its interrupted status set on entry to this method; or
249 * <li>is {@link Thread#interrupt interrupted} while waiting
250 * for a permit,
251 * </ul>
252 * then {@link InterruptedException} is thrown and the current thread's
253 * interrupted status is cleared.
254 * Any available permits are assigned to the next waiting thread as if
255 * they had been made available by a call to {@link #release()}.
256 *
257 * @param permits the number of permits to acquire
258 *
259 * @throws InterruptedException if the current thread is interrupted
260 * @throws IllegalArgumentException if permits less than zero.
261 *
262 * @see Thread#interrupt
263 */
264 public void acquire(long permits) throws InterruptedException {
265 if (permits < 0) throw new IllegalArgumentException();
266 Node node = null;
267 lock.lockInterruptibly();
268 try {
269 long remaining = count - permits;
270 if (remaining >= 0) {
271 count = remaining;
272 return;
273 }
274 count = 0;
275 node = enq(-remaining);
276 }
277 finally {
278 lock.unlock();
279 }
280
281 node.lock();
282 try {
283 while (node.permitsNeeded > 0)
284 node.done.await();
285 }
286 catch(InterruptedException ie) {
287 if (node.permitsNeeded > 0) {
288 node.cancelled = true;
289 release(permits - node.permitsNeeded);
290 throw ie;
291 }
292 else { // ignore interrupt
293 Thread.currentThread().interrupt();
294 }
295 }
296 finally {
297 node.unlock();
298 }
299 }
300
301 /**
302 * Acquires the given number of permits from this semaphore,
303 * blocking until all are available.
304 *
305 * <p>Acquires the given number of permits, if they are available,
306 * and returns immediately,
307 * reducing the number of available permits by the given amount.
308 *
309 * <p>If insufficient permits are available then the current thread becomes
310 * disabled for thread scheduling purposes and lies dormant until
311 * some other thread invokes one of the {@link #release() release}
312 * methods for this semaphore, the current thread is next to be assigned
313 * permits and the number of available permits satisfies this request.
314 *
315 * <p>If the current thread
316 * is {@link Thread#interrupt interrupted} while waiting
317 * for permits then it will continue to wait and its position in the
318 * queue is not affected. When the
319 * thread does return from this method its interrupt status will be set.
320 *
321 * @param permits the number of permits to acquire
322 * @throws IllegalArgumentException if permits less than zero.
323 *
324 */
325 public void acquireUninterruptibly(long permits) {
326 if (permits < 0) throw new IllegalArgumentException();
327 Node node = null;
328 lock.lock();
329 try {
330 long remaining = count - permits;
331 if (remaining >= 0) {
332 count = remaining;
333 return;
334 }
335 count = 0;
336 node = enq(-remaining);
337 }
338 finally {
339 lock.unlock();
340 }
341
342 node.lock();
343 try {
344 while (node.permitsNeeded > 0)
345 node.done.awaitUninterruptibly();
346 }
347 finally {
348 node.unlock();
349 }
350 }
351
352 /**
353 * Acquires the given number of permits from this semaphore, only if
354 * all are available at the time of invocation.
355 * <p>Acquires the given number of permits, if they are available, and
356 * returns immediately, with the value <tt>true</tt>,
357 * reducing the number of available permits by the given amount.
358 *
359 * <p>If insufficient permits are available then this method will return
360 * immediately with the value <tt>false</tt> and the number of available
361 * permits is unchanged.
362 *
363 * @param permits the number of permits to acquire
364 *
365 * @return <tt>true</tt> if the permits were acquired and <tt>false</tt>
366 * otherwise.
367 * @throws IllegalArgumentException if permits less than zero.
368 */
369 public boolean tryAcquire(long permits) {
370 if (permits < 0) throw new IllegalArgumentException();
371 lock.lock();
372 try {
373 long remaining = count - permits;
374 if (remaining >= 0) {
375 count = remaining;
376 return true;
377 }
378 return false;
379 }
380 finally {
381 lock.unlock();
382 }
383 }
384
385 /**
386 * Acquires the given number of permits from this semaphore, if all
387 * become available within the given waiting time and the
388 * current thread has not been {@link Thread#interrupt interrupted}.
389 * <p>Acquires the given number of permits, if they are available and
390 * returns immediately, with the value <tt>true</tt>,
391 * reducing the number of available permits by the given amount.
392 * <p>If insufficient permits are available then
393 * the current thread becomes disabled for thread scheduling
394 * purposes and lies dormant until one of three things happens:
395 * <ul>
396 * <li>Some other thread invokes one of the {@link #release() release}
397 * methods for this semaphore, the current thread is next to be assigned
398 * permits and the number of available permits satisfies this request; or
399 * <li>Some other thread {@link Thread#interrupt interrupts} the current
400 * thread; or
401 * <li>The specified waiting time elapses.
402 * </ul>
403 * <p>If the permits are acquired then the value <tt>true</tt> is returned.
404 * <p>If the current thread:
405 * <ul>
406 * <li>has its interrupted status set on entry to this method; or
407 * <li>is {@link Thread#interrupt interrupted} while waiting to acquire
408 * the permits,
409 * </ul>
410 * then {@link InterruptedException} is thrown and the current thread's
411 * interrupted status is cleared.
412 * Any available permits are assigned to the next waiting thread as if
413 * they had been made available by a call to {@link #release()}.
414 *
415 * <p>If the specified waiting time elapses then the value <tt>false</tt>
416 * is returned.
417 * The given waiting time is a best-effort lower bound. If the time is
418 * less than or equal to zero, the method will not wait at all.
419 * Any available permits are assigned to the next waiting thread as if
420 * they had been made available by a call to {@link #release()}.
421 *
422 * @param permits the number of permits to acquire
423 * @param timeout the maximum time to wait for the permits
424 * @param unit the time unit of the <tt>timeout</tt> argument.
425 * @return <tt>true</tt> if all permits were acquired and <tt>false</tt>
426 * if the waiting time elapsed before all permits were acquired.
427 *
428 * @throws InterruptedException if the current thread is interrupted
429 * @throws IllegalArgumentException if permits less than zero.
430 *
431 * @see Thread#interrupt
432 *
433 */
434 public boolean tryAcquire(long permits, long timeout, TimeUnit unit)
435 throws InterruptedException {
436 if (permits < 0) throw new IllegalArgumentException();
437 Node node = null;
438 long nanos = unit.toNanos(timeout);
439 lock.lockInterruptibly();
440 try {
441 long remaining = count - permits;
442 if (remaining >= 0) {
443 count = remaining;
444 return true;
445 }
446 if (nanos <= 0)
447 return false;
448 count = 0;
449 node = enq(-remaining);
450 }
451 finally {
452 lock.unlock();
453 }
454
455 node.lock();
456 try {
457 while (node.permitsNeeded > 0) {
458 nanos = node.done.awaitNanos(nanos);
459 if (nanos <= 0) {
460 release(permits - node.permitsNeeded);
461 return false;
462 }
463 }
464 return true;
465 }
466 catch(InterruptedException ie) {
467 if (node.permitsNeeded > 0) {
468 node.cancelled = true;
469 release(permits - node.permitsNeeded);
470 throw ie;
471 }
472 else { // ignore interrupt
473 Thread.currentThread().interrupt();
474 return true;
475 }
476 }
477 finally {
478 node.unlock();
479 }
480 }
481
482
483 /**
484 * Releases the given number of permits, returning them to the semaphore.
485 * <p>Releases the given number of permits, increasing the number of
486 * available permits by that amount.
487 * If any threads are blocking trying to acquire permits, then the
488 * one that has been waiting the longest
489 * is selected and given the permits that were just released.
490 * If the number of available permits satisfies that thread's request
491 * then that thread is re-enabled for thread scheduling purposes; otherwise
492 * the thread continues to wait. If there are still permits available
493 * after the first thread's request has been satisfied, then those permits
494 * are assigned to the next waiting thread. If it is satisfied then it is
495 * re-enabled for thread scheduling purposes. This continues until there
496 * are insufficient permits to satisfy the next waiting thread, or there
497 * are no more waiting threads.
498 *
499 * <p>There is no requirement that a thread that releases a permit must
500 * have acquired that permit by calling {@link Semaphore#acquire acquire}.
501 * Correct usage of a semaphore is established by programming convention
502 * in the application.
503 *
504 * @param permits the number of permits to release
505 * @throws IllegalArgumentException if permits less than zero.
506 */
507 public void release(long permits) {
508 if (permits < 0) throw new IllegalArgumentException();
509 if (permits == 0)
510 return;
511 // Even when using fair locks, releases should try to barge in.
512 if (!lock.tryLock())
513 lock.lock();
514 try {
515 long p = permits;
516 while (p > 0) {
517 Node node = head;
518 if (node == null) {
519 count += p;
520 p = 0;
521 }
522 else {
523 node.lock();
524 try {
525 if (node.cancelled)
526 removeFirst();
527 else if (node.permitsNeeded > p) {
528 node.permitsNeeded -= p;
529 p = 0;
530 }
531 else {
532 p -= node.permitsNeeded;
533 node.permitsNeeded = 0;
534 node.done.signal();
535 removeFirst();
536 }
537 }
538 finally {
539 node.unlock();
540 }
541 }
542 }
543 }
544 finally {
545 lock.unlock();
546 }
547 }
548 }