ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ExecutorCompletionService.java
Revision: 1.28
Committed: Sun Nov 6 22:15:01 2016 UTC (7 years, 7 months ago) by jsr166
Branch: MAIN
Changes since 1.27: +2 -2 lines
Log Message:
elide parens in unary lambdas

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 /**
10 * A {@link CompletionService} that uses a supplied {@link Executor}
11 * to execute tasks. This class arranges that submitted tasks are,
12 * upon completion, placed on a queue accessible using {@code take}.
13 * The class is lightweight enough to be suitable for transient use
14 * when processing groups of tasks.
15 *
16 * <p>
17 *
18 * <b>Usage Examples.</b>
19 *
20 * Suppose you have a set of solvers for a certain problem, each
21 * returning a value of some type {@code Result}, and would like to
22 * run them concurrently, processing the results of each of them that
23 * return a non-null value, in some method {@code use(Result r)}. You
24 * could write this as:
25 *
26 * <pre> {@code
27 * void solve(Executor e,
28 * Collection<Callable<Result>> solvers)
29 * throws InterruptedException, ExecutionException {
30 * CompletionService<Result> cs
31 * = new ExecutorCompletionService<>(e);
32 * solvers.forEach(cs::submit);
33 * for (int i = solvers.size(); i > 0; i--) {
34 * Result r = cs.take().get();
35 * if (r != null)
36 * use(r);
37 * }
38 * }}</pre>
39 *
40 * Suppose instead that you would like to use the first non-null result
41 * of the set of tasks, ignoring any that encounter exceptions,
42 * and cancelling all other tasks when the first one is ready:
43 *
44 * <pre> {@code
45 * void solve(Executor e,
46 * Collection<Callable<Result>> solvers)
47 * throws InterruptedException {
48 * CompletionService<Result> cs
49 * = new ExecutorCompletionService<>(e);
50 * int n = solvers.size();
51 * List<Future<Result>> futures = new ArrayList<>(n);
52 * Result result = null;
53 * try {
54 * solvers.forEach(solver -> futures.add(cs.submit(solver)));
55 * for (int i = n; i > 0; i--) {
56 * try {
57 * Result r = cs.take().get();
58 * if (r != null) {
59 * result = r;
60 * break;
61 * }
62 * } catch (ExecutionException ignore) {}
63 * }
64 * } finally {
65 * futures.forEach(future -> future.cancel(true));
66 * }
67 *
68 * if (result != null)
69 * use(result);
70 * }}</pre>
71 */
72 public class ExecutorCompletionService<V> implements CompletionService<V> {
73 private final Executor executor;
74 private final AbstractExecutorService aes;
75 private final BlockingQueue<Future<V>> completionQueue;
76
77 /**
78 * FutureTask extension to enqueue upon completion.
79 */
80 private static class QueueingFuture<V> extends FutureTask<Void> {
81 QueueingFuture(RunnableFuture<V> task,
82 BlockingQueue<Future<V>> completionQueue) {
83 super(task, null);
84 this.task = task;
85 this.completionQueue = completionQueue;
86 }
87 private final Future<V> task;
88 private final BlockingQueue<Future<V>> completionQueue;
89 protected void done() { completionQueue.add(task); }
90 }
91
92 private RunnableFuture<V> newTaskFor(Callable<V> task) {
93 if (aes == null)
94 return new FutureTask<V>(task);
95 else
96 return aes.newTaskFor(task);
97 }
98
99 private RunnableFuture<V> newTaskFor(Runnable task, V result) {
100 if (aes == null)
101 return new FutureTask<V>(task, result);
102 else
103 return aes.newTaskFor(task, result);
104 }
105
106 /**
107 * Creates an ExecutorCompletionService using the supplied
108 * executor for base task execution and a
109 * {@link LinkedBlockingQueue} as a completion queue.
110 *
111 * @param executor the executor to use
112 * @throws NullPointerException if executor is {@code null}
113 */
114 public ExecutorCompletionService(Executor executor) {
115 if (executor == null)
116 throw new NullPointerException();
117 this.executor = executor;
118 this.aes = (executor instanceof AbstractExecutorService) ?
119 (AbstractExecutorService) executor : null;
120 this.completionQueue = new LinkedBlockingQueue<Future<V>>();
121 }
122
123 /**
124 * Creates an ExecutorCompletionService using the supplied
125 * executor for base task execution and the supplied queue as its
126 * completion queue.
127 *
128 * @param executor the executor to use
129 * @param completionQueue the queue to use as the completion queue
130 * normally one dedicated for use by this service. This
131 * queue is treated as unbounded -- failed attempted
132 * {@code Queue.add} operations for completed tasks cause
133 * them not to be retrievable.
134 * @throws NullPointerException if executor or completionQueue are {@code null}
135 */
136 public ExecutorCompletionService(Executor executor,
137 BlockingQueue<Future<V>> completionQueue) {
138 if (executor == null || completionQueue == null)
139 throw new NullPointerException();
140 this.executor = executor;
141 this.aes = (executor instanceof AbstractExecutorService) ?
142 (AbstractExecutorService) executor : null;
143 this.completionQueue = completionQueue;
144 }
145
146 public Future<V> submit(Callable<V> task) {
147 if (task == null) throw new NullPointerException();
148 RunnableFuture<V> f = newTaskFor(task);
149 executor.execute(new QueueingFuture<V>(f, completionQueue));
150 return f;
151 }
152
153 public Future<V> submit(Runnable task, V result) {
154 if (task == null) throw new NullPointerException();
155 RunnableFuture<V> f = newTaskFor(task, result);
156 executor.execute(new QueueingFuture<V>(f, completionQueue));
157 return f;
158 }
159
160 public Future<V> take() throws InterruptedException {
161 return completionQueue.take();
162 }
163
164 public Future<V> poll() {
165 return completionQueue.poll();
166 }
167
168 public Future<V> poll(long timeout, TimeUnit unit)
169 throws InterruptedException {
170 return completionQueue.poll(timeout, unit);
171 }
172
173 }