ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ExecutorCompletionService.java
Revision: 1.24
Committed: Sun Sep 20 17:03:22 2015 UTC (8 years, 8 months ago) by jsr166
Branch: MAIN
Changes since 1.23: +1 -1 lines
Log Message:
Terminate javadoc with a period.

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.8 * Expert Group and released to the public domain, as explained at
4 jsr166 1.20 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.1 */
6    
7     package java.util.concurrent;
8    
9     /**
10 jozart 1.2 * A {@link CompletionService} that uses a supplied {@link Executor}
11 dl 1.7 * to execute tasks. This class arranges that submitted tasks are,
12 jsr166 1.18 * upon completion, placed on a queue accessible using {@code take}.
13 dl 1.7 * The class is lightweight enough to be suitable for transient use
14     * when processing groups of tasks.
15 dl 1.5 *
16     * <p>
17     *
18     * <b>Usage Examples.</b>
19 dl 1.9 *
20     * Suppose you have a set of solvers for a certain problem, each
21 jsr166 1.18 * returning a value of some type {@code Result}, and would like to
22 dl 1.9 * run them concurrently, processing the results of each of them that
23 jsr166 1.18 * return a non-null value, in some method {@code use(Result r)}. You
24 dl 1.9 * could write this as:
25 dl 1.5 *
26 jsr166 1.18 * <pre> {@code
27     * void solve(Executor e,
28     * Collection<Callable<Result>> solvers)
29 jsr166 1.15 * throws InterruptedException, ExecutionException {
30 jsr166 1.22 * CompletionService<Result> ecs
31     * = new ExecutorCompletionService<Result>(e);
32     * for (Callable<Result> s : solvers)
33     * ecs.submit(s);
34     * int n = solvers.size();
35     * for (int i = 0; i < n; ++i) {
36     * Result r = ecs.take().get();
37     * if (r != null)
38     * use(r);
39     * }
40 jsr166 1.18 * }}</pre>
41 dl 1.5 *
42     * Suppose instead that you would like to use the first non-null result
43 dl 1.9 * of the set of tasks, ignoring any that encounter exceptions,
44 dl 1.6 * and cancelling all other tasks when the first one is ready:
45 dl 1.5 *
46 jsr166 1.18 * <pre> {@code
47     * void solve(Executor e,
48     * Collection<Callable<Result>> solvers)
49 jsr166 1.15 * throws InterruptedException {
50 jsr166 1.22 * CompletionService<Result> ecs
51     * = new ExecutorCompletionService<Result>(e);
52     * int n = solvers.size();
53     * List<Future<Result>> futures = new ArrayList<>(n);
54     * Result result = null;
55     * try {
56     * for (Callable<Result> s : solvers)
57     * futures.add(ecs.submit(s));
58     * for (int i = 0; i < n; ++i) {
59     * try {
60     * Result r = ecs.take().get();
61     * if (r != null) {
62     * result = r;
63     * break;
64 jsr166 1.18 * }
65 jsr166 1.22 * } catch (ExecutionException ignore) {}
66 jsr166 1.18 * }
67 jsr166 1.22 * }
68     * finally {
69     * for (Future<Result> f : futures)
70     * f.cancel(true);
71     * }
72 jsr166 1.18 *
73 jsr166 1.22 * if (result != null)
74     * use(result);
75 jsr166 1.18 * }}</pre>
76 dl 1.1 */
77     public class ExecutorCompletionService<V> implements CompletionService<V> {
78 jozart 1.2 private final Executor executor;
79 dl 1.13 private final AbstractExecutorService aes;
80 dl 1.3 private final BlockingQueue<Future<V>> completionQueue;
81 dl 1.1
82     /**
83 jsr166 1.24 * FutureTask extension to enqueue upon completion.
84 dl 1.1 */
85 jsr166 1.23 private static class QueueingFuture<V> extends FutureTask<Void> {
86     QueueingFuture(RunnableFuture<V> task,
87     BlockingQueue<Future<V>> completionQueue) {
88 dl 1.13 super(task, null);
89     this.task = task;
90 jsr166 1.23 this.completionQueue = completionQueue;
91 dl 1.13 }
92 jsr166 1.23 private final Future<V> task;
93     private final BlockingQueue<Future<V>> completionQueue;
94 dl 1.13 protected void done() { completionQueue.add(task); }
95     }
96    
97     private RunnableFuture<V> newTaskFor(Callable<V> task) {
98     if (aes == null)
99     return new FutureTask<V>(task);
100     else
101     return aes.newTaskFor(task);
102     }
103    
104     private RunnableFuture<V> newTaskFor(Runnable task, V result) {
105     if (aes == null)
106     return new FutureTask<V>(task, result);
107     else
108     return aes.newTaskFor(task, result);
109 dl 1.1 }
110    
111     /**
112     * Creates an ExecutorCompletionService using the supplied
113 dl 1.3 * executor for base task execution and a
114     * {@link LinkedBlockingQueue} as a completion queue.
115 jsr166 1.15 *
116 dl 1.5 * @param executor the executor to use
117 jsr166 1.18 * @throws NullPointerException if executor is {@code null}
118 dl 1.1 */
119 jozart 1.2 public ExecutorCompletionService(Executor executor) {
120 jsr166 1.12 if (executor == null)
121 dl 1.1 throw new NullPointerException();
122     this.executor = executor;
123 dl 1.13 this.aes = (executor instanceof AbstractExecutorService) ?
124     (AbstractExecutorService) executor : null;
125 dl 1.3 this.completionQueue = new LinkedBlockingQueue<Future<V>>();
126 dl 1.1 }
127    
128     /**
129 dl 1.3 * Creates an ExecutorCompletionService using the supplied
130     * executor for base task execution and the supplied queue as its
131     * completion queue.
132 jsr166 1.15 *
133 dl 1.5 * @param executor the executor to use
134 dl 1.7 * @param completionQueue the queue to use as the completion queue
135 jsr166 1.18 * normally one dedicated for use by this service. This
136     * queue is treated as unbounded -- failed attempted
137 jsr166 1.21 * {@code Queue.add} operations for completed tasks cause
138 jsr166 1.18 * them not to be retrievable.
139     * @throws NullPointerException if executor or completionQueue are {@code null}
140 dl 1.1 */
141 dl 1.3 public ExecutorCompletionService(Executor executor,
142     BlockingQueue<Future<V>> completionQueue) {
143 jsr166 1.12 if (executor == null || completionQueue == null)
144 dl 1.3 throw new NullPointerException();
145     this.executor = executor;
146 dl 1.13 this.aes = (executor instanceof AbstractExecutorService) ?
147     (AbstractExecutorService) executor : null;
148 dl 1.3 this.completionQueue = completionQueue;
149 dl 1.1 }
150    
151     public Future<V> submit(Callable<V> task) {
152 dl 1.7 if (task == null) throw new NullPointerException();
153 dl 1.13 RunnableFuture<V> f = newTaskFor(task);
154 jsr166 1.23 executor.execute(new QueueingFuture<V>(f, completionQueue));
155 dl 1.1 return f;
156     }
157    
158     public Future<V> submit(Runnable task, V result) {
159 dl 1.7 if (task == null) throw new NullPointerException();
160 dl 1.13 RunnableFuture<V> f = newTaskFor(task, result);
161 jsr166 1.23 executor.execute(new QueueingFuture<V>(f, completionQueue));
162 dl 1.1 return f;
163     }
164    
165     public Future<V> take() throws InterruptedException {
166 dl 1.3 return completionQueue.take();
167 dl 1.1 }
168    
169     public Future<V> poll() {
170 dl 1.3 return completionQueue.poll();
171 dl 1.1 }
172    
173 jsr166 1.19 public Future<V> poll(long timeout, TimeUnit unit)
174     throws InterruptedException {
175 dl 1.3 return completionQueue.poll(timeout, unit);
176 dl 1.1 }
177 dl 1.5
178 dl 1.1 }