ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/SynchronousQueue.java (file contents):
Revision 1.34 by dl, Fri Jan 2 01:31:12 2004 UTC vs.
Revision 1.35 by dl, Fri Jan 2 21:02:32 2004 UTC

# Line 79 | Line 79 | public class SynchronousQueue<E> extends
79       *  0 for waiting, 1 for ack, -1 for cancelled.
80       */
81      private static final class Node extends AbstractQueuedSynchronizer {
82 +        /** Synchronization state value representing that node acked */
83 +        private static final int ACK    =  1;
84 +        /** Synchronization state value representing that node cancelled */
85 +        private static final int CANCEL = -1;
86 +
87          /** The item being transferred */
88          Object item;
89          /** Next node in wait queue */
90          Node next;
86        Node(Object x) { item = x; }
91  
92 <        private static final int WAITING   =  0;
93 <        private static final int ACKED     =  1;
90 <        private static final int CANCELLED = -1;
92 >        /** Create node with initial item */
93 >        Node(Object x) { item = x; }
94  
95          /**
96           * Implements AQS base acquire to succeed if not in WAITING state
97           */
98 <        protected boolean tryAcquireExclusiveState(boolean b, int ignore) {
99 <            return getState() != WAITING;
98 >        protected boolean tryAcquireExclusive(boolean b, int ignore) {
99 >            return getState() != 0;
100          }
101  
102          /**
103           * Implements AQS base release to signal if state changed
104           */
105 <        protected boolean releaseExclusiveState(int newState) {
106 <            return compareAndSetState(WAITING, newState);
104 <        }
105 <
106 <        /**
107 <         * Try to acknowledge; fail if not waiting
108 <         */
109 <        private boolean ack() {
110 <            return releaseExclusive(ACKED);
105 >        protected boolean tryReleaseExclusive(int newState) {
106 >            return compareAndSetState(0, newState);
107          }
108  
109          /**
110 <         * Try to cancel; fail if not waiting
115 <         */
116 <        private boolean cancel() {
117 <            return releaseExclusive(CANCELLED);
118 <        }
119 <
120 <        /**
121 <         * Take item and null out fields (for sake of GC)
110 >         * Take item and null out field (for sake of GC)
111           */
112          private Object extract() {
113              Object x = item;
114              item = null;
126            next = null;
115              return x;
116          }
117  
118          /**
119 +         * Try to cancel on interrupt; if so rethrowing,
120 +         * else setting interrupt state
121 +         */
122 +        private void checkCancellationOnInterrupt(InterruptedException ie)
123 +            throws InterruptedException {
124 +            if (releaseExclusive(CANCEL))
125 +                throw ie;
126 +            Thread.currentThread().interrupt();
127 +        }
128 +
129 +        /**
130           * Fill in the slot created by the taker and signal taker to
131           * continue.
132           */
133          boolean setItem(Object x) {
134 <            item = x;
135 <            return ack();
134 >            item = x; // can place in slot even if cancelled
135 >            return releaseExclusive(ACK);
136          }
137  
138          /**
# Line 141 | Line 140 | public class SynchronousQueue<E> extends
140           * to continue.
141           */
142          Object getItem() {
143 <            return (ack())? extract() : null;
143 >            return (releaseExclusive(ACK))? extract() : null;
144 >        }
145 >
146 >        /**
147 >         * Wait for a taker to take item placed by putter.
148 >         */
149 >        void waitForTake() throws InterruptedException {
150 >            try {
151 >                acquireExclusiveInterruptibly(0);
152 >            } catch (InterruptedException ie) {
153 >                checkCancellationOnInterrupt(ie);
154 >            }
155 >        }
156 >
157 >        /**
158 >         * Wait for a putter to put item placed by taker.
159 >         */
160 >        Object waitForPut() throws InterruptedException {
161 >            try {
162 >                acquireExclusiveInterruptibly(0);
163 >            } catch (InterruptedException ie) {
164 >                checkCancellationOnInterrupt(ie);
165 >            }
166 >            return extract();
167          }
168  
169          /**
170           * Wait for a taker to take item placed by putter or time out.
171           */
172 <        boolean waitForTake(boolean timed, long nanos) throws InterruptedException {
172 >        boolean waitForTake(long nanos) throws InterruptedException {
173              try {
174 <                if (!timed)
175 <                    acquireExclusiveInterruptibly(0);
154 <                else if (!acquireExclusiveTimed(0, nanos) && cancel())
174 >                if (!acquireExclusiveTimed(0, nanos) &&
175 >                    releaseExclusive(CANCEL))
176                      return false;
156                return true;
177              } catch (InterruptedException ie) {
178 <                if (cancel())
159 <                    throw ie;
160 <                Thread.currentThread().interrupt();
161 <                return true;
178 >                checkCancellationOnInterrupt(ie);
179              }
180 +            return true;
181          }
182  
183          /**
184           * Wait for a putter to put item placed by taker, or time out.
185           */
186 <        Object waitForPut(boolean timed, long nanos) throws InterruptedException {
186 >        Object waitForPut(long nanos) throws InterruptedException {
187              try {
188 <                if (!timed)
189 <                    acquireExclusiveInterruptibly(0);
172 <                else if (!acquireExclusiveTimed(0, nanos) && cancel())
188 >                if (!acquireExclusiveTimed(0, nanos) &&
189 >                    releaseExclusive(CANCEL))
190                      return null;
174                return extract();
191              } catch (InterruptedException ie) {
192 <                if (cancel())
177 <                    throw ie;
178 <                Thread.currentThread().interrupt();
179 <                return extract();
192 >                checkCancellationOnInterrupt(ie);
193              }
194 +            return extract();
195          }
196  
197      }
# Line 200 | Line 214 | public class SynchronousQueue<E> extends
214  
215          Node deq() {
216              Node p = head;
217 <            if (p != null && (head = p.next) == null)
218 <                last = null;
217 >            if (p != null) {
218 >                if ((head = p.next) == null)
219 >                    last = null;
220 >                p.next = null;
221 >            }
222              return p;
223          }
224      }
225  
226      /**
227 <     * Main put algorithm, used by put, timed offer
227 >     * Creates a <tt>SynchronousQueue</tt>.
228       */
229 <    private boolean doPut(E x, boolean timed, long nanos) throws InterruptedException {
213 <        if (x == null) throw new NullPointerException();
214 <        for (;;) {
215 <            Node node;
216 <            boolean mustWait;
217 <            final ReentrantLock qlock = this.qlock;
218 <            qlock.lockInterruptibly();
219 <            try {
220 <                node = waitingTakes.deq();
221 <                if ( (mustWait = (node == null)) )
222 <                    node = waitingPuts.enq(x);
223 <            } finally {
224 <                qlock.unlock();
225 <            }
226 <
227 <            if (mustWait)
228 <                return  node.waitForTake(timed, nanos);
229 <
230 <            else if (node.setItem(x))
231 <                return true;
229 >    public SynchronousQueue() {}
230  
233            // else taker cancelled, so retry
234        }
235    }
231  
232      /**
233 <     * Main take algorithm, used by take, timed poll
233 >     * Adds the specified element to this queue, waiting if necessary for
234 >     * another thread to receive it.
235 >     * @param o the element to add
236 >     * @throws InterruptedException if interrupted while waiting.
237 >     * @throws NullPointerException if the specified element is <tt>null</tt>.
238       */
239 <    private E doTake(boolean timed, long nanos) throws InterruptedException {
239 >    public void put(E o) throws InterruptedException {
240 >        if (o == null) throw new NullPointerException();
241 >        final ReentrantLock qlock = this.qlock;
242 >
243          for (;;) {
244              Node node;
245              boolean mustWait;
244
245            final ReentrantLock qlock = this.qlock;
246              qlock.lockInterruptibly();
247              try {
248 <                node = waitingPuts.deq();
248 >                node = waitingTakes.deq();
249                  if ( (mustWait = (node == null)) )
250 <                    node = waitingTakes.enq(null);
250 >                    node = waitingPuts.enq(o);
251              } finally {
252                  qlock.unlock();
253              }
254  
255              if (mustWait) {
256 <                Object x = node.waitForPut(timed, nanos);
257 <                return (E)x;
258 <            }
259 <            else {
260 <                Object x = node.getItem();
261 <                if (x != null)
262 <                    return (E)x;
263 <                // else cancelled, so retry
256 >                node.waitForTake();
257 >                return;
258              }
265        }
266    }
267
268    /**
269     * Creates a <tt>SynchronousQueue</tt>.
270     */
271    public SynchronousQueue() {}
259  
260 +            else if (node.setItem(o))
261 +                return;
262  
263 <    /**
264 <     * Adds the specified element to this queue, waiting if necessary for
276 <     * another thread to receive it.
277 <     * @param o the element to add
278 <     * @throws InterruptedException if interrupted while waiting.
279 <     * @throws NullPointerException if the specified element is <tt>null</tt>.
280 <     */
281 <    public void put(E o) throws InterruptedException {
282 <        doPut(o, false, 0);
263 >            // else taker cancelled, so retry
264 >        }
265      }
266  
267      /**
# Line 296 | Line 278 | public class SynchronousQueue<E> extends
278       * @throws NullPointerException if the specified element is <tt>null</tt>.
279       */
280      public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
281 <        return doPut(o, true, unit.toNanos(timeout));
281 >        if (o == null) throw new NullPointerException();
282 >        long nanos = unit.toNanos(timeout);
283 >        final ReentrantLock qlock = this.qlock;
284 >        for (;;) {
285 >            Node node;
286 >            boolean mustWait;
287 >            qlock.lockInterruptibly();
288 >            try {
289 >                node = waitingTakes.deq();
290 >                if ( (mustWait = (node == null)) )
291 >                    node = waitingPuts.enq(o);
292 >            } finally {
293 >                qlock.unlock();
294 >            }
295 >
296 >            if (mustWait)
297 >                return node.waitForTake(nanos);
298 >
299 >            else if (node.setItem(o))
300 >                return true;
301 >
302 >            // else taker cancelled, so retry
303 >        }
304 >
305      }
306  
307  
# Line 306 | Line 311 | public class SynchronousQueue<E> extends
311       * @return the head of this queue
312       */
313      public E take() throws InterruptedException {
314 <        return doTake(false, 0);
314 >        final ReentrantLock qlock = this.qlock;
315 >        for (;;) {
316 >            Node node;
317 >            boolean mustWait;
318 >
319 >            qlock.lockInterruptibly();
320 >            try {
321 >                node = waitingPuts.deq();
322 >                if ( (mustWait = (node == null)) )
323 >                    node = waitingTakes.enq(null);
324 >            } finally {
325 >                qlock.unlock();
326 >            }
327 >
328 >            if (mustWait)
329 >                return (E)node.waitForPut();
330 >
331 >            else {
332 >                Object x = node.getItem();
333 >                if (x != null)
334 >                    return (E)x;
335 >                // else cancelled, so retry
336 >            }
337 >        }
338      }
339  
340      /**
# Line 322 | Line 350 | public class SynchronousQueue<E> extends
350       * @throws InterruptedException if interrupted while waiting.
351       */
352      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
353 <        return doTake(true, unit.toNanos(timeout));
353 >        long nanos = unit.toNanos(timeout);
354 >        final ReentrantLock qlock = this.qlock;
355 >
356 >        for (;;) {
357 >            Node node;
358 >            boolean mustWait;
359 >
360 >            qlock.lockInterruptibly();
361 >            try {
362 >                node = waitingPuts.deq();
363 >                if ( (mustWait = (node == null)) )
364 >                    node = waitingTakes.enq(null);
365 >            } finally {
366 >                qlock.unlock();
367 >            }
368 >
369 >            if (mustWait)
370 >                return (E) node.waitForPut(nanos);
371 >
372 >            else {
373 >                Object x = node.getItem();
374 >                if (x != null)
375 >                    return (E)x;
376 >                // else cancelled, so retry
377 >            }
378 >        }
379      }
380  
381      // Untimed nonblocking versions

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines