ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ExecutorCompletionService.java
Revision: 1.5
Committed: Wed Dec 17 17:00:24 2003 UTC (20 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.4: +64 -5 lines
Log Message:
Export delegation wrappers; fix/add documentation

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7     package java.util.concurrent;
8    
9    
10     /**
11 jozart 1.2 * A {@link CompletionService} that uses a supplied {@link Executor}
12 dl 1.5 * to execute tasks. An <tt>ExecutorCompletionService</tt> can be
13     * useful as an add-on to solve task coordination problems.
14     *
15     * <p>
16     *
17     * <b>Usage Examples.</b>
18     * Suppose you have a set of solvers for a certain problem,
19     * and would like to run them concurrently, using the results of each of them
20     * that returns a non-null value. You could write this as:
21     *
22     * <pre>
23     * void solve(Executor e, Collection&lt;Callable&lt;Result&gt;&gt; solvers)
24     * throws InterruptedException, ExecutionException {
25     * ExecutorCompletionService&lt;Result&gt; ecs = new
26     * ExecutorCompletionService&lt;Result&gt;(e);
27     * for (Callable&lt;Result&gt; s : solvers)
28     * ecs.submit(s);
29     * int n = solvers.size();
30     * for (int i = 0; i &lt; n; ++i) {
31     * Result r = ecs.take().get();
32     * if (r != null)
33     * use(r);
34     * }
35     * }
36     * </pre>
37     *
38     * Suppose instead that you would like to use the first non-null result
39     * of a set of tasks, ignoring any of those that encounter exceptions
40     * and cancelling all of the other tasks when the first one is ready:
41     *
42     * <pre>
43     * void solve(Executor e, Collection&lt;Callable&lt;Result&gt;&gt; solvers)
44     * throws InterruptedException {
45     * ExecutorCompletionService&lt;Result&gt; ecs =
46     * new ExecutorCompletionService&lt;Result&gt;(e);
47     * int n = solvers.size();
48     * ArrayList&lt;Future&lt;Result&gt;&gt; futures =
49     * new ArrayList&lt;Future&lt;Result&gt;&gt;(n);
50     * Result result = null;
51     * try {
52     * for (Callable&lt;Result&gt; s : solvers)
53     * futures.add(ecs.submit(s));
54     * for (int i = 0; i &lt; n; ++i) {
55     * try {
56     * Result r = ecs.take().get();
57     * if (r != null) {
58     * result = r;
59     * break;
60     * }
61     * } catch(ExecutionException ignore) {}
62     * }
63     * }
64     * finally {
65     * for (Future&lt;Result&gt; f : futures)
66     * f.cancel(true);
67     * }
68     *
69     * if (result != null)
70     * use(result);
71     * }
72     * </pre>
73 dl 1.1 */
public class ExecutorCompletionService<V> implements CompletionService<V> {
    /** Executor that actually runs the submitted tasks. */
    private final Executor executor;

    /** Queue onto which each task's future is placed once the task is done. */
    private final BlockingQueue<Future<V>> completionQueue;

    /**
     * FutureTask extension that enqueues itself on the enclosing
     * service's completion queue upon completion.
     */
    private class QueueingFuture extends FutureTask<V> {
        QueueingFuture(Callable<V> c) { super(c); }
        QueueingFuture(Runnable t, V r) { super(t, r); }

        // Uses add(), not put(): the hook never blocks. With a
        // capacity-bounded completion queue that is full, add() rejects
        // the element by throwing (see BlockingQueue.add).
        @Override
        protected void done() { completionQueue.add(this); }
    }

    /**
     * Creates an ExecutorCompletionService using the supplied
     * executor for base task execution and a
     * {@link LinkedBlockingQueue} as a completion queue.
     *
     * @param executor the executor to use
     * @throws NullPointerException if executor is <tt>null</tt>
     */
    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

    /**
     * Creates an ExecutorCompletionService using the supplied
     * executor for base task execution and the supplied queue as its
     * completion queue.
     *
     * @param executor the executor to use
     * @param completionQueue the queue to use as the completion queue;
     * normally one dedicated for use by this service
     * @throws NullPointerException if executor or completionQueue are <tt>null</tt>
     */
    public ExecutorCompletionService(Executor executor,
                                     BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.completionQueue = completionQueue;
    }

    /**
     * Wraps the given value-returning task in a future that places
     * itself on the completion queue when done, and hands that future
     * to the executor. Anything thrown by the executor's
     * <tt>execute</tt> method propagates to the caller.
     *
     * @param task the task to submit
     * @return the future representing the submitted task; the same
     * object is later made available through <tt>take</tt> or <tt>poll</tt>
     */
    public Future<V> submit(Callable<V> task) {
        QueueingFuture f = new QueueingFuture(task);
        executor.execute(f);
        return f;
    }

    /**
     * Wraps the given Runnable and its fixed result in a future that
     * places itself on the completion queue when done, and hands that
     * future to the executor. Anything thrown by the executor's
     * <tt>execute</tt> method propagates to the caller.
     *
     * @param task the task to submit
     * @param result the value the returned future yields on successful
     * completion
     * @return the future representing the submitted task; the same
     * object is later made available through <tt>take</tt> or <tt>poll</tt>
     */
    public Future<V> submit(Runnable task, V result) {
        QueueingFuture f = new QueueingFuture(task, result);
        executor.execute(f);
        return f;
    }

    /**
     * Retrieves and removes the future of the next completed task by
     * delegating to the completion queue's <tt>take</tt>, waiting if
     * none is present yet.
     *
     * @return the future of the next completed task
     * @throws InterruptedException if interrupted while waiting
     */
    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    /**
     * Retrieves and removes the future of the next completed task by
     * delegating to the completion queue's <tt>poll</tt>, without waiting.
     *
     * @return the future of the next completed task, or <tt>null</tt>
     * if none is present
     */
    public Future<V> poll() {
        return completionQueue.poll();
    }

    /**
     * Retrieves and removes the future of the next completed task by
     * delegating to the completion queue's timed <tt>poll</tt>, waiting
     * up to the given time if none is present yet.
     *
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit the time unit of the <tt>timeout</tt> argument
     * @return the future of the next completed task, or <tt>null</tt>
     * if the wait time elapses before one is present
     * @throws InterruptedException if interrupted while waiting
     */
    public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }

}
143    
144