ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ExecutorCompletionService.java
Revision: 1.7
Committed: Sun Dec 21 12:24:48 2003 UTC (20 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.6: +7 -2 lines
Log Message:
Documentation improvements; support two-arg submit

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7     package java.util.concurrent;
8    
9    
/**
 * A {@link CompletionService} that uses a supplied {@link Executor}
 * to execute tasks.  This class arranges that submitted tasks are,
 * upon completion, placed on a queue accessible using <tt>take</tt>.
 * The class is lightweight enough to be suitable for transient use
 * when processing groups of tasks.
 *
 * <p>
 *
 * <b>Usage Examples.</b>
 * Suppose you have a set of solvers for a certain problem (each returning
 * a value of some type <tt>Result</tt>),
 * and would like to run them concurrently, using the results of each of them
 * that returns a non-null value. You could write this as:
 *
 * <pre>
 *    void solve(Executor e, Collection&lt;Callable&lt;Result&gt;&gt; solvers)
 *      throws InterruptedException, ExecutionException {
 *        CompletionService&lt;Result&gt; ecs = new ExecutorCompletionService&lt;Result&gt;(e);
 *        for (Callable&lt;Result&gt; s : solvers)
 *            ecs.submit(s);
 *        int n = solvers.size();
 *        for (int i = 0; i &lt; n; ++i) {
 *            Result r = ecs.take().get();
 *            if (r != null)
 *                use(r);
 *        }
 *    }
 * </pre>
 *
 * Suppose instead that you would like to use the first non-null result
 * of the set of tasks, ignoring any of those that encounter exceptions,
 * and cancelling all other tasks when the first one is ready:
 *
 * <pre>
 *    void solve(Executor e, Collection&lt;Callable&lt;Result&gt;&gt; solvers)
 *      throws InterruptedException {
 *        CompletionService&lt;Result&gt; ecs = new ExecutorCompletionService&lt;Result&gt;(e);
 *        int n = solvers.size();
 *        List&lt;Future&lt;Result&gt;&gt; futures = new ArrayList&lt;Future&lt;Result&gt;&gt;(n);
 *        Result result = null;
 *        try {
 *            for (Callable&lt;Result&gt; s : solvers)
 *                futures.add(ecs.submit(s));
 *            for (int i = 0; i &lt; n; ++i) {
 *                try {
 *                    Result r = ecs.take().get();
 *                    if (r != null) {
 *                        result = r;
 *                        break;
 *                    }
 *                } catch(ExecutionException ignore) {}
 *            }
 *        }
 *        finally {
 *            for (Future&lt;Result&gt; f : futures)
 *                f.cancel(true);
 *        }
 *
 *        if (result != null)
 *            use(result);
 *    }
 * </pre>
 */
public class ExecutorCompletionService<V> implements CompletionService<V> {
    /** Executor to which every submitted task is handed for execution. */
    private final Executor executor;

    /** Queue onto which each task's future is placed once the task is done. */
    private final BlockingQueue<Future<V>> completionQueue;

    /**
     * FutureTask extension to enqueue upon completion.
     *
     * <p>Deliberately a non-static inner class: <tt>done()</tt> needs the
     * enclosing service's completion queue.
     */
    private class QueueingFuture extends FutureTask<V> {
        QueueingFuture(Callable<V> c) { super(c); }
        QueueingFuture(Runnable t, V r) { super(t, r); }

        /**
         * Completion hook inherited from {@link FutureTask}; adds this
         * future to the completion queue. Uses <tt>add</tt>, so a
         * capacity-bounded queue that is full rejects the element by
         * throwing rather than blocking.
         */
        @Override
        protected void done() { completionQueue.add(this); }
    }

    /**
     * Creates an ExecutorCompletionService using the supplied
     * executor for base task execution and a
     * {@link LinkedBlockingQueue} as a completion queue.
     *
     * @param executor the executor to use
     * @throws NullPointerException if executor is <tt>null</tt>
     */
    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException("executor");
        this.executor = executor;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

    /**
     * Creates an ExecutorCompletionService using the supplied
     * executor for base task execution and the supplied queue as its
     * completion queue.
     *
     * @param executor the executor to use
     * @param completionQueue the queue to use as the completion queue,
     * normally one dedicated for use by this service
     * @throws NullPointerException if executor or completionQueue are <tt>null</tt>
     */
    public ExecutorCompletionService(Executor executor,
                                     BlockingQueue<Future<V>> completionQueue) {
        // Checked separately so the exception message names the offending argument.
        if (executor == null)
            throw new NullPointerException("executor");
        if (completionQueue == null)
            throw new NullPointerException("completionQueue");
        this.executor = executor;
        this.completionQueue = completionQueue;
    }

    /**
     * Wraps the given value-returning task in a future that enqueues
     * itself on completion, and passes it to the executor. Anything
     * thrown by the executor's <tt>execute</tt> method propagates to
     * the caller.
     *
     * @param task the task to run
     * @return a Future representing the task; the same object is later
     * obtainable from <tt>take</tt> or <tt>poll</tt>
     * @throws NullPointerException if task is <tt>null</tt>
     */
    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        QueueingFuture f = new QueueingFuture(task);
        executor.execute(f);
        return f;
    }

    /**
     * Wraps the given Runnable, together with the value to report on
     * success, in a future that enqueues itself on completion, and
     * passes it to the executor. Anything thrown by the executor's
     * <tt>execute</tt> method propagates to the caller.
     *
     * @param task the task to run
     * @param result the value the returned Future yields once the task
     * has run successfully
     * @return a Future representing the task; the same object is later
     * obtainable from <tt>take</tt> or <tt>poll</tt>
     * @throws NullPointerException if task is <tt>null</tt>
     */
    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        QueueingFuture f = new QueueingFuture(task, result);
        executor.execute(f);
        return f;
    }

    /**
     * Removes and returns the next future from the completion queue,
     * delegating to the queue's blocking <tt>take</tt>.
     *
     * @return the Future of the next completed task
     * @throws InterruptedException if interrupted while waiting
     */
    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    /**
     * Removes and returns the next future from the completion queue
     * without waiting, delegating to the queue's <tt>poll</tt>.
     *
     * @return the Future of the next completed task, or <tt>null</tt>
     * if the completion queue is currently empty
     */
    public Future<V> poll() {
        return completionQueue.poll();
    }

    /**
     * Removes and returns the next future from the completion queue,
     * delegating to the queue's timed <tt>poll</tt>.
     *
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit the time unit of the <tt>timeout</tt> argument
     * @return the Future of the next completed task, or <tt>null</tt>
     * if the wait elapses before one becomes available
     * @throws InterruptedException if interrupted while waiting
     */
    public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }

}
145    
146