ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ExecutorCompletionService.java
Revision: 1.9
Committed: Wed Jan 21 15:20:35 2004 UTC (20 years, 4 months ago) by dl
Branch: MAIN
Changes since 1.8: +7 -5 lines
Log Message:
doc improvements; consistent conventions for nested classes

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.8 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.1 */
6    
7     package java.util.concurrent;
8    
9    
10     /**
11 jozart 1.2 * A {@link CompletionService} that uses a supplied {@link Executor}
12 dl 1.7 * to execute tasks. This class arranges that submitted tasks are,
13     * upon completion, placed on a queue accessible using <tt>take</tt>.
14     * The class is lightweight enough to be suitable for transient use
15     * when processing groups of tasks.
16 dl 1.5 *
17     * <p>
18     *
19     * <b>Usage Examples.</b>
20 dl 1.9 *
21     * Suppose you have a set of solvers for a certain problem, each
22     * returning a value of some type <tt>Result</tt>, and would like to
23     * run them concurrently, processing the results of each of them that
24     * return a non-null value, in some method <tt>use(Result r)</tt>. You
25     * could write this as:
26 dl 1.5 *
27     * <pre>
28 dl 1.6 * void solve(Executor e, Collection&lt;Callable&lt;Result&gt;&gt; solvers)
29     * throws InterruptedException, ExecutionException {
30     * CompletionService&lt;Result&gt; ecs = new ExecutorCompletionService&lt;Result&gt;(e);
31 dl 1.5 * for (Callable&lt;Result&gt; s : solvers)
32     * ecs.submit(s);
33     * int n = solvers.size();
34     * for (int i = 0; i &lt; n; ++i) {
35     * Result r = ecs.take().get();
36     * if (r != null)
37     * use(r);
38     * }
39     * }
40     * </pre>
41     *
42     * Suppose instead that you would like to use the first non-null result
43 dl 1.9 * of the set of tasks, ignoring any that encounter exceptions,
44 dl 1.6 * and cancelling all other tasks when the first one is ready:
45 dl 1.5 *
46     * <pre>
47     * void solve(Executor e, Collection&lt;Callable&lt;Result&gt;&gt; solvers)
48 dl 1.6 * throws InterruptedException {
49     * CompletionService&lt;Result&gt; ecs = new ExecutorCompletionService&lt;Result&gt;(e);
50 dl 1.5 * int n = solvers.size();
51 dl 1.6 * List&lt;Future&lt;Result&gt;&gt; futures = new ArrayList&lt;Future&lt;Result&gt;&gt;(n);
52 dl 1.5 * Result result = null;
53     * try {
54     * for (Callable&lt;Result&gt; s : solvers)
55     * futures.add(ecs.submit(s));
56     * for (int i = 0; i &lt; n; ++i) {
57     * try {
58     * Result r = ecs.take().get();
59     * if (r != null) {
60     * result = r;
61     * break;
62     * }
63     * } catch(ExecutionException ignore) {}
64     * }
65     * }
66     * finally {
67     * for (Future&lt;Result&gt; f : futures)
68     * f.cancel(true);
69     * }
70     *
71     * if (result != null)
72     * use(result);
73     * }
74     * </pre>
75 dl 1.1 */
76     public class ExecutorCompletionService<V> implements CompletionService<V> {
77 jozart 1.2 private final Executor executor;
78 dl 1.3 private final BlockingQueue<Future<V>> completionQueue;
79 dl 1.1
80     /**
81     * FutureTask extension to enqueue upon completion
82     */
83 dl 1.4 private class QueueingFuture extends FutureTask<V> {
84     QueueingFuture(Callable<V> c) { super(c); }
85     QueueingFuture(Runnable t, V r) { super(t, r); }
86     protected void done() { completionQueue.add(this); }
87 dl 1.1 }
88    
89     /**
90     * Creates an ExecutorCompletionService using the supplied
91 dl 1.3 * executor for base task execution and a
92     * {@link LinkedBlockingQueue} as a completion queue.
93 dl 1.5 * @param executor the executor to use
94 dl 1.3 8 @throws NullPointerException if executor is <tt>null</tt>
95 dl 1.1 */
96 jozart 1.2 public ExecutorCompletionService(Executor executor) {
97 dl 1.1 if (executor == null)
98     throw new NullPointerException();
99     this.executor = executor;
100 dl 1.3 this.completionQueue = new LinkedBlockingQueue<Future<V>>();
101 dl 1.1 }
102    
103     /**
104 dl 1.3 * Creates an ExecutorCompletionService using the supplied
105     * executor for base task execution and the supplied queue as its
106     * completion queue.
107 dl 1.5 * @param executor the executor to use
108 dl 1.7 * @param completionQueue the queue to use as the completion queue
109 dl 1.3 * normally one dedicated for use by this service
110     8 @throws NullPointerException if executor or completionQueue are <tt>null</tt>
111 dl 1.1 */
112 dl 1.3 public ExecutorCompletionService(Executor executor,
113     BlockingQueue<Future<V>> completionQueue) {
114     if (executor == null || completionQueue == null)
115     throw new NullPointerException();
116     this.executor = executor;
117     this.completionQueue = completionQueue;
118 dl 1.1 }
119    
120     public Future<V> submit(Callable<V> task) {
121 dl 1.7 if (task == null) throw new NullPointerException();
122 dl 1.4 QueueingFuture f = new QueueingFuture(task);
123 dl 1.1 executor.execute(f);
124     return f;
125     }
126    
127     public Future<V> submit(Runnable task, V result) {
128 dl 1.7 if (task == null) throw new NullPointerException();
129 dl 1.4 QueueingFuture f = new QueueingFuture(task, result);
130 dl 1.1 executor.execute(f);
131     return f;
132     }
133    
134     public Future<V> take() throws InterruptedException {
135 dl 1.3 return completionQueue.take();
136 dl 1.1 }
137    
138     public Future<V> poll() {
139 dl 1.3 return completionQueue.poll();
140 dl 1.1 }
141    
142     public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
143 dl 1.3 return completionQueue.poll(timeout, unit);
144 dl 1.1 }
145 dl 1.5
146 dl 1.1 }
147    
148