ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinTask.java
Revision: 1.15
Committed: Thu May 27 16:47:21 2010 UTC (14 years ago) by dl
Branch: MAIN
Changes since 1.14: +215 -279 lines
Log Message:
Sync with jsr166y

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8
9 import java.io.Serializable;
10 import java.util.Collection;
11 import java.util.Collections;
12 import java.util.List;
13 import java.util.RandomAccess;
14 import java.util.Map;
15 import java.util.WeakHashMap;
16
17 /**
18 * Abstract base class for tasks that run within a {@link ForkJoinPool}.
19 * A {@code ForkJoinTask} is a thread-like entity that is much
20 * lighter weight than a normal thread. Huge numbers of tasks and
21 * subtasks may be hosted by a small number of actual threads in a
22 * ForkJoinPool, at the price of some usage limitations.
23 *
24 * <p>A "main" {@code ForkJoinTask} begins execution when submitted
25 * to a {@link ForkJoinPool}. Once started, it will usually in turn
26 * start other subtasks. As indicated by the name of this class,
27 * many programs using {@code ForkJoinTask} employ only methods
28 * {@link #fork} and {@link #join}, or derivatives such as {@link
29 * #invokeAll}. However, this class also provides a number of other
30 * methods that can come into play in advanced usages, as well as
31 * extension mechanics that allow support of new forms of fork/join
32 * processing.
33 *
34 * <p>A {@code ForkJoinTask} is a lightweight form of {@link Future}.
35 * The efficiency of {@code ForkJoinTask}s stems from a set of
36 * restrictions (that are only partially statically enforceable)
37 * reflecting their intended use as computational tasks calculating
38 * pure functions or operating on purely isolated objects. The
39 * primary coordination mechanisms are {@link #fork}, that arranges
40 * asynchronous execution, and {@link #join}, that doesn't proceed
41 * until the task's result has been computed. Computations should
42 * avoid {@code synchronized} methods or blocks, and should minimize
43 * other blocking synchronization apart from joining other tasks or
44 * using synchronizers such as Phasers that are advertised to
45 * cooperate with fork/join scheduling. Tasks should also not perform
46 * blocking IO, and should ideally access variables that are
47 * completely independent of those accessed by other running
48 * tasks. Minor breaches of these restrictions, for example using
49 * shared output streams, may be tolerable in practice, but frequent
50 * use may result in poor performance, and the potential to
51 * indefinitely stall if the number of threads not waiting for IO or
52 * other external synchronization becomes exhausted. This usage
53 * restriction is in part enforced by not permitting checked
54 * exceptions such as {@code IOExceptions} to be thrown. However,
55 * computations may still encounter unchecked exceptions, that are
56 * rethrown to callers attempting to join them. These exceptions may
57 * additionally include {@link RejectedExecutionException} stemming
58 * from internal resource exhaustion, such as failure to allocate
59 * internal task queues.
60 *
61 * <p>The primary method for awaiting completion and extracting
62 * results of a task is {@link #join}, but there are several variants:
63 * The {@link Future#get} methods support interruptible and/or timed
64 * waits for completion and report results using {@code Future}
65 * conventions. Method {@link #helpJoin} enables callers to actively
66 * execute other tasks while awaiting joins, which is sometimes more
67 * efficient but only applies when all subtasks are known to be
68 * strictly tree-structured. Method {@link #invoke} is semantically
69 * equivalent to {@code fork(); join()} but always attempts to begin
70 * execution in the current thread. The "<em>quiet</em>" forms of
71 * these methods do not extract results or report exceptions. These
72 * may be useful when a set of tasks are being executed, and you need
73 * to delay processing of results or exceptions until all complete.
74 * Method {@code invokeAll} (available in multiple versions)
75 * performs the most common form of parallel invocation: forking a set
76 * of tasks and joining them all.
77 *
78 * <p>The execution status of tasks may be queried at several levels
79 * of detail: {@link #isDone} is true if a task completed in any way
80 * (including the case where a task was cancelled without executing);
81 * {@link #isCompletedNormally} is true if a task completed without
82 * cancellation or encountering an exception; {@link #isCancelled} is
83 * true if the task was cancelled (in which case {@link #getException}
84 * returns a {@link java.util.concurrent.CancellationException}); and
85 * {@link #isCompletedAbnormally} is true if a task was either
86 * cancelled or encountered an exception, in which case {@link
87 * #getException} will return either the encountered exception or
88 * {@link java.util.concurrent.CancellationException}.
89 *
90 * <p>The ForkJoinTask class is not usually directly subclassed.
91 * Instead, you subclass one of the abstract classes that support a
92 * particular style of fork/join processing, typically {@link
93 * RecursiveAction} for computations that do not return results, or
94 * {@link RecursiveTask} for those that do. Normally, a concrete
95 * ForkJoinTask subclass declares fields comprising its parameters,
96 * established in a constructor, and then defines a {@code compute}
97 * method that somehow uses the control methods supplied by this base
98 * class. While these methods have {@code public} access (to allow
99 * instances of different task subclasses to call each other's
100 * methods), some of them may only be called from within other
101 * ForkJoinTasks (as may be determined using method {@link
102 * #inForkJoinPool}). Attempts to invoke them in other contexts
103 * result in exceptions or errors, possibly including
104 * ClassCastException.
105 *
106 * <p>Most base support methods are {@code final}, to prevent
107 * overriding of implementations that are intrinsically tied to the
108 * underlying lightweight task scheduling framework. Developers
109 * creating new basic styles of fork/join processing should minimally
110 * implement {@code protected} methods {@link #exec}, {@link
111 * #setRawResult}, and {@link #getRawResult}, while also introducing
112 * an abstract computational method that can be implemented in its
113 * subclasses, possibly relying on other {@code protected} methods
114 * provided by this class.
115 *
116 * <p>ForkJoinTasks should perform relatively small amounts of
117 * computation. Large tasks should be split into smaller subtasks,
118 * usually via recursive decomposition. As a very rough rule of thumb,
119 * a task should perform more than 100 and less than 10000 basic
120 * computational steps. If tasks are too big, then parallelism cannot
121 * improve throughput. If too small, then memory and internal task
122 * maintenance overhead may overwhelm processing.
123 *
124 * <p>This class provides {@code adapt} methods for {@link Runnable}
125 * and {@link Callable}, that may be of use when mixing execution of
126 * {@code ForkJoinTasks} with other kinds of tasks. When all tasks
127 * are of this form, consider using a pool in
128 * {@linkplain ForkJoinPool#setAsyncMode async mode}.
129 *
130 * <p>ForkJoinTasks are {@code Serializable}, which enables them to be
131 * used in extensions such as remote execution frameworks. It is
132 * sensible to serialize tasks only before or after, but not during,
133 * execution. Serialization is not relied on during execution itself.
134 *
135 * @since 1.7
136 * @author Doug Lea
137 */
138 public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
139
140 /*
141 * See the internal documentation of class ForkJoinPool for a
142 * general implementation overview. ForkJoinTasks are mainly
143 * responsible for maintaining their "status" field amidst relays
144 * to methods in ForkJoinWorkerThread and ForkJoinPool. The
145 * methods of this class are more-or-less layered into (1) basic
146 * status maintenance (2) execution and awaiting completion (3)
147 * user-level methods that additionally report results. This is
148 * sometimes hard to see because this file orders exported methods
149 * in a way that flows well in javadocs.
150 */
151
152 /**
153 * Run control status bits packed into a single int to minimize
154 * footprint and to ensure atomicity (via CAS). Status is
155 * initially zero, and takes on nonnegative values until
156 * completed, upon which status holds COMPLETED. CANCELLED, or
157 * EXCEPTIONAL, which use the top 3 bits. Tasks undergoing
158 * blocking waits by other threads have the SIGNAL bit set.
159 *
160 * Completion of a stolen task with SIGNAL set awakens any waiters
161 * via notifyAll. Even though suboptimal for some purposes, we use
162 * basic builtin wait/notify to take advantage of "monitor
163 * inflation" in JVMs that we would otherwise need to emulate to
164 * avoid adding further per-task bookkeeping overhead. We want
165 * these monitors to be "fat", i.e., not use biasing or thin-lock
166 * techniques, so use some odd coding idioms that tend to avoid
167 * them.
168 *
169 * Note that bits 1-28 are currently unused. Also value
170 * 0x80000000 is available as spare completion value.
171 */
172 volatile int status; // accessed directly by pool and workers
173
174 private static final int COMPLETION_MASK = 0xe0000000;
175 private static final int NORMAL = 0xe0000000; // == mask
176 private static final int CANCELLED = 0xc0000000;
177 private static final int EXCEPTIONAL = 0xa0000000;
178 private static final int SIGNAL = 0x00000001;
179
180 /**
181 * Table of exceptions thrown by tasks, to enable reporting by
182 * callers. Because exceptions are rare, we don't directly keep
183 * them with task objects, but instead use a weak ref table. Note
184 * that cancellation exceptions don't appear in the table, but are
185 * instead recorded as status values.
186 * TODO: Use ConcurrentReferenceHashMap
187 */
188 static final Map<ForkJoinTask<?>, Throwable> exceptionMap =
189 Collections.synchronizedMap
190 (new WeakHashMap<ForkJoinTask<?>, Throwable>());
191
192 // Maintaining completion status
193
194 /**
195 * Marks completion and wakes up threads waiting to join this task,
196 * also clearing signal request bits.
197 *
198 * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
199 * @return status on exit
200 */
201 private int setCompletion(int completion) {
202 int s;
203 while ((s = status) >= 0) {
204 if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
205 if ((s & SIGNAL) != 0)
206 synchronized (this) { notifyAll(); }
207 return completion;
208 }
209 }
210 return s;
211 }
212
213 /**
214 * Record exception and set exceptional completion
215 * @return status on exit
216 */
217 private int setExceptionalCompletion(Throwable rex) {
218 exceptionMap.put(this, rex);
219 return setCompletion(EXCEPTIONAL);
220 }
221
222 /**
223 * Blocks a worker thread until completion. Called only by pool.
224 */
225 final void internalAwaitDone() {
226 int s;
227 while ((s = status) >= 0) {
228 synchronized(this) {
229 if (UNSAFE.compareAndSwapInt(this, statusOffset, s, s|SIGNAL)){
230 do {
231 try {
232 wait();
233 } catch (InterruptedException ie) {
234 cancelIfTerminating();
235 }
236 } while (status >= 0);
237 break;
238 }
239 }
240 }
241 }
242
243 /**
244 * Blocks a non-worker-thread until completion.
245 * @return status on exit
246 */
247 private int externalAwaitDone() {
248 int s;
249 while ((s = status) >= 0) {
250 synchronized(this) {
251 if (UNSAFE.compareAndSwapInt(this, statusOffset, s, s|SIGNAL)){
252 boolean interrupted = false;
253 do {
254 try {
255 wait();
256 } catch (InterruptedException ie) {
257 interrupted = true;
258 }
259 } while ((s = status) >= 0);
260 if (interrupted)
261 Thread.currentThread().interrupt();
262 break;
263 }
264 }
265 }
266 return s;
267 }
268
269 /**
270 * Unless done, calls exec and records status if completed, but
271 * doesn't wait for completion otherwise.
272 */
273 final void tryExec() {
274 try {
275 if (status < 0 || !exec())
276 return;
277 } catch (Throwable rex) {
278 setExceptionalCompletion(rex);
279 return;
280 }
281 setCompletion(NORMAL); // must be outside try block
282 }
283
284 /**
285 * If not done and this task is next in worker queue, runs it,
286 * else waits for it.
287 * @return status on exit
288 */
289 private int waitingJoin() {
290 int s = status;
291 if (s < 0)
292 return s;
293 Thread t = Thread.currentThread();
294 if (t instanceof ForkJoinWorkerThread) {
295 ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
296 if (w.unpushTask(this)) {
297 boolean completed;
298 try {
299 completed = exec();
300 } catch (Throwable rex) {
301 return setExceptionalCompletion(rex);
302 }
303 if (completed)
304 return setCompletion(NORMAL);
305 }
306 return w.pool.awaitJoin(this);
307 }
308 else
309 return externalAwaitDone();
310 }
311
312 /**
313 * Unless done, calls exec and records status if completed, or
314 * waits for completion otherwise.
315 * @return status on exit
316 */
317 private int waitingInvoke() {
318 int s = status;
319 if (s < 0)
320 return s;
321 boolean completed;
322 try {
323 completed = exec();
324 } catch (Throwable rex) {
325 return setExceptionalCompletion(rex);
326 }
327 if (completed)
328 return setCompletion(NORMAL);
329 return waitingJoin();
330 }
331
332 /**
333 * If this task is next in worker queue, runs it, else processes other
334 * tasks until complete.
335 * @return status on exit
336 */
337 private int busyJoin() {
338 int s = status;
339 if (s < 0)
340 return s;
341 ForkJoinWorkerThread w = (ForkJoinWorkerThread) Thread.currentThread();
342 if (w.unpushTask(this)) {
343 boolean completed;
344 try {
345 completed = exec();
346 } catch (Throwable rex) {
347 return setExceptionalCompletion(rex);
348 }
349 if (completed)
350 return setCompletion(NORMAL);
351 }
352 return w.execWhileJoining(this);
353 }
354
355 /**
356 * Returns result or throws exception associated with given status.
357 * @param s the status
358 */
359 private V reportResult(int s) {
360 Throwable ex;
361 if (s < NORMAL && (ex = getException()) != null)
362 UNSAFE.throwException(ex);
363 return getRawResult();
364 }
365
366 // public methods
367
368 /**
369 * Arranges to asynchronously execute this task. While it is not
370 * necessarily enforced, it is a usage error to fork a task more
371 * than once unless it has completed and been reinitialized.
372 * Subsequent modifications to the state of this task or any data
373 * it operates on are not necessarily consistently observable by
374 * any thread other than the one executing it unless preceded by a
375 * call to {@link #join} or related methods, or a call to {@link
376 * #isDone} returning {@code true}.
377 *
378 * <p>This method may be invoked only from within {@code
379 * ForkJoinTask} computations (as may be determined using method
380 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
381 * result in exceptions or errors, possibly including {@code
382 * ClassCastException}.
383 *
384 * @return {@code this}, to simplify usage
385 */
386 public final ForkJoinTask<V> fork() {
387 ((ForkJoinWorkerThread) Thread.currentThread())
388 .pushTask(this);
389 return this;
390 }
391
392 /**
393 * Returns the result of the computation when it {@link #isDone is done}.
394 * This method differs from {@link #get()} in that
395 * abnormal completion results in {@code RuntimeException} or
396 * {@code Error}, not {@code ExecutionException}.
397 *
398 * @return the computed result
399 */
400 public final V join() {
401 return reportResult(waitingJoin());
402 }
403
404 /**
405 * Commences performing this task, awaits its completion if
406 * necessary, and return its result, or throws an (unchecked)
407 * exception if the underlying computation did so.
408 *
409 * @return the computed result
410 */
411 public final V invoke() {
412 return reportResult(waitingInvoke());
413 }
414
415 /**
416 * Forks the given tasks, returning when {@code isDone} holds for
417 * each task or an (unchecked) exception is encountered, in which
418 * case the exception is rethrown. If either task encounters an
419 * exception, the other one may be, but is not guaranteed to be,
420 * cancelled. If both tasks throw an exception, then this method
421 * throws one of them. The individual status of each task may be
422 * checked using {@link #getException()} and related methods.
423 *
424 * <p>This method may be invoked only from within {@code
425 * ForkJoinTask} computations (as may be determined using method
426 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
427 * result in exceptions or errors, possibly including {@code
428 * ClassCastException}.
429 *
430 * @param t1 the first task
431 * @param t2 the second task
432 * @throws NullPointerException if any task is null
433 */
434 public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
435 t2.fork();
436 t1.invoke();
437 t2.join();
438 }
439
440 /**
441 * Forks the given tasks, returning when {@code isDone} holds for
442 * each task or an (unchecked) exception is encountered, in which
443 * case the exception is rethrown. If any task encounters an
444 * exception, others may be, but are not guaranteed to be,
445 * cancelled. If more than one task encounters an exception, then
446 * this method throws any one of these exceptions. The individual
447 * status of each task may be checked using {@link #getException()}
448 * and related methods.
449 *
450 * <p>This method may be invoked only from within {@code
451 * ForkJoinTask} computations (as may be determined using method
452 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
453 * result in exceptions or errors, possibly including {@code
454 * ClassCastException}.
455 *
456 * @param tasks the tasks
457 * @throws NullPointerException if any task is null
458 */
459 public static void invokeAll(ForkJoinTask<?>... tasks) {
460 Throwable ex = null;
461 int last = tasks.length - 1;
462 for (int i = last; i >= 0; --i) {
463 ForkJoinTask<?> t = tasks[i];
464 if (t == null) {
465 if (ex == null)
466 ex = new NullPointerException();
467 }
468 else if (i != 0)
469 t.fork();
470 else if (t.waitingInvoke() < NORMAL && ex == null)
471 ex = t.getException();
472 }
473 for (int i = 1; i <= last; ++i) {
474 ForkJoinTask<?> t = tasks[i];
475 if (t != null) {
476 if (ex != null)
477 t.cancel(false);
478 else if (t.waitingJoin() < NORMAL && ex == null)
479 ex = t.getException();
480 }
481 }
482 if (ex != null)
483 UNSAFE.throwException(ex);
484 }
485
486 /**
487 * Forks all tasks in the specified collection, returning when
488 * {@code isDone} holds for each task or an (unchecked) exception
489 * is encountered. If any task encounters an exception, others
490 * may be, but are not guaranteed to be, cancelled. If more than
491 * one task encounters an exception, then this method throws any
492 * one of these exceptions. The individual status of each task
493 * may be checked using {@link #getException()} and related
494 * methods. The behavior of this operation is undefined if the
495 * specified collection is modified while the operation is in
496 * progress.
497 *
498 * <p>This method may be invoked only from within {@code
499 * ForkJoinTask} computations (as may be determined using method
500 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
501 * result in exceptions or errors, possibly including {@code
502 * ClassCastException}.
503 *
504 * @param tasks the collection of tasks
505 * @return the tasks argument, to simplify usage
506 * @throws NullPointerException if tasks or any element are null
507 */
508 public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks) {
509 if (!(tasks instanceof RandomAccess) || !(tasks instanceof List<?>)) {
510 invokeAll(tasks.toArray(new ForkJoinTask<?>[tasks.size()]));
511 return tasks;
512 }
513 @SuppressWarnings("unchecked")
514 List<? extends ForkJoinTask<?>> ts =
515 (List<? extends ForkJoinTask<?>>) tasks;
516 Throwable ex = null;
517 int last = ts.size() - 1;
518 for (int i = last; i >= 0; --i) {
519 ForkJoinTask<?> t = ts.get(i);
520 if (t == null) {
521 if (ex == null)
522 ex = new NullPointerException();
523 }
524 else if (i != 0)
525 t.fork();
526 else if (t.waitingInvoke() < NORMAL && ex == null)
527 ex = t.getException();
528 }
529 for (int i = 1; i <= last; ++i) {
530 ForkJoinTask<?> t = ts.get(i);
531 if (t != null) {
532 if (ex != null)
533 t.cancel(false);
534 else if (t.waitingJoin() < NORMAL && ex == null)
535 ex = t.getException();
536 }
537 }
538 if (ex != null)
539 UNSAFE.throwException(ex);
540 return tasks;
541 }
542
543 /**
544 * Attempts to cancel execution of this task. This attempt will
545 * fail if the task has already completed, has already been
546 * cancelled, or could not be cancelled for some other reason. If
547 * successful, and this task has not started when cancel is
548 * called, execution of this task is suppressed, {@link
549 * #isCancelled} will report true, and {@link #join} will result
550 * in a {@code CancellationException} being thrown.
551 *
552 * <p>This method may be overridden in subclasses, but if so, must
553 * still ensure that these minimal properties hold. In particular,
554 * the {@code cancel} method itself must not throw exceptions.
555 *
556 * <p>This method is designed to be invoked by <em>other</em>
557 * tasks. To terminate the current task, you can just return or
558 * throw an unchecked exception from its computation method, or
559 * invoke {@link #completeExceptionally}.
560 *
561 * @param mayInterruptIfRunning this value is ignored in the
562 * default implementation because tasks are not
563 * cancelled via interruption
564 *
565 * @return {@code true} if this task is now cancelled
566 */
567 public boolean cancel(boolean mayInterruptIfRunning) {
568 setCompletion(CANCELLED);
569 return (status & COMPLETION_MASK) == CANCELLED;
570 }
571
572 /**
573 * Cancels, ignoring any exceptions it throws. Used during worker
574 * and pool shutdown.
575 */
576 final void cancelIgnoringExceptions() {
577 try {
578 cancel(false);
579 } catch (Throwable ignore) {
580 }
581 }
582
583 /**
584 * Cancels ignoring exceptions if worker is terminating
585 */
586 private void cancelIfTerminating() {
587 Thread t = Thread.currentThread();
588 if ((t instanceof ForkJoinWorkerThread) &&
589 ((ForkJoinWorkerThread) t).isTerminating()) {
590 try {
591 cancel(false);
592 } catch (Throwable ignore) {
593 }
594 }
595 }
596
597 public final boolean isDone() {
598 return status < 0;
599 }
600
601 public final boolean isCancelled() {
602 return (status & COMPLETION_MASK) == CANCELLED;
603 }
604
605 /**
606 * Returns {@code true} if this task threw an exception or was cancelled.
607 *
608 * @return {@code true} if this task threw an exception or was cancelled
609 */
610 public final boolean isCompletedAbnormally() {
611 return (status & COMPLETION_MASK) < NORMAL;
612 }
613
614 /**
615 * Returns {@code true} if this task completed without throwing an
616 * exception and was not cancelled.
617 *
618 * @return {@code true} if this task completed without throwing an
619 * exception and was not cancelled
620 */
621 public final boolean isCompletedNormally() {
622 return (status & COMPLETION_MASK) == NORMAL;
623 }
624
625 /**
626 * Returns the exception thrown by the base computation, or a
627 * {@code CancellationException} if cancelled, or {@code null} if
628 * none or if the method has not yet completed.
629 *
630 * @return the exception, or {@code null} if none
631 */
632 public final Throwable getException() {
633 int s = status & COMPLETION_MASK;
634 return ((s >= NORMAL) ? null :
635 (s == CANCELLED) ? new CancellationException() :
636 exceptionMap.get(this));
637 }
638
639 /**
640 * Completes this task abnormally, and if not already aborted or
641 * cancelled, causes it to throw the given exception upon
642 * {@code join} and related operations. This method may be used
643 * to induce exceptions in asynchronous tasks, or to force
644 * completion of tasks that would not otherwise complete. Its use
645 * in other situations is discouraged. This method is
646 * overridable, but overridden versions must invoke {@code super}
647 * implementation to maintain guarantees.
648 *
649 * @param ex the exception to throw. If this exception is not a
650 * {@code RuntimeException} or {@code Error}, the actual exception
651 * thrown will be a {@code RuntimeException} with cause {@code ex}.
652 */
653 public void completeExceptionally(Throwable ex) {
654 setExceptionalCompletion((ex instanceof RuntimeException) ||
655 (ex instanceof Error) ? ex :
656 new RuntimeException(ex));
657 }
658
659 /**
660 * Completes this task, and if not already aborted or cancelled,
661 * returning a {@code null} result upon {@code join} and related
662 * operations. This method may be used to provide results for
663 * asynchronous tasks, or to provide alternative handling for
664 * tasks that would not otherwise complete normally. Its use in
665 * other situations is discouraged. This method is
666 * overridable, but overridden versions must invoke {@code super}
667 * implementation to maintain guarantees.
668 *
669 * @param value the result value for this task
670 */
671 public void complete(V value) {
672 try {
673 setRawResult(value);
674 } catch (Throwable rex) {
675 setExceptionalCompletion(rex);
676 return;
677 }
678 setCompletion(NORMAL);
679 }
680
681 public final V get() throws InterruptedException, ExecutionException {
682 int s = waitingJoin() & COMPLETION_MASK;
683 if (Thread.interrupted())
684 throw new InterruptedException();
685 if (s < NORMAL) {
686 Throwable ex;
687 if (s == CANCELLED)
688 throw new CancellationException();
689 if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
690 throw new ExecutionException(ex);
691 }
692 return getRawResult();
693 }
694
695 public final V get(long timeout, TimeUnit unit)
696 throws InterruptedException, ExecutionException, TimeoutException {
697 Thread t = Thread.currentThread();
698 ForkJoinPool pool;
699 if (t instanceof ForkJoinWorkerThread) {
700 ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
701 if (status >= 0 && w.unpushTask(this))
702 tryExec();
703 pool = w.pool;
704 }
705 else
706 pool = null;
707 /*
708 * Timed wait loop intermixes cases for fj (pool != null) and
709 * non FJ threads. For FJ, decrement pool count but don't try
710 * for replacement; increment count on completion. For non-FJ,
711 * deal with interrupts. This is messy, but a little less so
712 * than is splitting the FJ and nonFJ cases.
713 */
714 boolean interrupted = false;
715 boolean dec = false; // true if pool count decremented
716 for (;;) {
717 if (Thread.interrupted() && pool == null) {
718 interrupted = true;
719 break;
720 }
721 int s = status;
722 if (s < 0)
723 break;
724 if (UNSAFE.compareAndSwapInt(this, statusOffset,
725 s, s | SIGNAL)) {
726 long startTime = System.nanoTime();
727 long nanos = unit.toNanos(timeout);
728 long nt; // wait time
729 while (status >= 0 &&
730 (nt = nanos - (System.nanoTime() - startTime)) > 0) {
731 if (pool != null && !dec)
732 dec = pool.tryDecrementRunningCount();
733 else {
734 long ms = nt / 1000000;
735 int ns = (int) (nt % 1000000);
736 try {
737 synchronized(this) {
738 if (status >= 0)
739 wait(ms, ns);
740 }
741 } catch (InterruptedException ie) {
742 if (pool != null)
743 cancelIfTerminating();
744 else {
745 interrupted = true;
746 break;
747 }
748 }
749 }
750 }
751 break;
752 }
753 }
754 if (pool != null && dec)
755 pool.updateRunningCount(1);
756 if (interrupted)
757 throw new InterruptedException();
758 int es = status & COMPLETION_MASK;
759 if (es != NORMAL) {
760 Throwable ex;
761 if (es == CANCELLED)
762 throw new CancellationException();
763 if (es == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
764 throw new ExecutionException(ex);
765 throw new TimeoutException();
766 }
767 return getRawResult();
768 }
769
770 /**
771 * Possibly executes other tasks until this task {@link #isDone is
772 * done}, then returns the result of the computation. This method
773 * may be more efficient than {@code join}, but is only applicable
774 * when there are no potential dependencies between continuation
775 * of the current task and that of any other task that might be
776 * executed while helping. (This usually holds for pure
777 * divide-and-conquer tasks).
778 *
779 * <p>This method may be invoked only from within {@code
780 * ForkJoinTask} computations (as may be determined using method
781 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
782 * result in exceptions or errors, possibly including {@code
783 * ClassCastException}.
784 *
785 * @return the computed result
786 */
787 public final V helpJoin() {
788 return reportResult(busyJoin());
789 }
790
791 /**
792 * Possibly executes other tasks until this task {@link #isDone is
793 * done}. This method may be useful when processing collections
794 * of tasks when some have been cancelled or otherwise known to
795 * have aborted.
796 *
797 * <p>This method may be invoked only from within {@code
798 * ForkJoinTask} computations (as may be determined using method
799 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
800 * result in exceptions or errors, possibly including {@code
801 * ClassCastException}.
802 */
803 public final void quietlyHelpJoin() {
804 busyJoin();
805 }
806
807 /**
808 * Joins this task, without returning its result or throwing an
809 * exception. This method may be useful when processing
810 * collections of tasks when some have been cancelled or otherwise
811 * known to have aborted.
812 */
813 public final void quietlyJoin() {
814 waitingJoin();
815 }
816
817 /**
818 * Commences performing this task and awaits its completion if
819 * necessary, without returning its result or throwing an
820 * exception. This method may be useful when processing
821 * collections of tasks when some have been cancelled or otherwise
822 * known to have aborted.
823 */
824 public final void quietlyInvoke() {
825 waitingInvoke();
826 }
827
828 /**
829 * Possibly executes tasks until the pool hosting the current task
830 * {@link ForkJoinPool#isQuiescent is quiescent}. This method may
831 * be of use in designs in which many tasks are forked, but none
832 * are explicitly joined, instead executing them until all are
833 * processed.
834 *
835 * <p>This method may be invoked only from within {@code
836 * ForkJoinTask} computations (as may be determined using method
837 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
838 * result in exceptions or errors, possibly including {@code
839 * ClassCastException}.
840 */
841 public static void helpQuiesce() {
842 ((ForkJoinWorkerThread) Thread.currentThread())
843 .helpQuiescePool();
844 }
845
846 /**
847 * Resets the internal bookkeeping state of this task, allowing a
848 * subsequent {@code fork}. This method allows repeated reuse of
849 * this task, but only if reuse occurs when this task has either
850 * never been forked, or has been forked, then completed and all
851 * outstanding joins of this task have also completed. Effects
852 * under any other usage conditions are not guaranteed.
853 * This method may be useful when executing
854 * pre-constructed trees of subtasks in loops.
855 */
856 public void reinitialize() {
857 if ((status & COMPLETION_MASK) == EXCEPTIONAL)
858 exceptionMap.remove(this);
859 status = 0;
860 }
861
862 /**
863 * Returns the pool hosting the current task execution, or null
864 * if this task is executing outside of any ForkJoinPool.
865 *
866 * @see #inForkJoinPool
867 * @return the pool, or {@code null} if none
868 */
869 public static ForkJoinPool getPool() {
870 Thread t = Thread.currentThread();
871 return (t instanceof ForkJoinWorkerThread) ?
872 ((ForkJoinWorkerThread) t).pool : null;
873 }
874
875 /**
876 * Returns {@code true} if the current thread is executing as a
877 * ForkJoinPool computation.
878 *
879 * @return {@code true} if the current thread is executing as a
880 * ForkJoinPool computation, or false otherwise
881 */
882 public static boolean inForkJoinPool() {
883 return Thread.currentThread() instanceof ForkJoinWorkerThread;
884 }
885
886 /**
887 * Tries to unschedule this task for execution. This method will
888 * typically succeed if this task is the most recently forked task
889 * by the current thread, and has not commenced executing in
890 * another thread. This method may be useful when arranging
891 * alternative local processing of tasks that could have been, but
892 * were not, stolen.
893 *
894 * <p>This method may be invoked only from within {@code
895 * ForkJoinTask} computations (as may be determined using method
896 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
897 * result in exceptions or errors, possibly including {@code
898 * ClassCastException}.
899 *
900 * @return {@code true} if unforked
901 */
902 public boolean tryUnfork() {
903 return ((ForkJoinWorkerThread) Thread.currentThread())
904 .unpushTask(this);
905 }
906
907 /**
908 * Returns an estimate of the number of tasks that have been
909 * forked by the current worker thread but not yet executed. This
910 * value may be useful for heuristic decisions about whether to
911 * fork other tasks.
912 *
913 * <p>This method may be invoked only from within {@code
914 * ForkJoinTask} computations (as may be determined using method
915 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
916 * result in exceptions or errors, possibly including {@code
917 * ClassCastException}.
918 *
919 * @return the number of tasks
920 */
921 public static int getQueuedTaskCount() {
922 return ((ForkJoinWorkerThread) Thread.currentThread())
923 .getQueueSize();
924 }
925
926 /**
927 * Returns an estimate of how many more locally queued tasks are
928 * held by the current worker thread than there are other worker
929 * threads that might steal them. This value may be useful for
930 * heuristic decisions about whether to fork other tasks. In many
931 * usages of ForkJoinTasks, at steady state, each worker should
932 * aim to maintain a small constant surplus (for example, 3) of
933 * tasks, and to process computations locally if this threshold is
934 * exceeded.
935 *
936 * <p>This method may be invoked only from within {@code
937 * ForkJoinTask} computations (as may be determined using method
938 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
939 * result in exceptions or errors, possibly including {@code
940 * ClassCastException}.
941 *
942 * @return the surplus number of tasks, which may be negative
943 */
944 public static int getSurplusQueuedTaskCount() {
945 return ((ForkJoinWorkerThread) Thread.currentThread())
946 .getEstimatedSurplusTaskCount();
947 }
948
949 // Extension methods
950
951 /**
952 * Returns the result that would be returned by {@link #join}, even
953 * if this task completed abnormally, or {@code null} if this task
954 * is not known to have been completed. This method is designed
955 * to aid debugging, as well as to support extensions. Its use in
956 * any other context is discouraged.
957 *
958 * @return the result, or {@code null} if not completed
959 */
960 public abstract V getRawResult();
961
962 /**
963 * Forces the given value to be returned as a result. This method
964 * is designed to support extensions, and should not in general be
965 * called otherwise.
966 *
967 * @param value the value
968 */
969 protected abstract void setRawResult(V value);
970
971 /**
972 * Immediately performs the base action of this task. This method
973 * is designed to support extensions, and should not in general be
974 * called otherwise. The return value controls whether this task
975 * is considered to be done normally. It may return false in
976 * asynchronous actions that require explicit invocations of
977 * {@link #complete} to become joinable. It may also throw an
978 * (unchecked) exception to indicate abnormal exit.
979 *
980 * @return {@code true} if completed normally
981 */
982 protected abstract boolean exec();
983
984 /**
985 * Returns, but does not unschedule or execute, a task queued by
986 * the current thread but not yet executed, if one is immediately
987 * available. There is no guarantee that this task will actually
988 * be polled or executed next. Conversely, this method may return
989 * null even if a task exists but cannot be accessed without
990 * contention with other threads. This method is designed
991 * primarily to support extensions, and is unlikely to be useful
992 * otherwise.
993 *
994 * <p>This method may be invoked only from within {@code
995 * ForkJoinTask} computations (as may be determined using method
996 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
997 * result in exceptions or errors, possibly including {@code
998 * ClassCastException}.
999 *
1000 * @return the next task, or {@code null} if none are available
1001 */
1002 protected static ForkJoinTask<?> peekNextLocalTask() {
1003 return ((ForkJoinWorkerThread) Thread.currentThread())
1004 .peekTask();
1005 }
1006
1007 /**
1008 * Unschedules and returns, without executing, the next task
1009 * queued by the current thread but not yet executed. This method
1010 * is designed primarily to support extensions, and is unlikely to
1011 * be useful otherwise.
1012 *
1013 * <p>This method may be invoked only from within {@code
1014 * ForkJoinTask} computations (as may be determined using method
1015 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1016 * result in exceptions or errors, possibly including {@code
1017 * ClassCastException}.
1018 *
1019 * @return the next task, or {@code null} if none are available
1020 */
1021 protected static ForkJoinTask<?> pollNextLocalTask() {
1022 return ((ForkJoinWorkerThread) Thread.currentThread())
1023 .pollLocalTask();
1024 }
1025
1026 /**
1027 * Unschedules and returns, without executing, the next task
1028 * queued by the current thread but not yet executed, if one is
1029 * available, or if not available, a task that was forked by some
1030 * other thread, if available. Availability may be transient, so a
1031 * {@code null} result does not necessarily imply quiescence
1032 * of the pool this task is operating in. This method is designed
1033 * primarily to support extensions, and is unlikely to be useful
1034 * otherwise.
1035 *
1036 * <p>This method may be invoked only from within {@code
1037 * ForkJoinTask} computations (as may be determined using method
1038 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1039 * result in exceptions or errors, possibly including {@code
1040 * ClassCastException}.
1041 *
1042 * @return a task, or {@code null} if none are available
1043 */
1044 protected static ForkJoinTask<?> pollTask() {
1045 return ((ForkJoinWorkerThread) Thread.currentThread())
1046 .pollTask();
1047 }
1048
1049 /**
1050 * Adaptor for Runnables. This implements RunnableFuture
1051 * to be compliant with AbstractExecutorService constraints
1052 * when used in ForkJoinPool.
1053 */
1054 static final class AdaptedRunnable<T> extends ForkJoinTask<T>
1055 implements RunnableFuture<T> {
1056 final Runnable runnable;
1057 final T resultOnCompletion;
1058 T result;
1059 AdaptedRunnable(Runnable runnable, T result) {
1060 if (runnable == null) throw new NullPointerException();
1061 this.runnable = runnable;
1062 this.resultOnCompletion = result;
1063 }
1064 public T getRawResult() { return result; }
1065 public void setRawResult(T v) { result = v; }
1066 public boolean exec() {
1067 runnable.run();
1068 result = resultOnCompletion;
1069 return true;
1070 }
1071 public void run() { invoke(); }
1072 private static final long serialVersionUID = 5232453952276885070L;
1073 }
1074
1075 /**
1076 * Adaptor for Callables
1077 */
1078 static final class AdaptedCallable<T> extends ForkJoinTask<T>
1079 implements RunnableFuture<T> {
1080 final Callable<? extends T> callable;
1081 T result;
1082 AdaptedCallable(Callable<? extends T> callable) {
1083 if (callable == null) throw new NullPointerException();
1084 this.callable = callable;
1085 }
1086 public T getRawResult() { return result; }
1087 public void setRawResult(T v) { result = v; }
1088 public boolean exec() {
1089 try {
1090 result = callable.call();
1091 return true;
1092 } catch (Error err) {
1093 throw err;
1094 } catch (RuntimeException rex) {
1095 throw rex;
1096 } catch (Exception ex) {
1097 throw new RuntimeException(ex);
1098 }
1099 }
1100 public void run() { invoke(); }
1101 private static final long serialVersionUID = 2838392045355241008L;
1102 }
1103
1104 /**
1105 * Returns a new {@code ForkJoinTask} that performs the {@code run}
1106 * method of the given {@code Runnable} as its action, and returns
1107 * a null result upon {@link #join}.
1108 *
1109 * @param runnable the runnable action
1110 * @return the task
1111 */
1112 public static ForkJoinTask<?> adapt(Runnable runnable) {
1113 return new AdaptedRunnable<Void>(runnable, null);
1114 }
1115
1116 /**
1117 * Returns a new {@code ForkJoinTask} that performs the {@code run}
1118 * method of the given {@code Runnable} as its action, and returns
1119 * the given result upon {@link #join}.
1120 *
1121 * @param runnable the runnable action
1122 * @param result the result upon completion
1123 * @return the task
1124 */
1125 public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) {
1126 return new AdaptedRunnable<T>(runnable, result);
1127 }
1128
1129 /**
1130 * Returns a new {@code ForkJoinTask} that performs the {@code call}
1131 * method of the given {@code Callable} as its action, and returns
1132 * its result upon {@link #join}, translating any checked exceptions
1133 * encountered into {@code RuntimeException}.
1134 *
1135 * @param callable the callable action
1136 * @return the task
1137 */
1138 public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) {
1139 return new AdaptedCallable<T>(callable);
1140 }
1141
1142 // Serialization support
1143
1144 private static final long serialVersionUID = -7721805057305804111L;
1145
1146 /**
1147 * Saves the state to a stream.
1148 *
1149 * @serialData the current run status and the exception thrown
1150 * during execution, or {@code null} if none
1151 * @param s the stream
1152 */
1153 private void writeObject(java.io.ObjectOutputStream s)
1154 throws java.io.IOException {
1155 s.defaultWriteObject();
1156 s.writeObject(getException());
1157 }
1158
1159 /**
1160 * Reconstitutes the instance from a stream.
1161 *
1162 * @param s the stream
1163 */
1164 private void readObject(java.io.ObjectInputStream s)
1165 throws java.io.IOException, ClassNotFoundException {
1166 s.defaultReadObject();
1167 status |= SIGNAL; // conservatively set external signal
1168 Object ex = s.readObject();
1169 if (ex != null)
1170 setExceptionalCompletion((Throwable) ex);
1171 }
1172
1173 // Unsafe mechanics
1174
1175 private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
1176 private static final long statusOffset =
1177 objectFieldOffset("status", ForkJoinTask.class);
1178
1179 private static long objectFieldOffset(String field, Class<?> klazz) {
1180 try {
1181 return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
1182 } catch (NoSuchFieldException e) {
1183 // Convert Exception to corresponding Error
1184 NoSuchFieldError error = new NoSuchFieldError(field);
1185 error.initCause(e);
1186 throw error;
1187 }
1188 }
1189 }