ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/FairSemaphore.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/FairSemaphore.java (file contents):
Revision 1.5 by dl, Wed Jul 9 23:23:17 2003 UTC vs.
Revision 1.6 by dl, Fri Jul 11 13:12:06 2003 UTC

# Line 31 | Line 31 | import java.util.concurrent.locks.*;
31   public class FairSemaphore extends Semaphore {
32  
33      /*
34 <     * This differs from Semaphore only in that it uses a Fair
35 <     * ReentrantLock.  Because the Fair ReentrantLock guarantees FIFO
36 <     * queuing, we don't need to do anything fancy to prevent
37 <     * overtaking etc.  for the multiple-permit methods. The only
38 <     * minor downside is that multi-permit acquires will sometimes
39 <     * repeatedly wake up finding that they must re-wait. A custom
40 <     * solution could minimize this, at the expense of being slower
41 <     * and more complex for the typical case.
34 >     * Basic algorithm is a variant of the one described
35 >     * in CPJ book 2nd edition, section 3.7.3
36       */
37  
38      /**
39 <     * Construct a <tt>FiFoSemaphore</tt> with the given number of
39 >     * Nodes of the wait queue. Opportunistically subclasses ReentrantLock.
40 >     */
41 >    private static class Node extends ReentrantLock {
42 >        /** The condition to wait on */
43 >        Condition done = newCondition();
44 >        /** True if interrupted before released */
45 >        boolean cancelled;
46 >        /** Number of permits remaining until can release */
47 >        long permitsNeeded;
48 >        /** Next node of queue */
49 >        Node next;
50 >
51 >        Node(long n) { permitsNeeded = n; }
52 >    }
53 >
54 >    /** Head of the wait queue */
55 >    private Node head;
56 >
57 >    /** Pointer to last node of the wait queue */
58 >    private Node last;
59 >
60 >    /** Add a new node to wait queue with given number of permits needed */
61 >    private Node enq(long n) {
62 >        Node p = new Node(n);
63 >        if (last == null)
64 >            last = head = p;
65 >        else
66 >            last = last.next = p;
67 >        return p;
68 >    }
69 >
70 >    /** Remove first node from wait queue */
71 >    private void removeFirst() {
72 >        Node p = head;
73 >        assert p != null;
74 >        if (p != null && (head = p.next) == null)
75 >            last = null;
76 >    }
77 >
78 >    /**
79 >     * Construct a <tt>FairSemaphore</tt> with the given number of
80       * permits.
81       * @param permits the initial number of permits available
82       */
# Line 51 | Line 85 | public class FairSemaphore extends Semap
85      }
86  
87      /**
88 +     * Acquires a permit from this semaphore, blocking until one is
89 +     * available, or the thread is {@link Thread#interrupt interrupted}.
90 +     *
91 +     * <p>Acquires a permit, if one is available and returns immediately,
92 +     * reducing the number of available permits by one.
93 +     * <p>If no permit is available then the current thread becomes
94 +     * disabled for thread scheduling purposes and lies dormant until
95 +     * one of two things happens:
96 +     * <ul>
97 +     * <li>Some other thread invokes the {@link #release} method for this
98 +     * semaphore and the current thread happens to be chosen as the
99 +     * thread to receive the permit; or
100 +     * <li>Some other thread {@link Thread#interrupt interrupts} the current
101 +     * thread.
102 +     * </ul>
103 +     *
104 +     * <p>If the current thread:
105 +     * <ul>
106 +     * <li>has its interrupted status set on entry to this method; or
107 +     * <li>is {@link Thread#interrupt interrupted} while waiting
108 +     * for a permit,
109 +     * </ul>
110 +     * then {@link InterruptedException} is thrown and the current thread's
111 +     * interrupted status is cleared.
112 +     *
113 +     * @throws InterruptedException if the current thread is interrupted
114 +     *
115 +     * @see Thread#interrupt
116 +     */
117 +    public void acquire() throws InterruptedException {
118 +        acquire(1L);
119 +    }
120 +
121 +    /**
122 +     * Acquires a permit from this semaphore, blocking until one is
123 +     * available.
124 +     *
125 +     * <p>Acquires a permit, if one is available and returns immediately,
126 +     * reducing the number of available permits by one.
127 +     * <p>If no permit is available then the current thread becomes
128 +     * disabled for thread scheduling purposes and lies dormant until
129 +     * some other thread invokes the {@link #release} method for this
130 +     * semaphore and the current thread happens to be chosen as the
131 +     * thread to receive the permit.
132 +     *
133 +     * <p>If the current thread
134 +     * is {@link Thread#interrupt interrupted} while waiting
135 +     * for a permit then it will continue to wait, but the time at which
136 +     * the thread is assigned a permit may change compared to the time it
137 +     * would have received the permit had no interruption occurred. When the
138 +     * thread does return from this method its interrupt status will be set.
139 +     *
140 +     */
141 +    public void acquireUninterruptibly() {
142 +        acquireUninterruptibly(1L);
143 +    }
144 +
145 +    /**
146 +     * Acquires a permit from this semaphore, only if one is available at the
147 +     * time of invocation.
148 +     * <p>Acquires a permit, if one is available and returns immediately,
149 +     * with the value <tt>true</tt>,
150 +     * reducing the number of available permits by one.
151 +     *
152 +     * <p>If no permit is available then this method will return
153 +     * immediately with the value <tt>false</tt>.
154 +     *
155 +     * @return <tt>true</tt> if a permit was acquired and <tt>false</tt>
156 +     * otherwise.
157 +     */
158 +    public boolean tryAcquire() {
159 +        return tryAcquire(1L);
160 +    }
161 +
162 +    /**
163 +     * Acquires a permit from this semaphore, if one becomes available
164 +     * within the given waiting time and the
165 +     * current thread has not been {@link Thread#interrupt interrupted}.
166 +     * <p>Acquires a permit, if one is available and returns immediately,
167 +     * with the value <tt>true</tt>,
168 +     * reducing the number of available permits by one.
169 +     * <p>If no permit is available then
170 +     * the current thread becomes disabled for thread scheduling
171 +     * purposes and lies dormant until one of three things happens:
172 +     * <ul>
173 +     * <li>Some other thread invokes the {@link #release} method for this
174 +     * semaphore and the current thread happens to be chosen as the
175 +     * thread to receive the permit; or
176 +     * <li>Some other thread {@link Thread#interrupt interrupts} the current
177 +     * thread; or
178 +     * <li>The specified waiting time elapses.
179 +     * </ul>
180 +     * <p>If a permit is acquired then the value <tt>true</tt> is returned.
181 +     * <p>If the current thread:
182 +     * <ul>
183 +     * <li>has its interrupted status set on entry to this method; or
184 +     * <li>is {@link Thread#interrupt interrupted} while waiting to acquire
185 +     * a permit,
186 +     * </ul>
187 +     * then {@link InterruptedException} is thrown and the current thread's
188 +     * interrupted status is cleared.
189 +     * <p>If the specified waiting time elapses then the value <tt>false</tt>
190 +     * is returned.
191 +     * The given waiting time is a best-effort lower bound. If the time is
192 +     * less than or equal to zero, the method will not wait at all.
193 +     *
194 +     * @param timeout the maximum time to wait for a permit
195 +     * @param unit the time unit of the <tt>timeout</tt> argument.
196 +     * @return <tt>true</tt> if a permit was acquired and <tt>false</tt>
197 +     * if the waiting time elapsed before a permit was acquired.
198 +     *
199 +     * @throws InterruptedException if the current thread is interrupted
200 +     *
201 +     * @see Thread#interrupt
202 +     *
203 +     */
204 +    public boolean tryAcquire(long timeout, TimeUnit unit)
205 +        throws InterruptedException {
206 +        return tryAcquire(1L, timeout, unit);
207 +    }
208 +
209 +    /**
210 +     * Releases a permit, returning it to the semaphore.
211 +     * <p>Releases a permit, increasing the number of available permits
212 +     * by one.
213 +     * If any threads are blocking trying to acquire a permit, then one
214 +     * is selected and given the permit that was just released.
215 +     * That thread is re-enabled for thread scheduling purposes.
216 +     * <p>There is no requirement that a thread that releases a permit must
217 +     * have acquired that permit by calling {@link #acquire}.
218 +     * Correct usage of a semaphore is established by programming convention
219 +     * in the application.
220 +     */
221 +    public void release() {
222 +        release(1L);
223 +    }
224 +
225 +      
226 +    /**
227       * Acquires the given number of permits from this semaphore,
228       * blocking until all are available,
229       * or the thread is {@link Thread#interrupt interrupted}.
# Line 90 | Line 263 | public class FairSemaphore extends Semap
263       */
264      public void acquire(long permits) throws InterruptedException {
265          if (permits < 0) throw new IllegalArgumentException();
266 <
266 >        Node node = null;
267          lock.lockInterruptibly();
268          try {
269 <            while (count - permits < 0)
270 <                available.await();
271 <            count -= permits;
272 <        }
273 <        catch (InterruptedException ie) {
274 <            available.signal();
275 <            throw ie;
269 >            long remaining = count - permits;
270 >            if (remaining >= 0) {
271 >                count = remaining;
272 >                return;
273 >            }
274 >            count = 0;
275 >            node = enq(-remaining);
276          }
277          finally {
278              lock.unlock();
279          }
280 <        
280 >
281 >        node.lock();
282 >        try {
283 >            while (node.permitsNeeded > 0)
284 >                node.done.await();
285 >        }
286 >        catch(InterruptedException ie) {
287 >            if (node.permitsNeeded > 0) {
288 >                node.cancelled = true;
289 >                release(permits - node.permitsNeeded);
290 >                throw ie;
291 >            }
292 >            else { // ignore interrupt
293 >                Thread.currentThread().interrupt();
294 >            }
295 >        }
296 >        finally {
297 >            node.unlock();
298 >        }
299      }
300  
301      /**
# Line 133 | Line 324 | public class FairSemaphore extends Semap
324       */
325      public void acquireUninterruptibly(long permits) {
326          if (permits < 0) throw new IllegalArgumentException();
327 <
327 >        Node node = null;
328          lock.lock();
329          try {
330 <            while (count - permits < 0)
331 <                available.awaitUninterruptibly();
332 <            count -= permits;
330 >            long remaining = count - permits;
331 >            if (remaining >= 0) {
332 >                count = remaining;
333 >                return;
334 >            }
335 >            count = 0;
336 >            node = enq(-remaining);
337          }
338          finally {
339              lock.unlock();
340          }
341 +
342 +        node.lock();
343 +        try {
344 +            while (node.permitsNeeded > 0)
345 +                node.done.awaitUninterruptibly();
346 +        }
347 +        finally {
348 +            node.unlock();
349 +        }
350      }
351  
352      /**
# Line 164 | Line 368 | public class FairSemaphore extends Semap
368       */
369      public boolean tryAcquire(long permits) {
370          if (permits < 0) throw new IllegalArgumentException();
167
371          lock.lock();
372          try {
373 <            if (count - permits >= 0) {
374 <                count -= permits;
373 >            long remaining = count - permits;
374 >            if (remaining >= 0) {
375 >                count = remaining;
376                  return true;
377              }
378              return false;
# Line 217 | Line 421 | public class FairSemaphore extends Semap
421       *
422       * @param permits the number of permits to acquire
423       * @param timeout the maximum time to wait for the permits
424 <     * @param granularity the time unit of the <tt>timeout</tt> argument.
424 >     * @param unit the time unit of the <tt>timeout</tt> argument.
425       * @return <tt>true</tt> if all permits were acquired and <tt>false</tt>
426       * if the waiting time elapsed before all permits were acquired.
427       *
# Line 227 | Line 431 | public class FairSemaphore extends Semap
431       * @see Thread#interrupt
432       *
433       */
434 <    public boolean tryAcquire(long permits, long timeout, TimeUnit granularity)
434 >    public boolean tryAcquire(long permits, long timeout, TimeUnit unit)
435          throws InterruptedException {
436          if (permits < 0) throw new IllegalArgumentException();
437 +        Node node = null;
438 +        long nanos = unit.toNanos(timeout);
439          lock.lockInterruptibly();
234        long nanos = granularity.toNanos(timeout);
440          try {
441 <            for (;;) {
442 <                if (count - permits >= 0) {
443 <                    count -= permits;
444 <                    return true;
445 <                }
446 <                if (nanos <= 0)
441 >            long remaining = count - permits;
442 >            if (remaining >= 0) {
443 >                count = remaining;
444 >                return true;
445 >            }
446 >            if (nanos <= 0)
447 >                return false;
448 >            count = 0;
449 >            node = enq(-remaining);
450 >        }
451 >        finally {
452 >            lock.unlock();
453 >        }
454 >        
455 >        node.lock();
456 >        try {
457 >            while (node.permitsNeeded > 0) {
458 >                nanos = node.done.awaitNanos(nanos);
459 >                if (nanos <= 0) {
460 >                    release(permits - node.permitsNeeded);
461                      return false;
462 <                nanos = available.awaitNanos(nanos);
462 >                }
463              }
464 +            return true;
465          }
466 <        catch (InterruptedException ie) {
467 <            available.signal();
468 <            throw ie;
466 >        catch(InterruptedException ie) {
467 >            if (node.permitsNeeded > 0) {
468 >                node.cancelled = true;
469 >                release(permits - node.permitsNeeded);
470 >                throw ie;
471 >            }
472 >            else { // ignore interrupt
473 >                Thread.currentThread().interrupt();
474 >                return true;
475 >            }
476          }
477          finally {
478 <            lock.unlock();
478 >            node.unlock();
479          }
253
480      }
481  
482  
257
483      /**
484       * Releases the given number of permits, returning them to the semaphore.
485       * <p>Releases the given number of permits, increasing the number of
# Line 281 | Line 506 | public class FairSemaphore extends Semap
506       */
507      public void release(long permits) {
508          if (permits < 0) throw new IllegalArgumentException();
509 <        // Even if using fair locks, releases should try to barge in.
509 >        if (permits == 0)
510 >            return;
511 >        // Even when using fair locks, releases should try to barge in.
512          if (!lock.tryLock())
513              lock.lock();
514          try {
515 <            count += permits;
516 <            for (int i = 0; i < permits; ++i)
517 <                available.signal();
515 >            long p = permits;
516 >            while (p > 0) {
517 >                Node node = head;
518 >                if (node == null) {
519 >                    count += p;
520 >                    p = 0;
521 >                }
522 >                else {
523 >                    node.lock();
524 >                    try {
525 >                        if (node.cancelled)
526 >                            removeFirst();
527 >                        else if (node.permitsNeeded > p) {
528 >                            node.permitsNeeded -= p;
529 >                            p = 0;
530 >                        }
531 >                        else {
532 >                            p -= node.permitsNeeded;
533 >                            node.permitsNeeded = 0;
534 >                            node.done.signal();
535 >                            removeFirst();
536 >                        }
537 >                    }
538 >                    finally {
539 >                        node.unlock();
540 >                    }
541 >                }
542 >            }
543          }
544          finally {
545              lock.unlock();

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines