ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ExecutorCompletionService.java
Revision: 1.17
Committed: Sat Dec 2 20:53:11 2006 UTC (17 years, 6 months ago) by dl
Branch: MAIN
Changes since 1.16: +4 -1 lines
Log Message:
Clarify that queue should be unbounded

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.8 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.1 */
6    
7     package java.util.concurrent;
8    
9     /**
10 jozart 1.2 * A {@link CompletionService} that uses a supplied {@link Executor}
11 dl 1.7 * to execute tasks. This class arranges that submitted tasks are,
12     * upon completion, placed on a queue accessible using <tt>take</tt>.
13     * The class is lightweight enough to be suitable for transient use
14     * when processing groups of tasks.
15 dl 1.5 *
16     * <p>
17     *
18     * <b>Usage Examples.</b>
19 dl 1.9 *
20     * Suppose you have a set of solvers for a certain problem, each
21     * returning a value of some type <tt>Result</tt>, and would like to
22     * run them concurrently, processing the results of each of them that
23     * return a non-null value, in some method <tt>use(Result r)</tt>. You
24     * could write this as:
25 dl 1.5 *
26     * <pre>
27 jsr166 1.15 * void solve(Executor e,
28     * Collection&lt;Callable&lt;Result&gt;&gt; solvers)
29     * throws InterruptedException, ExecutionException {
30     * CompletionService&lt;Result&gt; ecs
31     * = new ExecutorCompletionService&lt;Result&gt;(e);
32     * for (Callable&lt;Result&gt; s : solvers)
33     * ecs.submit(s);
34     * int n = solvers.size();
35     * for (int i = 0; i &lt; n; ++i) {
36     * Result r = ecs.take().get();
37     * if (r != null)
38     * use(r);
39     * }
40     * }
41 dl 1.5 * </pre>
42     *
43     * Suppose instead that you would like to use the first non-null result
44 dl 1.9 * of the set of tasks, ignoring any that encounter exceptions,
45 dl 1.6 * and cancelling all other tasks when the first one is ready:
46 dl 1.5 *
47     * <pre>
48 jsr166 1.15 * void solve(Executor e,
49     * Collection&lt;Callable&lt;Result&gt;&gt; solvers)
50     * throws InterruptedException {
51     * CompletionService&lt;Result&gt; ecs
52     * = new ExecutorCompletionService&lt;Result&gt;(e);
53     * int n = solvers.size();
54     * List&lt;Future&lt;Result&gt;&gt; futures
55     * = new ArrayList&lt;Future&lt;Result&gt;&gt;(n);
56     * Result result = null;
57     * try {
58     * for (Callable&lt;Result&gt; s : solvers)
59     * futures.add(ecs.submit(s));
60     * for (int i = 0; i &lt; n; ++i) {
61     * try {
62     * Result r = ecs.take().get();
63     * if (r != null) {
64     * result = r;
65     * break;
66     * }
67     * } catch (ExecutionException ignore) {}
68     * }
69     * }
70     * finally {
71     * for (Future&lt;Result&gt; f : futures)
72     * f.cancel(true);
73     * }
74     *
75     * if (result != null)
76     * use(result);
77     * }
78 dl 1.5 * </pre>
79 dl 1.1 */
80     public class ExecutorCompletionService<V> implements CompletionService<V> {
81 jozart 1.2 private final Executor executor;
82 dl 1.13 private final AbstractExecutorService aes;
83 dl 1.3 private final BlockingQueue<Future<V>> completionQueue;
84 dl 1.1
85     /**
86     * FutureTask extension to enqueue upon completion
87     */
88 dl 1.13 private class QueueingFuture extends FutureTask<Void> {
89     QueueingFuture(RunnableFuture<V> task) {
90     super(task, null);
91     this.task = task;
92     }
93     protected void done() { completionQueue.add(task); }
94     private final Future<V> task;
95     }
96    
97     private RunnableFuture<V> newTaskFor(Callable<V> task) {
98     if (aes == null)
99     return new FutureTask<V>(task);
100     else
101     return aes.newTaskFor(task);
102     }
103    
104     private RunnableFuture<V> newTaskFor(Runnable task, V result) {
105     if (aes == null)
106     return new FutureTask<V>(task, result);
107     else
108     return aes.newTaskFor(task, result);
109 dl 1.1 }
110    
111     /**
112     * Creates an ExecutorCompletionService using the supplied
113 dl 1.3 * executor for base task execution and a
114     * {@link LinkedBlockingQueue} as a completion queue.
115 jsr166 1.15 *
116 dl 1.5 * @param executor the executor to use
117 dl 1.10 * @throws NullPointerException if executor is <tt>null</tt>
118 dl 1.1 */
119 jozart 1.2 public ExecutorCompletionService(Executor executor) {
120 jsr166 1.12 if (executor == null)
121 dl 1.1 throw new NullPointerException();
122     this.executor = executor;
123 dl 1.13 this.aes = (executor instanceof AbstractExecutorService) ?
124     (AbstractExecutorService) executor : null;
125 dl 1.3 this.completionQueue = new LinkedBlockingQueue<Future<V>>();
126 dl 1.1 }
127    
128     /**
129 dl 1.3 * Creates an ExecutorCompletionService using the supplied
130     * executor for base task execution and the supplied queue as its
131     * completion queue.
132 jsr166 1.15 *
133 dl 1.5 * @param executor the executor to use
134 dl 1.7 * @param completionQueue the queue to use as the completion queue
135 dl 1.17 * normally one dedicated for use by this service. This queue is
136     * treated as unbounded -- failed attempted <tt>Queue.add</tt>
137     * operations for completed taskes cause them not to be
138     * retrievable.
139 dl 1.10 * @throws NullPointerException if executor or completionQueue are <tt>null</tt>
140 dl 1.1 */
141 dl 1.3 public ExecutorCompletionService(Executor executor,
142     BlockingQueue<Future<V>> completionQueue) {
143 jsr166 1.12 if (executor == null || completionQueue == null)
144 dl 1.3 throw new NullPointerException();
145     this.executor = executor;
146 dl 1.13 this.aes = (executor instanceof AbstractExecutorService) ?
147     (AbstractExecutorService) executor : null;
148 dl 1.3 this.completionQueue = completionQueue;
149 dl 1.1 }
150    
151     public Future<V> submit(Callable<V> task) {
152 dl 1.7 if (task == null) throw new NullPointerException();
153 dl 1.13 RunnableFuture<V> f = newTaskFor(task);
154     executor.execute(new QueueingFuture(f));
155 dl 1.1 return f;
156     }
157    
158     public Future<V> submit(Runnable task, V result) {
159 dl 1.7 if (task == null) throw new NullPointerException();
160 dl 1.13 RunnableFuture<V> f = newTaskFor(task, result);
161     executor.execute(new QueueingFuture(f));
162 dl 1.1 return f;
163     }
164    
165     public Future<V> take() throws InterruptedException {
166 dl 1.3 return completionQueue.take();
167 dl 1.1 }
168    
169     public Future<V> poll() {
170 dl 1.3 return completionQueue.poll();
171 dl 1.1 }
172    
173     public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
174 dl 1.3 return completionQueue.poll(timeout, unit);
175 dl 1.1 }
176 dl 1.5
177 dl 1.1 }