ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ExecutorCompletionService.java
Revision: 1.28
Committed: Sun Nov 6 22:15:01 2016 UTC (7 years, 6 months ago) by jsr166
Branch: MAIN
Changes since 1.27: +2 -2 lines
Log Message:
elide parens in unary lambdas

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.8 * Expert Group and released to the public domain, as explained at
4 jsr166 1.20 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.1 */
6    
7     package java.util.concurrent;
8    
9     /**
10 jozart 1.2 * A {@link CompletionService} that uses a supplied {@link Executor}
11 dl 1.7 * to execute tasks. This class arranges that submitted tasks are,
12 jsr166 1.18 * upon completion, placed on a queue accessible using {@code take}.
13 dl 1.7 * The class is lightweight enough to be suitable for transient use
14     * when processing groups of tasks.
15 dl 1.5 *
16     * <p>
17     *
18     * <b>Usage Examples.</b>
19 dl 1.9 *
20     * Suppose you have a set of solvers for a certain problem, each
21 jsr166 1.18 * returning a value of some type {@code Result}, and would like to
22 dl 1.9 * run them concurrently, processing the results of each of them that
23 jsr166 1.18 * return a non-null value, in some method {@code use(Result r)}. You
24 dl 1.9 * could write this as:
25 dl 1.5 *
26 jsr166 1.18 * <pre> {@code
27     * void solve(Executor e,
28     * Collection<Callable<Result>> solvers)
29 jsr166 1.15 * throws InterruptedException, ExecutionException {
30 jsr166 1.26 * CompletionService<Result> cs
31 jsr166 1.25 * = new ExecutorCompletionService<>(e);
32 jsr166 1.27 * solvers.forEach(cs::submit);
33 jsr166 1.26 * for (int i = solvers.size(); i > 0; i--) {
34     * Result r = cs.take().get();
35 jsr166 1.22 * if (r != null)
36     * use(r);
37     * }
38 jsr166 1.18 * }}</pre>
39 dl 1.5 *
40     * Suppose instead that you would like to use the first non-null result
41 dl 1.9 * of the set of tasks, ignoring any that encounter exceptions,
42 dl 1.6 * and cancelling all other tasks when the first one is ready:
43 dl 1.5 *
44 jsr166 1.18 * <pre> {@code
45     * void solve(Executor e,
46     * Collection<Callable<Result>> solvers)
47 jsr166 1.15 * throws InterruptedException {
48 jsr166 1.26 * CompletionService<Result> cs
49 jsr166 1.25 * = new ExecutorCompletionService<>(e);
50 jsr166 1.22 * int n = solvers.size();
51     * List<Future<Result>> futures = new ArrayList<>(n);
52     * Result result = null;
53     * try {
54 jsr166 1.28 * solvers.forEach(solver -> futures.add(cs.submit(solver)));
55 jsr166 1.26 * for (int i = n; i > 0; i--) {
56 jsr166 1.22 * try {
57 jsr166 1.26 * Result r = cs.take().get();
58 jsr166 1.22 * if (r != null) {
59     * result = r;
60     * break;
61 jsr166 1.18 * }
62 jsr166 1.22 * } catch (ExecutionException ignore) {}
63 jsr166 1.18 * }
64 jsr166 1.26 * } finally {
65 jsr166 1.28 * futures.forEach(future -> future.cancel(true));
66 jsr166 1.22 * }
67 jsr166 1.18 *
68 jsr166 1.22 * if (result != null)
69     * use(result);
70 jsr166 1.18 * }}</pre>
71 dl 1.1 */
72     public class ExecutorCompletionService<V> implements CompletionService<V> {
    /** Executor on which every submitted task is run. */
73 jozart 1.2 private final Executor executor;
    /**
     * The same executor viewed as an AbstractExecutorService, or null when
     * the supplied executor is not one. When non-null, the newTaskFor
     * helpers delegate to it instead of creating a plain FutureTask.
     */
74 dl 1.13 private final AbstractExecutorService aes;
    /** Queue onto which each task's Future is added once that task is done. */
75 dl 1.3 private final BlockingQueue<Future<V>> completionQueue;
76 dl 1.1
77     /**
78 jsr166 1.24 * FutureTask extension to enqueue upon completion.
    * The wrapped task is passed to the FutureTask constructor as the
    * Runnable to run (with a null result). What gets enqueued is the
    * wrapped task's own Future, not this wrapper.
79 dl 1.1 */
80 jsr166 1.23 private static class QueueingFuture<V> extends FutureTask<Void> {
81     QueueingFuture(RunnableFuture<V> task,
82     BlockingQueue<Future<V>> completionQueue) {
83 dl 1.13 super(task, null);
84     this.task = task;
85 jsr166 1.23 this.completionQueue = completionQueue;
86 dl 1.13 }
87 jsr166 1.23 private final Future<V> task;
88     private final BlockingQueue<Future<V>> completionQueue;
    // done() is the FutureTask completion hook. add() is used rather than
    // a blocking insert because the queue is treated as unbounded; if the
    // add fails, the completed task is simply not retrievable.
89 dl 1.13 protected void done() { completionQueue.add(task); }
90     }
91    
    /**
     * Creates the RunnableFuture for a Callable: a plain FutureTask when
     * {@code aes} is null, otherwise whatever {@code aes.newTaskFor}
     * returns.
     */
92     private RunnableFuture<V> newTaskFor(Callable<V> task) {
93     if (aes == null)
94     return new FutureTask<V>(task);
95     else
96     return aes.newTaskFor(task);
97     }
98    
    /**
     * Creates the RunnableFuture for a Runnable and its result value: a
     * plain FutureTask when {@code aes} is null, otherwise whatever
     * {@code aes.newTaskFor} returns.
     */
99     private RunnableFuture<V> newTaskFor(Runnable task, V result) {
100     if (aes == null)
101     return new FutureTask<V>(task, result);
102     else
103     return aes.newTaskFor(task, result);
104 dl 1.1 }
105    
106     /**
107     * Creates an ExecutorCompletionService using the supplied
108 dl 1.3 * executor for base task execution and a
109     * {@link LinkedBlockingQueue} as a completion queue.
110 jsr166 1.15 *
111 dl 1.5 * @param executor the executor to use
112 jsr166 1.18 * @throws NullPointerException if executor is {@code null}
113 dl 1.1 */
114 jozart 1.2 public ExecutorCompletionService(Executor executor) {
115 jsr166 1.12 if (executor == null)
116 dl 1.1 throw new NullPointerException();
117     this.executor = executor;
118 dl 1.13 this.aes = (executor instanceof AbstractExecutorService) ?
119     (AbstractExecutorService) executor : null;
120 dl 1.3 this.completionQueue = new LinkedBlockingQueue<Future<V>>();
121 dl 1.1 }
122    
123     /**
124 dl 1.3 * Creates an ExecutorCompletionService using the supplied
125     * executor for base task execution and the supplied queue as its
126     * completion queue.
127 jsr166 1.15 *
128 dl 1.5 * @param executor the executor to use
129 dl 1.7 * @param completionQueue the queue to use as the completion queue,
130 jsr166 1.18 * normally one dedicated for use by this service. This
131     * queue is treated as unbounded -- failed attempted
132 jsr166 1.21 * {@code Queue.add} operations for completed tasks cause
133 jsr166 1.18 * them not to be retrievable.
134     * @throws NullPointerException if executor or completionQueue is {@code null}
135 dl 1.1 */
136 dl 1.3 public ExecutorCompletionService(Executor executor,
137     BlockingQueue<Future<V>> completionQueue) {
138 jsr166 1.12 if (executor == null || completionQueue == null)
139 dl 1.3 throw new NullPointerException();
140     this.executor = executor;
141 dl 1.13 this.aes = (executor instanceof AbstractExecutorService) ?
142     (AbstractExecutorService) executor : null;
143 dl 1.3 this.completionQueue = completionQueue;
144 dl 1.1 }
145    
    /**
     * Submits a value-returning task. The task is turned into a
     * RunnableFuture by {@code newTaskFor}, wrapped in a QueueingFuture
     * tied to the completion queue, and passed to the executor.
     *
     * @param task the task to submit
     * @return the RunnableFuture created for the task (not the
     *         QueueingFuture wrapper handed to the executor)
     * @throws NullPointerException if task is {@code null}
     */
146     public Future<V> submit(Callable<V> task) {
147 dl 1.7 if (task == null) throw new NullPointerException();
148 dl 1.13 RunnableFuture<V> f = newTaskFor(task);
149 jsr166 1.23 executor.execute(new QueueingFuture<V>(f, completionQueue));
150 dl 1.1 return f;
151     }
152    
    /**
     * Submits a Runnable task together with the result value to associate
     * with it. The task is turned into a RunnableFuture by
     * {@code newTaskFor}, wrapped in a QueueingFuture tied to the
     * completion queue, and passed to the executor.
     *
     * @param task the task to submit
     * @param result the value passed to {@code newTaskFor} as the task's result
     * @return the RunnableFuture created for the task (not the
     *         QueueingFuture wrapper handed to the executor)
     * @throws NullPointerException if task is {@code null}
     */
153     public Future<V> submit(Runnable task, V result) {
154 dl 1.7 if (task == null) throw new NullPointerException();
155 dl 1.13 RunnableFuture<V> f = newTaskFor(task, result);
156 jsr166 1.23 executor.execute(new QueueingFuture<V>(f, completionQueue));
157 dl 1.1 return f;
158     }
159    
    /**
     * Returns the result of {@code take()} on the completion queue.
     *
     * @return the Future taken from the completion queue
     * @throws InterruptedException if the queue's {@code take} throws it
     */
160     public Future<V> take() throws InterruptedException {
161 dl 1.3 return completionQueue.take();
162 dl 1.1 }
163    
    /**
     * Returns the result of the no-argument {@code poll()} on the
     * completion queue.
     *
     * @return whatever the completion queue's {@code poll()} returns
     */
164     public Future<V> poll() {
165 dl 1.3 return completionQueue.poll();
166 dl 1.1 }
167    
    /**
     * Returns the result of the timed {@code poll} on the completion
     * queue, passing the timeout and unit through unchanged.
     *
     * @param timeout the timeout value handed to the queue's {@code poll}
     * @param unit the time unit of {@code timeout}
     * @return whatever the completion queue's timed {@code poll} returns
     * @throws InterruptedException if the queue's {@code poll} throws it
     */
168 jsr166 1.19 public Future<V> poll(long timeout, TimeUnit unit)
169     throws InterruptedException {
170 dl 1.3 return completionQueue.poll(timeout, unit);
171 dl 1.1 }
172 dl 1.5
173 dl 1.1 }