ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ExecutorCompletionService.java
Revision: 1.30
Committed: Sun Sep 3 15:57:23 2017 UTC (6 years, 8 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.29: +8 -0 lines
Log Message:
use @inheritDoc for unchecked exceptions

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.8 * Expert Group and released to the public domain, as explained at
4 jsr166 1.20 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.1 */
6    
7     package java.util.concurrent;
8    
9     /**
10 jozart 1.2 * A {@link CompletionService} that uses a supplied {@link Executor}
11 dl 1.7 * to execute tasks. This class arranges that submitted tasks are,
12 jsr166 1.18 * upon completion, placed on a queue accessible using {@code take}.
13 dl 1.7 * The class is lightweight enough to be suitable for transient use
14     * when processing groups of tasks.
15 dl 1.5 *
16     * <p>
17     *
18     * <b>Usage Examples.</b>
19 dl 1.9 *
20     * Suppose you have a set of solvers for a certain problem, each
21 jsr166 1.18 * returning a value of some type {@code Result}, and would like to
22 dl 1.9 * run them concurrently, processing the results of each of them that
23 jsr166 1.18 * return a non-null value, in some method {@code use(Result r)}. You
24 dl 1.9 * could write this as:
25 dl 1.5 *
26 jsr166 1.18 * <pre> {@code
27     * void solve(Executor e,
28     * Collection<Callable<Result>> solvers)
29 jsr166 1.15 * throws InterruptedException, ExecutionException {
30 jsr166 1.26 * CompletionService<Result> cs
31 jsr166 1.25 * = new ExecutorCompletionService<>(e);
32 jsr166 1.27 * solvers.forEach(cs::submit);
33 jsr166 1.26 * for (int i = solvers.size(); i > 0; i--) {
34     * Result r = cs.take().get();
35 jsr166 1.22 * if (r != null)
36     * use(r);
37     * }
38 jsr166 1.18 * }}</pre>
39 dl 1.5 *
40     * Suppose instead that you would like to use the first non-null result
41 dl 1.9 * of the set of tasks, ignoring any that encounter exceptions,
42 dl 1.6 * and cancelling all other tasks when the first one is ready:
43 dl 1.5 *
44 jsr166 1.18 * <pre> {@code
45     * void solve(Executor e,
46     * Collection<Callable<Result>> solvers)
47 jsr166 1.15 * throws InterruptedException {
48 jsr166 1.26 * CompletionService<Result> cs
49 jsr166 1.25 * = new ExecutorCompletionService<>(e);
50 jsr166 1.22 * int n = solvers.size();
51     * List<Future<Result>> futures = new ArrayList<>(n);
52     * Result result = null;
53     * try {
54 jsr166 1.28 * solvers.forEach(solver -> futures.add(cs.submit(solver)));
55 jsr166 1.26 * for (int i = n; i > 0; i--) {
56 jsr166 1.22 * try {
57 jsr166 1.26 * Result r = cs.take().get();
58 jsr166 1.22 * if (r != null) {
59     * result = r;
60     * break;
61 jsr166 1.18 * }
62 jsr166 1.22 * } catch (ExecutionException ignore) {}
63 jsr166 1.18 * }
64 jsr166 1.26 * } finally {
65 jsr166 1.28 * futures.forEach(future -> future.cancel(true));
66 jsr166 1.22 * }
67 jsr166 1.18 *
68 jsr166 1.22 * if (result != null)
69     * use(result);
70 jsr166 1.18 * }}</pre>
71 jsr166 1.29 *
72     * @since 1.5
73 dl 1.1 */
74     public class ExecutorCompletionService<V> implements CompletionService<V> {
       // Executor that every submitted task is handed to via execute().
75 jozart 1.2 private final Executor executor;
       // The same executor cast to AbstractExecutorService, or null when it is
       // not an instance of that class; consulted only by newTaskFor.
76 dl 1.13 private final AbstractExecutorService aes;
       // Queue that completed tasks are added to and that take()/poll() read from.
77 dl 1.3 private final BlockingQueue<Future<V>> completionQueue;
78 dl 1.1
79     /**
80 jsr166 1.24 * FutureTask extension to enqueue upon completion.
       *
       * The wrapped task is passed to the superclass as a Runnable with a
       * {@code null} result, and {@code done()} adds the wrapped task --
       * not this wrapper -- to the completion queue, so consumers of the
       * queue see the same future that {@code submit} returned.
81 dl 1.1 */
82 jsr166 1.23 private static class QueueingFuture<V> extends FutureTask<Void> {
83     QueueingFuture(RunnableFuture<V> task,
84     BlockingQueue<Future<V>> completionQueue) {
85 dl 1.13 super(task, null);
86     this.task = task;
87 jsr166 1.23 this.completionQueue = completionQueue;
88 dl 1.13 }
89 jsr166 1.23 private final Future<V> task;
90     private final BlockingQueue<Future<V>> completionQueue;
       // Enqueues with add (not put); the two-argument constructor documents
       // that the queue is treated as unbounded and what a failed add implies.
91 dl 1.13 protected void done() { completionQueue.add(task); }
92     }
93    
       /**
        * Creates the future that will represent the given callable.
        * Delegates to {@code aes.newTaskFor} when the executor is an
        * AbstractExecutorService; otherwise builds a plain FutureTask.
        */
94     private RunnableFuture<V> newTaskFor(Callable<V> task) {
95     if (aes == null)
96     return new FutureTask<V>(task);
97     else
98     return aes.newTaskFor(task);
99     }
100    
       /**
        * Creates the future that will represent the given runnable and
        * result. Delegates to {@code aes.newTaskFor} when the executor is an
        * AbstractExecutorService; otherwise builds a plain FutureTask.
        */
101     private RunnableFuture<V> newTaskFor(Runnable task, V result) {
102     if (aes == null)
103     return new FutureTask<V>(task, result);
104     else
105     return aes.newTaskFor(task, result);
106 dl 1.1 }
107    
108     /**
109     * Creates an ExecutorCompletionService using the supplied
110 dl 1.3 * executor for base task execution and a
111     * {@link LinkedBlockingQueue} as a completion queue.
112 jsr166 1.15 *
113 dl 1.5 * @param executor the executor to use
114 jsr166 1.18 * @throws NullPointerException if executor is {@code null}
115 dl 1.1 */
116 jozart 1.2 public ExecutorCompletionService(Executor executor) {
117 jsr166 1.12 if (executor == null)
118 dl 1.1 throw new NullPointerException();
119     this.executor = executor;
120 dl 1.13 this.aes = (executor instanceof AbstractExecutorService) ?
121     (AbstractExecutorService) executor : null;
122 dl 1.3 this.completionQueue = new LinkedBlockingQueue<Future<V>>();
123 dl 1.1 }
124    
125     /**
126 dl 1.3 * Creates an ExecutorCompletionService using the supplied
127     * executor for base task execution and the supplied queue as its
128     * completion queue.
129 jsr166 1.15 *
130 dl 1.5 * @param executor the executor to use
131 dl 1.7 * @param completionQueue the queue to use as the completion queue,
132 jsr166 1.18 * normally one dedicated for use by this service. This
133     * queue is treated as unbounded -- failed attempted
134 jsr166 1.21 * {@code Queue.add} operations for completed tasks cause
135 jsr166 1.18 * them not to be retrievable.
136     * @throws NullPointerException if executor or completionQueue is {@code null}
137 dl 1.1 */
138 dl 1.3 public ExecutorCompletionService(Executor executor,
139     BlockingQueue<Future<V>> completionQueue) {
140 jsr166 1.12 if (executor == null || completionQueue == null)
141 dl 1.3 throw new NullPointerException();
142     this.executor = executor;
143 dl 1.13 this.aes = (executor instanceof AbstractExecutorService) ?
144     (AbstractExecutorService) executor : null;
145 dl 1.3 this.completionQueue = completionQueue;
146 dl 1.1 }
147    
148 jsr166 1.30 /**
       * Wraps the callable in a future obtained from {@code newTaskFor},
       * hands a QueueingFuture around that future to the executor, and
       * returns the wrapped future (not the QueueingFuture).
       *
149     * @throws RejectedExecutionException {@inheritDoc}
150     * @throws NullPointerException {@inheritDoc}
151     */
152 dl 1.1 public Future<V> submit(Callable<V> task) {
153 dl 1.7 if (task == null) throw new NullPointerException();
154 dl 1.13 RunnableFuture<V> f = newTaskFor(task);
155 jsr166 1.23 executor.execute(new QueueingFuture<V>(f, completionQueue));
156 dl 1.1 return f;
157     }
158    
159 jsr166 1.30 /**
       * Wraps the runnable and result in a future obtained from
       * {@code newTaskFor}, hands a QueueingFuture around that future to
       * the executor, and returns the wrapped future (not the
       * QueueingFuture).
       *
160     * @throws RejectedExecutionException {@inheritDoc}
161     * @throws NullPointerException {@inheritDoc}
162     */
163 dl 1.1 public Future<V> submit(Runnable task, V result) {
164 dl 1.7 if (task == null) throw new NullPointerException();
165 dl 1.13 RunnableFuture<V> f = newTaskFor(task, result);
166 jsr166 1.23 executor.execute(new QueueingFuture<V>(f, completionQueue));
167 dl 1.1 return f;
168     }
169    
       // Delegates directly to completionQueue.take().
170     public Future<V> take() throws InterruptedException {
171 dl 1.3 return completionQueue.take();
172 dl 1.1 }
173    
       // Delegates directly to completionQueue.poll().
174     public Future<V> poll() {
175 dl 1.3 return completionQueue.poll();
176 dl 1.1 }
177    
       // Delegates directly to completionQueue.poll(timeout, unit).
178 jsr166 1.19 public Future<V> poll(long timeout, TimeUnit unit)
179     throws InterruptedException {
180 dl 1.3 return completionQueue.poll(timeout, unit);
181 dl 1.1 }
182 dl 1.5
183 dl 1.1 }