ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/FairSemaphore.java
Revision: 1.7
Committed: Fri Aug 8 20:05:07 2003 UTC (20 years, 10 months ago) by tim
Branch: MAIN
Changes since 1.6: +16 -31 lines
Log Message:
Scrunched catch, finally, else clauses.

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9
10 /**
11 * A semaphore guaranteeing that threads invoking any of the {@link
12 * #acquire() acquire} methods are allocated permits in the order in
13 * which their invocation of those methods was processed
14 * (first-in-first-out; FIFO). Note that FIFO ordering necessarily
15 * applies to specific internal points of execution within these
16 * methods. So, it is possible for one thread to invoke
17 * <tt>acquire</tt> before another, but reach the ordering point after
18 * the other, and similarly upon return from the method.
19 *
20 * <p>Because permits are allocated in-order, this class also provides
21 * convenience methods to {@link #acquire(long) acquire} and {@link
22 * #release(long) release} multiple permits at a time.
23 *
24 * @since 1.5
25 * @spec JSR-166
26 * @revised $Date: 2003/07/11 13:12:06 $
27 * @editor $Author: dl $
28 * @author Doug Lea
29 *
30 */
31 public class FairSemaphore extends Semaphore {
32
33 /*
34 * Basic algorithm is a variant of the one described
35 * in CPJ book 2nd edition, section 3.7.3
36 */
37
38 /**
39 * Nodes of the wait queue. Opportunistically subclasses ReentrantLock.
40 */
41 private static class Node extends ReentrantLock {
42 /** The condition to wait on */
43 Condition done = newCondition();
44 /** True if interrupted before released */
45 boolean cancelled;
46 /** Number of permits remaining until can release */
47 long permitsNeeded;
48 /** Next node of queue */
49 Node next;
50
51 Node(long n) { permitsNeeded = n; }
52 }
53
54 /** Head of the wait queue */
55 private Node head;
56
57 /** Pointer to last node of the wait queue */
58 private Node last;
59
60 /** Add a new node to wait queue with given number of permits needed */
61 private Node enq(long n) {
62 Node p = new Node(n);
63 if (last == null)
64 last = head = p;
65 else
66 last = last.next = p;
67 return p;
68 }
69
70 /** Remove first node from wait queue */
71 private void removeFirst() {
72 Node p = head;
73 assert p != null;
74 if (p != null && (head = p.next) == null)
75 last = null;
76 }
77
78 /**
79 * Construct a <tt>FairSemaphore</tt> with the given number of
80 * permits.
81 * @param permits the initial number of permits available
82 */
83 public FairSemaphore(long permits) {
84 super(permits, new ReentrantLock(true));
85 }
86
87 /**
88 * Acquires a permit from this semaphore, blocking until one is
89 * available, or the thread is {@link Thread#interrupt interrupted}.
90 *
91 * <p>Acquires a permit, if one is available and returns immediately,
92 * reducing the number of available permits by one.
93 * <p>If no permit is available then the current thread becomes
94 * disabled for thread scheduling purposes and lies dormant until
95 * one of two things happens:
96 * <ul>
97 * <li>Some other thread invokes the {@link #release} method for this
98 * semaphore and the current thread happens to be chosen as the
99 * thread to receive the permit; or
100 * <li>Some other thread {@link Thread#interrupt interrupts} the current
101 * thread.
102 * </ul>
103 *
104 * <p>If the current thread:
105 * <ul>
106 * <li>has its interrupted status set on entry to this method; or
107 * <li>is {@link Thread#interrupt interrupted} while waiting
108 * for a permit,
109 * </ul>
110 * then {@link InterruptedException} is thrown and the current thread's
111 * interrupted status is cleared.
112 *
113 * @throws InterruptedException if the current thread is interrupted
114 *
115 * @see Thread#interrupt
116 */
117 public void acquire() throws InterruptedException {
118 acquire(1L);
119 }
120
121 /**
122 * Acquires a permit from this semaphore, blocking until one is
123 * available.
124 *
125 * <p>Acquires a permit, if one is available and returns immediately,
126 * reducing the number of available permits by one.
127 * <p>If no permit is available then the current thread becomes
128 * disabled for thread scheduling purposes and lies dormant until
129 * some other thread invokes the {@link #release} method for this
130 * semaphore and the current thread happens to be chosen as the
131 * thread to receive the permit.
132 *
133 * <p>If the current thread
134 * is {@link Thread#interrupt interrupted} while waiting
135 * for a permit then it will continue to wait, but the time at which
136 * the thread is assigned a permit may change compared to the time it
137 * would have received the permit had no interruption occurred. When the
138 * thread does return from this method its interrupt status will be set.
139 *
140 */
141 public void acquireUninterruptibly() {
142 acquireUninterruptibly(1L);
143 }
144
145 /**
146 * Acquires a permit from this semaphore, only if one is available at the
147 * time of invocation.
148 * <p>Acquires a permit, if one is available and returns immediately,
149 * with the value <tt>true</tt>,
150 * reducing the number of available permits by one.
151 *
152 * <p>If no permit is available then this method will return
153 * immediately with the value <tt>false</tt>.
154 *
155 * @return <tt>true</tt> if a permit was acquired and <tt>false</tt>
156 * otherwise.
157 */
158 public boolean tryAcquire() {
159 return tryAcquire(1L);
160 }
161
162 /**
163 * Acquires a permit from this semaphore, if one becomes available
164 * within the given waiting time and the
165 * current thread has not been {@link Thread#interrupt interrupted}.
166 * <p>Acquires a permit, if one is available and returns immediately,
167 * with the value <tt>true</tt>,
168 * reducing the number of available permits by one.
169 * <p>If no permit is available then
170 * the current thread becomes disabled for thread scheduling
171 * purposes and lies dormant until one of three things happens:
172 * <ul>
173 * <li>Some other thread invokes the {@link #release} method for this
174 * semaphore and the current thread happens to be chosen as the
175 * thread to receive the permit; or
176 * <li>Some other thread {@link Thread#interrupt interrupts} the current
177 * thread; or
178 * <li>The specified waiting time elapses.
179 * </ul>
180 * <p>If a permit is acquired then the value <tt>true</tt> is returned.
181 * <p>If the current thread:
182 * <ul>
183 * <li>has its interrupted status set on entry to this method; or
184 * <li>is {@link Thread#interrupt interrupted} while waiting to acquire
185 * a permit,
186 * </ul>
187 * then {@link InterruptedException} is thrown and the current thread's
188 * interrupted status is cleared.
189 * <p>If the specified waiting time elapses then the value <tt>false</tt>
190 * is returned.
191 * The given waiting time is a best-effort lower bound. If the time is
192 * less than or equal to zero, the method will not wait at all.
193 *
194 * @param timeout the maximum time to wait for a permit
195 * @param unit the time unit of the <tt>timeout</tt> argument.
196 * @return <tt>true</tt> if a permit was acquired and <tt>false</tt>
197 * if the waiting time elapsed before a permit was acquired.
198 *
199 * @throws InterruptedException if the current thread is interrupted
200 *
201 * @see Thread#interrupt
202 *
203 */
204 public boolean tryAcquire(long timeout, TimeUnit unit)
205 throws InterruptedException {
206 return tryAcquire(1L, timeout, unit);
207 }
208
209 /**
210 * Releases a permit, returning it to the semaphore.
211 * <p>Releases a permit, increasing the number of available permits
212 * by one.
213 * If any threads are blocking trying to acquire a permit, then one
214 * is selected and given the permit that was just released.
215 * That thread is re-enabled for thread scheduling purposes.
216 * <p>There is no requirement that a thread that releases a permit must
217 * have acquired that permit by calling {@link #acquire}.
218 * Correct usage of a semaphore is established by programming convention
219 * in the application.
220 */
221 public void release() {
222 release(1L);
223 }
224
225
226 /**
227 * Acquires the given number of permits from this semaphore,
228 * blocking until all are available,
229 * or the thread is {@link Thread#interrupt interrupted}.
230 *
231 * <p>Acquires the given number of permits, if they are available,
232 * and returns immediately,
233 * reducing the number of available permits by the given amount.
234 *
235 * <p>If insufficient permits are available then the current thread becomes
236 * disabled for thread scheduling purposes and lies dormant until
237 * one of two things happens:
238 * <ul>
239 * <li>Some other thread invokes one of the {@link #release() release}
240 * methods for this semaphore, the current thread is next to be assigned
241 * permits and the number of available permits satisfies this request; or
242 * <li>Some other thread {@link Thread#interrupt interrupts} the current
243 * thread.
244 * </ul>
245 *
246 * <p>If the current thread:
247 * <ul>
248 * <li>has its interrupted status set on entry to this method; or
249 * <li>is {@link Thread#interrupt interrupted} while waiting
250 * for a permit,
251 * </ul>
252 * then {@link InterruptedException} is thrown and the current thread's
253 * interrupted status is cleared.
254 * Any available permits are assigned to the next waiting thread as if
255 * they had been made available by a call to {@link #release()}.
256 *
257 * @param permits the number of permits to acquire
258 *
259 * @throws InterruptedException if the current thread is interrupted
260 * @throws IllegalArgumentException if permits less than zero.
261 *
262 * @see Thread#interrupt
263 */
264 public void acquire(long permits) throws InterruptedException {
265 if (permits < 0) throw new IllegalArgumentException();
266 Node node = null;
267 lock.lockInterruptibly();
268 try {
269 long remaining = count - permits;
270 if (remaining >= 0) {
271 count = remaining;
272 return;
273 }
274 count = 0;
275 node = enq(-remaining);
276 } finally {
277 lock.unlock();
278 }
279
280 node.lock();
281 try {
282 while (node.permitsNeeded > 0)
283 node.done.await();
284 } catch(InterruptedException ie) {
285 if (node.permitsNeeded > 0) {
286 node.cancelled = true;
287 release(permits - node.permitsNeeded);
288 throw ie;
289 } else { // ignore interrupt
290 Thread.currentThread().interrupt();
291 }
292 } finally {
293 node.unlock();
294 }
295 }
296
297 /**
298 * Acquires the given number of permits from this semaphore,
299 * blocking until all are available.
300 *
301 * <p>Acquires the given number of permits, if they are available,
302 * and returns immediately,
303 * reducing the number of available permits by the given amount.
304 *
305 * <p>If insufficient permits are available then the current thread becomes
306 * disabled for thread scheduling purposes and lies dormant until
307 * some other thread invokes one of the {@link #release() release}
308 * methods for this semaphore, the current thread is next to be assigned
309 * permits and the number of available permits satisfies this request.
310 *
311 * <p>If the current thread
312 * is {@link Thread#interrupt interrupted} while waiting
313 * for permits then it will continue to wait and its position in the
314 * queue is not affected. When the
315 * thread does return from this method its interrupt status will be set.
316 *
317 * @param permits the number of permits to acquire
318 * @throws IllegalArgumentException if permits less than zero.
319 *
320 */
321 public void acquireUninterruptibly(long permits) {
322 if (permits < 0) throw new IllegalArgumentException();
323 Node node = null;
324 lock.lock();
325 try {
326 long remaining = count - permits;
327 if (remaining >= 0) {
328 count = remaining;
329 return;
330 }
331 count = 0;
332 node = enq(-remaining);
333 } finally {
334 lock.unlock();
335 }
336
337 node.lock();
338 try {
339 while (node.permitsNeeded > 0)
340 node.done.awaitUninterruptibly();
341 } finally {
342 node.unlock();
343 }
344 }
345
346 /**
347 * Acquires the given number of permits from this semaphore, only if
348 * all are available at the time of invocation.
349 * <p>Acquires the given number of permits, if they are available, and
350 * returns immediately, with the value <tt>true</tt>,
351 * reducing the number of available permits by the given amount.
352 *
353 * <p>If insufficient permits are available then this method will return
354 * immediately with the value <tt>false</tt> and the number of available
355 * permits is unchanged.
356 *
357 * @param permits the number of permits to acquire
358 *
359 * @return <tt>true</tt> if the permits were acquired and <tt>false</tt>
360 * otherwise.
361 * @throws IllegalArgumentException if permits less than zero.
362 */
363 public boolean tryAcquire(long permits) {
364 if (permits < 0) throw new IllegalArgumentException();
365 lock.lock();
366 try {
367 long remaining = count - permits;
368 if (remaining >= 0) {
369 count = remaining;
370 return true;
371 }
372 return false;
373 } finally {
374 lock.unlock();
375 }
376 }
377
378 /**
379 * Acquires the given number of permits from this semaphore, if all
380 * become available within the given waiting time and the
381 * current thread has not been {@link Thread#interrupt interrupted}.
382 * <p>Acquires the given number of permits, if they are available and
383 * returns immediately, with the value <tt>true</tt>,
384 * reducing the number of available permits by the given amount.
385 * <p>If insufficient permits are available then
386 * the current thread becomes disabled for thread scheduling
387 * purposes and lies dormant until one of three things happens:
388 * <ul>
389 * <li>Some other thread invokes one of the {@link #release() release}
390 * methods for this semaphore, the current thread is next to be assigned
391 * permits and the number of available permits satisfies this request; or
392 * <li>Some other thread {@link Thread#interrupt interrupts} the current
393 * thread; or
394 * <li>The specified waiting time elapses.
395 * </ul>
396 * <p>If the permits are acquired then the value <tt>true</tt> is returned.
397 * <p>If the current thread:
398 * <ul>
399 * <li>has its interrupted status set on entry to this method; or
400 * <li>is {@link Thread#interrupt interrupted} while waiting to acquire
401 * the permits,
402 * </ul>
403 * then {@link InterruptedException} is thrown and the current thread's
404 * interrupted status is cleared.
405 * Any available permits are assigned to the next waiting thread as if
406 * they had been made available by a call to {@link #release()}.
407 *
408 * <p>If the specified waiting time elapses then the value <tt>false</tt>
409 * is returned.
410 * The given waiting time is a best-effort lower bound. If the time is
411 * less than or equal to zero, the method will not wait at all.
412 * Any available permits are assigned to the next waiting thread as if
413 * they had been made available by a call to {@link #release()}.
414 *
415 * @param permits the number of permits to acquire
416 * @param timeout the maximum time to wait for the permits
417 * @param unit the time unit of the <tt>timeout</tt> argument.
418 * @return <tt>true</tt> if all permits were acquired and <tt>false</tt>
419 * if the waiting time elapsed before all permits were acquired.
420 *
421 * @throws InterruptedException if the current thread is interrupted
422 * @throws IllegalArgumentException if permits less than zero.
423 *
424 * @see Thread#interrupt
425 *
426 */
427 public boolean tryAcquire(long permits, long timeout, TimeUnit unit)
428 throws InterruptedException {
429 if (permits < 0) throw new IllegalArgumentException();
430 Node node = null;
431 long nanos = unit.toNanos(timeout);
432 lock.lockInterruptibly();
433 try {
434 long remaining = count - permits;
435 if (remaining >= 0) {
436 count = remaining;
437 return true;
438 }
439 if (nanos <= 0)
440 return false;
441 count = 0;
442 node = enq(-remaining);
443 } finally {
444 lock.unlock();
445 }
446
447 node.lock();
448 try {
449 while (node.permitsNeeded > 0) {
450 nanos = node.done.awaitNanos(nanos);
451 if (nanos <= 0) {
452 release(permits - node.permitsNeeded);
453 return false;
454 }
455 }
456 return true;
457 } catch(InterruptedException ie) {
458 if (node.permitsNeeded > 0) {
459 node.cancelled = true;
460 release(permits - node.permitsNeeded);
461 throw ie;
462 } else { // ignore interrupt
463 Thread.currentThread().interrupt();
464 return true;
465 }
466 } finally {
467 node.unlock();
468 }
469 }
470
471
472 /**
473 * Releases the given number of permits, returning them to the semaphore.
474 * <p>Releases the given number of permits, increasing the number of
475 * available permits by that amount.
476 * If any threads are blocking trying to acquire permits, then the
477 * one that has been waiting the longest
478 * is selected and given the permits that were just released.
479 * If the number of available permits satisfies that thread's request
480 * then that thread is re-enabled for thread scheduling purposes; otherwise
481 * the thread continues to wait. If there are still permits available
482 * after the first thread's request has been satisfied, then those permits
483 * are assigned to the next waiting thread. If it is satisfied then it is
484 * re-enabled for thread scheduling purposes. This continues until there
485 * are insufficient permits to satisfy the next waiting thread, or there
486 * are no more waiting threads.
487 *
488 * <p>There is no requirement that a thread that releases a permit must
489 * have acquired that permit by calling {@link Semaphore#acquire acquire}.
490 * Correct usage of a semaphore is established by programming convention
491 * in the application.
492 *
493 * @param permits the number of permits to release
494 * @throws IllegalArgumentException if permits less than zero.
495 */
496 public void release(long permits) {
497 if (permits < 0) throw new IllegalArgumentException();
498 if (permits == 0)
499 return;
500 // Even when using fair locks, releases should try to barge in.
501 if (!lock.tryLock())
502 lock.lock();
503 try {
504 long p = permits;
505 while (p > 0) {
506 Node node = head;
507 if (node == null) {
508 count += p;
509 p = 0;
510 } else {
511 node.lock();
512 try {
513 if (node.cancelled)
514 removeFirst();
515 else if (node.permitsNeeded > p) {
516 node.permitsNeeded -= p;
517 p = 0;
518 } else {
519 p -= node.permitsNeeded;
520 node.permitsNeeded = 0;
521 node.done.signal();
522 removeFirst();
523 }
524 } finally {
525 node.unlock();
526 }
527 }
528 }
529 } finally {
530 lock.unlock();
531 }
532 }
533 }