ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/SynchronousQueue.java (file contents):
Revision 1.1 by tim, Wed May 14 21:30:48 2003 UTC vs.
Revision 1.2 by dl, Tue May 27 18:14:40 2003 UTC

# Line 1 | Line 1
1 < package java.util.concurrent;
1 > /*
2 > * Written by Doug Lea with assistance from members of JCP JSR-166
3 > * Expert Group and released to the public domain. Use, modify, and
4 > * redistribute this code in any way without acknowledgement.
5 > */
6  
7 + package java.util.concurrent;
8   import java.util.*;
9  
10   /**
# Line 10 | Line 15 | import java.util.*;
15   * in another thread in order to hand it some information, event, or
16   * task.
17   **/
18 < public class SynchronousQueue<E> extends AbstractCollection<E>
18 > public class SynchronousQueue<E> extends AbstractQueue<E>
19          implements BlockingQueue<E>, java.io.Serializable {
20  
21 <    public SynchronousQueue() {}
22 <    public void put(E x) throws InterruptedException {
21 >    /*
22 >      This implementation divides actions into two cases for puts:
23 >
24 >      * An arriving putter that does not already have a waiting taker
25 >      creates a node holding item, and then waits for a taker to take it.
26 >      * An arriving putter that does already have a waiting taker fills
27 >      the slot node created by the taker, and notifies it to continue.
28 >
29 >      And symmetrically, two for takes:
30 >
31 >      * An arriving taker that does not already have a waiting putter
32 >      creates an empty slot node, and then waits for a putter to fill it.
33 >      * An arriving taker that does already have a waiting putter takes
34 >      item from the node created by the putter, and notifies it to continue.
35 >
36 >      This requires keeping two simple queues: waitingPuts and waitingTakes.
37 >  
38 >      When a put or take waiting for the actions of its counterpart
39 >      aborts due to interruption or timeout, it marks the node
40 >      it created as "CANCELLED", which causes its counterpart to retry
41 >      the entire put or take sequence.
42 >    */
43 >
44 >    /**
45 >     * Special marker used in queue nodes to indicate that
46 >     * the thread waiting for a change in the node has timed out
47 >     * or been interrupted.
48 >     **/
49 >    private static final Object CANCELLED = new Object();
50 >
51 >    /*
52 >     * Note that all fields are transient final, so there is
53 >     * no explicit serialization code.
54 >     */
55 >
56 >    private transient final WaitQueue waitingPuts = new WaitQueue();
57 >    private transient final WaitQueue waitingTakes = new WaitQueue();
58 >    private transient final ReentrantLock qlock = new ReentrantLock();
59 >
60 >    /**
61 >     * Nodes each maintain an item and handle waits and signals for
62 >     * getting and setting it. The class opportunistically extends
63 >     * ReentrantLock to save an extra object allocation per
64 >     * rendezvous.
65 >     */
66 >    private static class Node extends ReentrantLock {
67 >        Condition done;
68 >        Object item;
69 >        Node next;
70 >        Node(Object x) { item = x; }
71 >
72 >        /**
73 >         * Fill in the slot created by the taker and signal taker to
74 >         * continue.
75 >         */
76 >        boolean set(Object x) {
77 >            this.lock();
78 >            try {
79 >                if (item != CANCELLED) {
80 >                    item = x;
81 >                    if (done != null)
82 >                        done.signal();
83 >                    return true;
84 >                }
85 >                else // taker has cancelled
86 >                    return false;
87 >            }
88 >            finally {
89 >                this.unlock();
90 >            }
91 >        }
92 >
93 >        /**
94 >         * Remove item from slot created by putter and signal putter
95 >         * to continue.
96 >         */
97 >        Object get() {
98 >            this.lock();
99 >            try {
100 >                Object x = item;
101 >                if (x != CANCELLED) {
102 >                    item = null;
103 >                    next = null;
104 >                    if (done != null)
105 >                        done.signal();
106 >                    return x;
107 >                }
108 >                else
109 >                    return null;
110 >            }
111 >            finally {
112 >                this.unlock();
113 >            }
114 >        }
115 >
116 >
117 >        /**
118 >         * Wait for a taker to take item placed by putter, or time out.
119 >         */
120 >        boolean waitForTake(boolean timed, long nanos) throws InterruptedException {
121 >            this.lock();
122 >            try {
123 >                for (;;) {
124 >                    if (item == null)
125 >                        return true;
126 >                    if (done == null)
127 >                        done = this.newCondition();
128 >                    if (timed) {
129 >                        if (nanos <= 0) {
130 >                            item = CANCELLED;
131 >                            return false;
132 >                        }
133 >                        nanos = done.awaitNanos(nanos);
134 >                    }
135 >                    else
136 >                        done.await();
137 >                }
138 >            }
139 >            catch (InterruptedException ie) {
140 >                // If taken, return normally but set interrupt status
141 >                if (item == null) {
142 >                    Thread.currentThread().interrupt();
143 >                    return true;
144 >                }
145 >                else {
146 >                    item = CANCELLED;
147 >                    done.signal(); // propagate signal
148 >                    throw ie;
149 >                }
150 >            }
151 >            finally {
152 >                this.unlock();
153 >            }
154 >        }
155 >
156 >
157 >        /**
158 >         * Wait for a putter to put item placed by taker, or time out.
159 >         */
160 >        Object waitForPut(boolean timed, long nanos) throws InterruptedException {
161 >            this.lock();
162 >            try {
163 >                for (;;) {
164 >                    Object x = item;
165 >                    if (x != null) {
166 >                        item = null;
167 >                        next = null;
168 >                        return x;
169 >                    }
170 >                    if (done == null)
171 >                        done = this.newCondition();
172 >                    if (timed) {
173 >                        if (nanos <= 0) {
174 >                            item = CANCELLED;
175 >                            return null;
176 >                        }
177 >                        nanos = done.awaitNanos(nanos);
178 >                    }
179 >                    else
180 >                        done.await();
181 >                }
182 >            }
183 >            catch(InterruptedException ie) {
184 >                Object x = item;
185 >                if (x != null) {
186 >                    item = null;
187 >                    next = null;
188 >                    Thread.currentThread().interrupt();
189 >                    return x;
190 >                }
191 >                else {
192 >                    item = CANCELLED;
193 >                    done.signal(); // propagate signal
194 >                    throw ie;
195 >                }
196 >            }
197 >            finally {
198 >                this.unlock();
199 >            }
200 >        }
201      }
202 <    public E take() throws InterruptedException {
203 <        return null;
202 >
203 >    /**
204 >     * Simple FIFO queue class to hold waiting puts/takes.
205 >     **/
206 >    private static class WaitQueue<E> {
207 >        Node head;
208 >        Node last;
209 >
210 >        Node enq(Object x) {
211 >            Node p = new Node(x);
212 >            if (last == null)
213 >                last = head = p;
214 >            else
215 >                last = last.next = p;
216 >            return p;
217 >        }
218 >
219 >        Node deq() {
220 >            Node p = head;
221 >            if (p != null && (head = p.next) == null)
222 >                last = null;
223 >            return p;
224 >        }
225      }
226 <    public boolean add(E x) {
227 <        return false;
226 >
227 >    /**
228 >     * Main put algorithm, used by put, timed offer
229 >     */
230 >    private boolean doPut(E x, boolean timed, long nanos) throws InterruptedException {
231 >        if (x == null) throw new IllegalArgumentException();
232 >        for (;;) {
233 >            Node node;
234 >            boolean mustWait;
235 >            
236 >            qlock.lockInterruptibly();
237 >            try {
238 >                node = waitingTakes.deq();
239 >                if ( (mustWait = (node == null)) )
240 >                    node = waitingPuts.enq(x);
241 >            }
242 >            finally {
243 >                qlock.unlock();
244 >            }
245 >
246 >            if (mustWait)
247 >                return node.waitForTake(timed, nanos);
248 >
249 >            else if (node.set(x))
250 >                return true;
251 >
252 >            // else taker cancelled, so retry
253 >        }
254      }
255 <    public boolean offer(E x) {
256 <        return false;
255 >
256 >    /**
257 >     * Main take algorithm, used by take, timed poll
258 >     */
259 >    private E doTake(boolean timed, long nanos) throws InterruptedException {
260 >        for (;;) {
261 >            Node node;
262 >            boolean mustWait;
263 >
264 >            qlock.lockInterruptibly();
265 >            try {
266 >                node = waitingPuts.deq();
267 >                if ( (mustWait = (node == null)) )
268 >                    node = waitingTakes.enq(null);
269 >            }
270 >            finally {
271 >                qlock.unlock();
272 >            }
273 >
274 >            if (mustWait)
275 >                return (E)node.waitForPut(timed, nanos);
276 >
277 >            else {
278 >                E x = (E)node.get();
279 >                if (x != null)
280 >                    return x;
281 >                // else cancelled, so retry
282 >            }
283 >        }
284      }
285 <    public boolean remove(Object x) {
286 <        return false;
285 >
286 >    public SynchronousQueue() {}
287 >
288 >    public boolean isEmpty() {
289 >        return true;
290      }
291 <    public E remove() {
292 <        return null;
291 >
292 >    public int size() {
293 >        return 0;
294      }
295 <    public Iterator<E> iterator() {
296 <      return null;
295 >
296 >    public int remainingCapacity() {
297 >        return 0;
298      }
299  
300 <    public E element() {
300 >    public E peek() {
301          return null;
302      }
303  
304 <    public E poll() {
305 <        return null;
304 >
305 >    public void put(E x) throws InterruptedException {
306 >        doPut(x, false, 0);
307      }
308 <    public boolean offer(E x, long timeout, TimeUnit granularity) throws InterruptedException {
309 <        return false;
308 >
309 >    public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
310 >        return doPut(x, true, unit.toNanos(timeout));
311      }
312 <    public E poll(long timeout, TimeUnit granularity) throws InterruptedException {
313 <        return null;
312 >
313 >
314 >
315 >    public E take() throws InterruptedException {
316 >        return doTake(false, 0);
317      }
318 <    public E peek() {
319 <        return null;
318 >
319 >    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
320 >        return doTake(true, unit.toNanos(timeout));
321      }
322 <    public boolean isEmpty() {
323 <        return false;
322 >
323 >    // Untimed nonblocking versions
324 >
325 >    public boolean offer(E x) {
326 >        if (x == null) throw new IllegalArgumentException();
327 >        
328 >        for (;;) {
329 >            qlock.lock();
330 >            Node node;
331 >            try {
332 >                node = waitingTakes.deq();
333 >            }
334 >            finally {
335 >                qlock.unlock();
336 >            }
337 >            if (node == null)
338 >                return false;
339 >            
340 >            else if (node.set(x))
341 >                return true;
342 >            // else retry
343 >        }
344      }
345 <    public int size() {
346 <        return 0;
345 >
346 >    public E poll() {
347 >        for (;;) {
348 >            Node node;
349 >            qlock.lock();
350 >            try {
351 >                node = waitingPuts.deq();
352 >            }
353 >            finally {
354 >                qlock.unlock();
355 >            }
356 >            if (node == null)
357 >                return null;
358 >
359 >            else {
360 >                Object x = node.get();
361 >                if (x != null)
362 >                    return (E)x;
363 >                // else retry
364 >            }
365 >        }
366      }
367 <    public Object[] toArray() {
368 <        return null;
367 >
368 >    public boolean remove(Object x) {
369 >        return false;
370      }
371  
372 <    public <T> T[] toArray(T[] array) {
373 <        return null;
372 >    static class EmptyIterator<E> implements Iterator {
373 >        public boolean hasNext() {
374 >            return false;
375 >        }
376 >        public E next() {
377 >            throw new NoSuchElementException();
378 >        }
379 >        public void remove() {
380 >            throw new UnsupportedOperationException();
381 >        }
382 >    }
383 >
384 >    public Iterator<E> iterator() {
385 >        return new EmptyIterator();
386      }
387  
388  
389 +    public E[] toArray() {
390 +        return new E[0];
391 +    }
392 +
393 +    public <T> T[] toArray(T[] a) {
394 +        if (a.length > 0)
395 +            a[0] = null;
396 +        return a;
397 +    }
398   }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines