ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinTask.java
Revision: 1.14
Committed: Sun Apr 18 12:54:57 2010 UTC (14 years, 1 month ago) by dl
Branch: MAIN
Changes since 1.13: +38 -21 lines
Log Message:
Sync with jsr166y versions

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8
9 import java.io.Serializable;
10 import java.util.Collection;
11 import java.util.Collections;
12 import java.util.List;
13 import java.util.RandomAccess;
14 import java.util.Map;
15 import java.util.WeakHashMap;
16
17 /**
18 * Abstract base class for tasks that run within a {@link ForkJoinPool}.
19 * A {@code ForkJoinTask} is a thread-like entity that is much
20 * lighter weight than a normal thread. Huge numbers of tasks and
21 * subtasks may be hosted by a small number of actual threads in a
22 * ForkJoinPool, at the price of some usage limitations.
23 *
24 * <p>A "main" {@code ForkJoinTask} begins execution when submitted
25 * to a {@link ForkJoinPool}. Once started, it will usually in turn
26 * start other subtasks. As indicated by the name of this class,
27 * many programs using {@code ForkJoinTask} employ only methods
28 * {@link #fork} and {@link #join}, or derivatives such as {@link
29 * #invokeAll}. However, this class also provides a number of other
30 * methods that can come into play in advanced usages, as well as
31 * extension mechanics that allow support of new forms of fork/join
32 * processing.
33 *
34 * <p>A {@code ForkJoinTask} is a lightweight form of {@link Future}.
35 * The efficiency of {@code ForkJoinTask}s stems from a set of
36 * restrictions (that are only partially statically enforceable)
37 * reflecting their intended use as computational tasks calculating
38 * pure functions or operating on purely isolated objects. The
39 * primary coordination mechanisms are {@link #fork}, that arranges
40 * asynchronous execution, and {@link #join}, that doesn't proceed
41 * until the task's result has been computed. Computations should
42 * avoid {@code synchronized} methods or blocks, and should minimize
43 * other blocking synchronization apart from joining other tasks or
44 * using synchronizers such as Phasers that are advertised to
45 * cooperate with fork/join scheduling. Tasks should also not perform
46 * blocking IO, and should ideally access variables that are
47 * completely independent of those accessed by other running
48 * tasks. Minor breaches of these restrictions, for example using
49 * shared output streams, may be tolerable in practice, but frequent
50 * use may result in poor performance, and the potential to
51 * indefinitely stall if the number of threads not waiting for IO or
52 * other external synchronization becomes exhausted. This usage
53 * restriction is in part enforced by not permitting checked
54 * exceptions such as {@code IOExceptions} to be thrown. However,
55 * computations may still encounter unchecked exceptions, that are
56 * rethrown to callers attempting to join them. These exceptions may
57 * additionally include {@link RejectedExecutionException} stemming
58 * from internal resource exhaustion, such as failure to allocate
59 * internal task queues.
60 *
61 * <p>The primary method for awaiting completion and extracting
62 * results of a task is {@link #join}, but there are several variants:
63 * The {@link Future#get} methods support interruptible and/or timed
64 * waits for completion and report results using {@code Future}
65 * conventions. Method {@link #helpJoin} enables callers to actively
66 * execute other tasks while awaiting joins, which is sometimes more
67 * efficient but only applies when all subtasks are known to be
68 * strictly tree-structured. Method {@link #invoke} is semantically
69 * equivalent to {@code fork(); join()} but always attempts to begin
70 * execution in the current thread. The "<em>quiet</em>" forms of
71 * these methods do not extract results or report exceptions. These
72 * may be useful when a set of tasks are being executed, and you need
73 * to delay processing of results or exceptions until all complete.
74 * Method {@code invokeAll} (available in multiple versions)
75 * performs the most common form of parallel invocation: forking a set
76 * of tasks and joining them all.
77 *
78 * <p>The execution status of tasks may be queried at several levels
79 * of detail: {@link #isDone} is true if a task completed in any way
80 * (including the case where a task was cancelled without executing);
81 * {@link #isCompletedNormally} is true if a task completed without
82 * cancellation or encountering an exception; {@link #isCancelled} is
83 * true if the task was cancelled (in which case {@link #getException}
84 * returns a {@link java.util.concurrent.CancellationException}); and
85 * {@link #isCompletedAbnormally} is true if a task was either
86 * cancelled or encountered an exception, in which case {@link
87 * #getException} will return either the encountered exception or
88 * {@link java.util.concurrent.CancellationException}.
89 *
90 * <p>The ForkJoinTask class is not usually directly subclassed.
91 * Instead, you subclass one of the abstract classes that support a
92 * particular style of fork/join processing, typically {@link
93 * RecursiveAction} for computations that do not return results, or
94 * {@link RecursiveTask} for those that do. Normally, a concrete
95 * ForkJoinTask subclass declares fields comprising its parameters,
96 * established in a constructor, and then defines a {@code compute}
97 * method that somehow uses the control methods supplied by this base
98 * class. While these methods have {@code public} access (to allow
99 * instances of different task subclasses to call each other's
100 * methods), some of them may only be called from within other
101 * ForkJoinTasks (as may be determined using method {@link
102 * #inForkJoinPool}). Attempts to invoke them in other contexts
103 * result in exceptions or errors, possibly including
104 * ClassCastException.
105 *
106 * <p>Most base support methods are {@code final}, to prevent
107 * overriding of implementations that are intrinsically tied to the
108 * underlying lightweight task scheduling framework. Developers
109 * creating new basic styles of fork/join processing should minimally
110 * implement {@code protected} methods {@link #exec}, {@link
111 * #setRawResult}, and {@link #getRawResult}, while also introducing
112 * an abstract computational method that can be implemented in its
113 * subclasses, possibly relying on other {@code protected} methods
114 * provided by this class.
115 *
116 * <p>ForkJoinTasks should perform relatively small amounts of
117 * computation. Large tasks should be split into smaller subtasks,
118 * usually via recursive decomposition. As a very rough rule of thumb,
119 * a task should perform more than 100 and less than 10000 basic
120 * computational steps. If tasks are too big, then parallelism cannot
121 * improve throughput. If too small, then memory and internal task
122 * maintenance overhead may overwhelm processing.
123 *
124 * <p>This class provides {@code adapt} methods for {@link Runnable}
125 * and {@link Callable}, that may be of use when mixing execution of
126 * {@code ForkJoinTasks} with other kinds of tasks. When all tasks
127 * are of this form, consider using a pool in
128 * {@linkplain ForkJoinPool#setAsyncMode async mode}.
129 *
130 * <p>ForkJoinTasks are {@code Serializable}, which enables them to be
131 * used in extensions such as remote execution frameworks. It is
132 * sensible to serialize tasks only before or after, but not during,
133 * execution. Serialization is not relied on during execution itself.
134 *
135 * @since 1.7
136 * @author Doug Lea
137 */
138 public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
139
140 /*
141 * See the internal documentation of class ForkJoinPool for a
142 * general implementation overview. ForkJoinTasks are mainly
143 * responsible for maintaining their "status" field amidst relays
144 * to methods in ForkJoinWorkerThread and ForkJoinPool. The
145 * methods of this class are more-or-less layered into (1) basic
146 * status maintenance (2) execution and awaiting completion (3)
147 * user-level methods that additionally report results. This is
148 * sometimes hard to see because this file orders exported methods
149 * in a way that flows well in javadocs.
150 */
151
152 /**
153 * Run control status bits packed into a single int to minimize
154 * footprint and to ensure atomicity (via CAS). Status is
155 * initially zero, and takes on nonnegative values until
156 * completed, upon which status holds COMPLETED. CANCELLED, or
157 * EXCEPTIONAL, which use the top 3 bits. Tasks undergoing
158 * blocking waits by other threads have SIGNAL_MASK bits set --
159 * bit 15 for external (nonFJ) waits, and the rest a count of
160 * waiting FJ threads. (This representation relies on
161 * ForkJoinPool max thread limits). Signal counts are not directly
162 * incremented by ForkJoinTask methods, but instead via a call to
163 * requestSignal within ForkJoinPool.preJoin, once their need is
164 * established.
165 *
166 * Completion of a stolen task with SIGNAL_MASK bits set awakens
167 * any waiters via notifyAll. Even though suboptimal for some
168 * purposes, we use basic builtin wait/notify to take advantage of
169 * "monitor inflation" in JVMs that we would otherwise need to
170 * emulate to avoid adding further per-task bookkeeping overhead.
171 * We want these monitors to be "fat", i.e., not use biasing or
172 * thin-lock techniques, so use some odd coding idioms that tend
173 * to avoid them.
174 *
175 * Note that bits 16-28 are currently unused. Also value
176 * 0x80000000 is available as spare completion value.
177 */
178 volatile int status; // accessed directly by pool and workers
179
180 private static final int COMPLETION_MASK = 0xe0000000;
181 private static final int NORMAL = 0xe0000000; // == mask
182 private static final int CANCELLED = 0xc0000000;
183 private static final int EXCEPTIONAL = 0xa0000000;
184 private static final int SIGNAL_MASK = 0x0000ffff;
185 private static final int INTERNAL_SIGNAL_MASK = 0x00007fff;
186 private static final int EXTERNAL_SIGNAL = 0x00008000;
187
188 /**
189 * Table of exceptions thrown by tasks, to enable reporting by
190 * callers. Because exceptions are rare, we don't directly keep
191 * them with task objects, but instead use a weak ref table. Note
192 * that cancellation exceptions don't appear in the table, but are
193 * instead recorded as status values.
194 * TODO: Use ConcurrentReferenceHashMap
195 */
196 static final Map<ForkJoinTask<?>, Throwable> exceptionMap =
197 Collections.synchronizedMap
198 (new WeakHashMap<ForkJoinTask<?>, Throwable>());
199
200 // Maintaining completion status
201
202 /**
203 * Marks completion and wakes up threads waiting to join this task,
204 * also clearing signal request bits.
205 *
206 * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
207 */
208 private void setCompletion(int completion) {
209 int s;
210 while ((s = status) >= 0) {
211 if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
212 if ((s & SIGNAL_MASK) != 0) {
213 Thread t = Thread.currentThread();
214 if (t instanceof ForkJoinWorkerThread)
215 ((ForkJoinWorkerThread) t).pool.updateRunningCount
216 (s & INTERNAL_SIGNAL_MASK);
217 synchronized (this) { notifyAll(); }
218 }
219 return;
220 }
221 }
222 }
223
224 /**
225 * Record exception and set exceptional completion
226 */
227 private void setDoneExceptionally(Throwable rex) {
228 exceptionMap.put(this, rex);
229 setCompletion(EXCEPTIONAL);
230 }
231
232 /**
233 * Main internal execution method: Unless done, calls exec and
234 * records completion.
235 *
236 * @return true if ran and completed normally
237 */
238 final boolean tryExec() {
239 try {
240 if (status < 0 || !exec())
241 return false;
242 } catch (Throwable rex) {
243 setDoneExceptionally(rex);
244 return false;
245 }
246 setCompletion(NORMAL); // must be outside try block
247 return true;
248 }
249
250 /**
251 * Increments internal signal count (thus requesting signal upon
252 * completion) unless already done. Call only once per join.
253 * Used by ForkJoinPool.preJoin.
254 *
255 * @return status
256 */
257 final int requestSignal() {
258 int s;
259 do {} while ((s = status) >= 0 &&
260 !UNSAFE.compareAndSwapInt(this, statusOffset, s, s + 1));
261 return s;
262 }
263
264 /**
265 * Sets external signal request unless already done.
266 *
267 * @return status
268 */
269 private int requestExternalSignal() {
270 int s;
271 do {} while ((s = status) >= 0 &&
272 !UNSAFE.compareAndSwapInt(this, statusOffset,
273 s, s | EXTERNAL_SIGNAL));
274 return s;
275 }
276
277 /*
278 * Awaiting completion. The four versions, internal vs external X
279 * untimed vs timed, have the same overall structure but differ
280 * from each other enough to defy simple integration.
281 */
282
283 /**
284 * Blocks a worker until this task is done, also maintaining pool
285 * and signal counts
286 */
287 private void awaitDone(ForkJoinWorkerThread w) {
288 if (status >= 0) {
289 w.pool.preJoin(this);
290 while (status >= 0) {
291 try { // minimize lock scope
292 synchronized(this) {
293 if (status >= 0)
294 wait();
295 else { // help release; also helps avoid lock-biasing
296 notifyAll();
297 break;
298 }
299 }
300 } catch (InterruptedException ie) {
301 cancelIfTerminating();
302 }
303 }
304 }
305 }
306
307 /**
308 * Blocks a non-ForkJoin thread until this task is done.
309 */
310 private void externalAwaitDone() {
311 if (requestExternalSignal() >= 0) {
312 boolean interrupted = false;
313 while (status >= 0) {
314 try {
315 synchronized(this) {
316 if (status >= 0)
317 wait();
318 else {
319 notifyAll();
320 break;
321 }
322 }
323 } catch (InterruptedException ie) {
324 interrupted = true;
325 }
326 }
327 if (interrupted)
328 Thread.currentThread().interrupt();
329 }
330 }
331
332 /**
333 * Blocks a worker until this task is done or timeout elapses
334 */
335 private void timedAwaitDone(ForkJoinWorkerThread w, long nanos) {
336 if (status >= 0) {
337 long startTime = System.nanoTime();
338 ForkJoinPool pool = w.pool;
339 pool.preJoin(this);
340 while (status >= 0) {
341 long nt = nanos - (System.nanoTime() - startTime);
342 if (nt > 0) {
343 long ms = nt / 1000000;
344 int ns = (int) (nt % 1000000);
345 try {
346 synchronized(this) { if (status >= 0) wait(ms, ns); }
347 } catch (InterruptedException ie) {
348 cancelIfTerminating();
349 }
350 }
351 else {
352 int s; // adjust running count on timeout
353 while ((s = status) >= 0 &&
354 (s & INTERNAL_SIGNAL_MASK) != 0) {
355 if (UNSAFE.compareAndSwapInt(this, statusOffset,
356 s, s - 1)) {
357 pool.updateRunningCount(1);
358 break;
359 }
360 }
361 break;
362 }
363 }
364 }
365 }
366
367 /**
368 * Blocks a non-ForkJoin thread until this task is done or timeout elapses
369 */
370 private void externalTimedAwaitDone(long nanos) {
371 if (requestExternalSignal() >= 0) {
372 long startTime = System.nanoTime();
373 boolean interrupted = false;
374 while (status >= 0) {
375 long nt = nanos - (System.nanoTime() - startTime);
376 if (nt <= 0)
377 break;
378 long ms = nt / 1000000;
379 int ns = (int) (nt % 1000000);
380 try {
381 synchronized(this) { if (status >= 0) wait(ms, ns); }
382 } catch (InterruptedException ie) {
383 interrupted = true;
384 }
385 }
386 if (interrupted)
387 Thread.currentThread().interrupt();
388 }
389 }
390
391 // reporting results
392
393 /**
394 * Returns result or throws the exception associated with status.
395 * Uses Unsafe as a workaround for javac not allowing rethrow of
396 * unchecked exceptions.
397 */
398 private V reportResult() {
399 if ((status & COMPLETION_MASK) < NORMAL) {
400 Throwable ex = getException();
401 if (ex != null)
402 UNSAFE.throwException(ex);
403 }
404 return getRawResult();
405 }
406
407 /**
408 * Returns result or throws exception using j.u.c.Future conventions.
409 * Only call when {@code isDone} known to be true or thread known
410 * to be interrupted.
411 */
412 private V reportFutureResult()
413 throws InterruptedException, ExecutionException {
414 if (Thread.interrupted())
415 throw new InterruptedException();
416 int s = status & COMPLETION_MASK;
417 if (s < NORMAL) {
418 Throwable ex;
419 if (s == CANCELLED)
420 throw new CancellationException();
421 if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
422 throw new ExecutionException(ex);
423 }
424 return getRawResult();
425 }
426
427 /**
428 * Returns result or throws exception using j.u.c.Future conventions
429 * with timeouts.
430 */
431 private V reportTimedFutureResult()
432 throws InterruptedException, ExecutionException, TimeoutException {
433 if (Thread.interrupted())
434 throw new InterruptedException();
435 Throwable ex;
436 int s = status & COMPLETION_MASK;
437 if (s == NORMAL)
438 return getRawResult();
439 else if (s == CANCELLED)
440 throw new CancellationException();
441 else if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
442 throw new ExecutionException(ex);
443 else
444 throw new TimeoutException();
445 }
446
447 // public methods
448
449 /**
450 * Arranges to asynchronously execute this task. While it is not
451 * necessarily enforced, it is a usage error to fork a task more
452 * than once unless it has completed and been reinitialized.
453 * Subsequent modifications to the state of this task or any data
454 * it operates on are not necessarily consistently observable by
455 * any thread other than the one executing it unless preceded by a
456 * call to {@link #join} or related methods, or a call to {@link
457 * #isDone} returning {@code true}.
458 *
459 * <p>This method may be invoked only from within {@code
460 * ForkJoinTask} computations (as may be determined using method
461 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
462 * result in exceptions or errors, possibly including {@code
463 * ClassCastException}.
464 *
465 * @return {@code this}, to simplify usage
466 */
467 public final ForkJoinTask<V> fork() {
468 ((ForkJoinWorkerThread) Thread.currentThread())
469 .pushTask(this);
470 return this;
471 }
472
473 /**
474 * Returns the result of the computation when it {@link #isDone is done}.
475 * This method differs from {@link #get()} in that
476 * abnormal completion results in {@code RuntimeException} or
477 * {@code Error}, not {@code ExecutionException}.
478 *
479 * @return the computed result
480 */
481 public final V join() {
482 quietlyJoin();
483 return reportResult();
484 }
485
486 /**
487 * Commences performing this task, awaits its completion if
488 * necessary, and return its result, or throws an (unchecked)
489 * exception if the underlying computation did so.
490 *
491 * @return the computed result
492 */
493 public final V invoke() {
494 if (!tryExec())
495 quietlyJoin();
496 return reportResult();
497 }
498
499 /**
500 * Forks the given tasks, returning when {@code isDone} holds for
501 * each task or an (unchecked) exception is encountered, in which
502 * case the exception is rethrown. If either task encounters an
503 * exception, the other one may be, but is not guaranteed to be,
504 * cancelled. If both tasks throw an exception, then this method
505 * throws one of them. The individual status of each task may be
506 * checked using {@link #getException()} and related methods.
507 *
508 * <p>This method may be invoked only from within {@code
509 * ForkJoinTask} computations (as may be determined using method
510 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
511 * result in exceptions or errors, possibly including {@code
512 * ClassCastException}.
513 *
514 * @param t1 the first task
515 * @param t2 the second task
516 * @throws NullPointerException if any task is null
517 */
518 public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
519 t2.fork();
520 t1.invoke();
521 t2.join();
522 }
523
524 /**
525 * Forks the given tasks, returning when {@code isDone} holds for
526 * each task or an (unchecked) exception is encountered, in which
527 * case the exception is rethrown. If any task encounters an
528 * exception, others may be, but are not guaranteed to be,
529 * cancelled. If more than one task encounters an exception, then
530 * this method throws any one of these exceptions. The individual
531 * status of each task may be checked using {@link #getException()}
532 * and related methods.
533 *
534 * <p>This method may be invoked only from within {@code
535 * ForkJoinTask} computations (as may be determined using method
536 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
537 * result in exceptions or errors, possibly including {@code
538 * ClassCastException}.
539 *
540 * @param tasks the tasks
541 * @throws NullPointerException if any task is null
542 */
543 public static void invokeAll(ForkJoinTask<?>... tasks) {
544 Throwable ex = null;
545 int last = tasks.length - 1;
546 for (int i = last; i >= 0; --i) {
547 ForkJoinTask<?> t = tasks[i];
548 if (t == null) {
549 if (ex == null)
550 ex = new NullPointerException();
551 }
552 else if (i != 0)
553 t.fork();
554 else {
555 t.quietlyInvoke();
556 if (ex == null)
557 ex = t.getException();
558 }
559 }
560 for (int i = 1; i <= last; ++i) {
561 ForkJoinTask<?> t = tasks[i];
562 if (t != null) {
563 if (ex != null)
564 t.cancel(false);
565 else {
566 t.quietlyJoin();
567 if (ex == null)
568 ex = t.getException();
569 }
570 }
571 }
572 if (ex != null)
573 UNSAFE.throwException(ex);
574 }
575
576 /**
577 * Forks all tasks in the specified collection, returning when
578 * {@code isDone} holds for each task or an (unchecked) exception
579 * is encountered. If any task encounters an exception, others
580 * may be, but are not guaranteed to be, cancelled. If more than
581 * one task encounters an exception, then this method throws any
582 * one of these exceptions. The individual status of each task
583 * may be checked using {@link #getException()} and related
584 * methods. The behavior of this operation is undefined if the
585 * specified collection is modified while the operation is in
586 * progress.
587 *
588 * <p>This method may be invoked only from within {@code
589 * ForkJoinTask} computations (as may be determined using method
590 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
591 * result in exceptions or errors, possibly including {@code
592 * ClassCastException}.
593 *
594 * @param tasks the collection of tasks
595 * @return the tasks argument, to simplify usage
596 * @throws NullPointerException if tasks or any element are null
597 */
598 public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks) {
599 if (!(tasks instanceof RandomAccess) || !(tasks instanceof List<?>)) {
600 invokeAll(tasks.toArray(new ForkJoinTask<?>[tasks.size()]));
601 return tasks;
602 }
603 @SuppressWarnings("unchecked")
604 List<? extends ForkJoinTask<?>> ts =
605 (List<? extends ForkJoinTask<?>>) tasks;
606 Throwable ex = null;
607 int last = ts.size() - 1;
608 for (int i = last; i >= 0; --i) {
609 ForkJoinTask<?> t = ts.get(i);
610 if (t == null) {
611 if (ex == null)
612 ex = new NullPointerException();
613 }
614 else if (i != 0)
615 t.fork();
616 else {
617 t.quietlyInvoke();
618 if (ex == null)
619 ex = t.getException();
620 }
621 }
622 for (int i = 1; i <= last; ++i) {
623 ForkJoinTask<?> t = ts.get(i);
624 if (t != null) {
625 if (ex != null)
626 t.cancel(false);
627 else {
628 t.quietlyJoin();
629 if (ex == null)
630 ex = t.getException();
631 }
632 }
633 }
634 if (ex != null)
635 UNSAFE.throwException(ex);
636 return tasks;
637 }
638
639 /**
640 * Attempts to cancel execution of this task. This attempt will
641 * fail if the task has already completed, has already been
642 * cancelled, or could not be cancelled for some other reason. If
643 * successful, and this task has not started when cancel is
644 * called, execution of this task is suppressed, {@link
645 * #isCancelled} will report true, and {@link #join} will result
646 * in a {@code CancellationException} being thrown.
647 *
648 * <p>This method may be overridden in subclasses, but if so, must
649 * still ensure that these minimal properties hold. In particular,
650 * the {@code cancel} method itself must not throw exceptions.
651 *
652 * <p>This method is designed to be invoked by <em>other</em>
653 * tasks. To terminate the current task, you can just return or
654 * throw an unchecked exception from its computation method, or
655 * invoke {@link #completeExceptionally}.
656 *
657 * @param mayInterruptIfRunning this value is ignored in the
658 * default implementation because tasks are not
659 * cancelled via interruption
660 *
661 * @return {@code true} if this task is now cancelled
662 */
663 public boolean cancel(boolean mayInterruptIfRunning) {
664 setCompletion(CANCELLED);
665 return (status & COMPLETION_MASK) == CANCELLED;
666 }
667
668 /**
669 * Cancels, ignoring any exceptions it throws. Used during worker
670 * and pool shutdown.
671 */
672 final void cancelIgnoringExceptions() {
673 try {
674 cancel(false);
675 } catch (Throwable ignore) {
676 }
677 }
678
679 /**
680 * Cancels ignoring exceptions if worker is terminating
681 */
682 private void cancelIfTerminating() {
683 Thread t = Thread.currentThread();
684 if ((t instanceof ForkJoinWorkerThread) &&
685 ((ForkJoinWorkerThread) t).isTerminating()) {
686 try {
687 cancel(false);
688 } catch (Throwable ignore) {
689 }
690 }
691 }
692
693 public final boolean isDone() {
694 return status < 0;
695 }
696
697 public final boolean isCancelled() {
698 return (status & COMPLETION_MASK) == CANCELLED;
699 }
700
701 /**
702 * Returns {@code true} if this task threw an exception or was cancelled.
703 *
704 * @return {@code true} if this task threw an exception or was cancelled
705 */
706 public final boolean isCompletedAbnormally() {
707 return (status & COMPLETION_MASK) < NORMAL;
708 }
709
710 /**
711 * Returns {@code true} if this task completed without throwing an
712 * exception and was not cancelled.
713 *
714 * @return {@code true} if this task completed without throwing an
715 * exception and was not cancelled
716 */
717 public final boolean isCompletedNormally() {
718 return (status & COMPLETION_MASK) == NORMAL;
719 }
720
721 /**
722 * Returns the exception thrown by the base computation, or a
723 * {@code CancellationException} if cancelled, or {@code null} if
724 * none or if the method has not yet completed.
725 *
726 * @return the exception, or {@code null} if none
727 */
728 public final Throwable getException() {
729 int s = status & COMPLETION_MASK;
730 return ((s >= NORMAL) ? null :
731 (s == CANCELLED) ? new CancellationException() :
732 exceptionMap.get(this));
733 }
734
735 /**
736 * Completes this task abnormally, and if not already aborted or
737 * cancelled, causes it to throw the given exception upon
738 * {@code join} and related operations. This method may be used
739 * to induce exceptions in asynchronous tasks, or to force
740 * completion of tasks that would not otherwise complete. Its use
741 * in other situations is discouraged. This method is
742 * overridable, but overridden versions must invoke {@code super}
743 * implementation to maintain guarantees.
744 *
745 * @param ex the exception to throw. If this exception is not a
746 * {@code RuntimeException} or {@code Error}, the actual exception
747 * thrown will be a {@code RuntimeException} with cause {@code ex}.
748 */
749 public void completeExceptionally(Throwable ex) {
750 setDoneExceptionally((ex instanceof RuntimeException) ||
751 (ex instanceof Error) ? ex :
752 new RuntimeException(ex));
753 }
754
755 /**
756 * Completes this task, and if not already aborted or cancelled,
757 * returning a {@code null} result upon {@code join} and related
758 * operations. This method may be used to provide results for
759 * asynchronous tasks, or to provide alternative handling for
760 * tasks that would not otherwise complete normally. Its use in
761 * other situations is discouraged. This method is
762 * overridable, but overridden versions must invoke {@code super}
763 * implementation to maintain guarantees.
764 *
765 * @param value the result value for this task
766 */
767 public void complete(V value) {
768 try {
769 setRawResult(value);
770 } catch (Throwable rex) {
771 setDoneExceptionally(rex);
772 return;
773 }
774 setCompletion(NORMAL);
775 }
776
777 public final V get() throws InterruptedException, ExecutionException {
778 quietlyJoin();
779 return reportFutureResult();
780 }
781
782 public final V get(long timeout, TimeUnit unit)
783 throws InterruptedException, ExecutionException, TimeoutException {
784 long nanos = unit.toNanos(timeout);
785 Thread t = Thread.currentThread();
786 if (t instanceof ForkJoinWorkerThread) {
787 ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
788 if (!w.unpushTask(this) || !tryExec())
789 timedAwaitDone(w, nanos);
790 }
791 else
792 externalTimedAwaitDone(nanos);
793 return reportTimedFutureResult();
794 }
795
796 /**
797 * Possibly executes other tasks until this task {@link #isDone is
798 * done}, then returns the result of the computation. This method
799 * may be more efficient than {@code join}, but is only applicable
800 * when there are no potential dependencies between continuation
801 * of the current task and that of any other task that might be
802 * executed while helping. (This usually holds for pure
803 * divide-and-conquer tasks).
804 *
805 * <p>This method may be invoked only from within {@code
806 * ForkJoinTask} computations (as may be determined using method
807 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
808 * result in exceptions or errors, possibly including {@code
809 * ClassCastException}.
810 *
811 * @return the computed result
812 */
813 public final V helpJoin() {
814 quietlyHelpJoin();
815 return reportResult();
816 }
817
818 /**
819 * Possibly executes other tasks until this task {@link #isDone is
820 * done}. This method may be useful when processing collections
821 * of tasks when some have been cancelled or otherwise known to
822 * have aborted.
823 *
824 * <p>This method may be invoked only from within {@code
825 * ForkJoinTask} computations (as may be determined using method
826 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
827 * result in exceptions or errors, possibly including {@code
828 * ClassCastException}.
829 */
830 public final void quietlyHelpJoin() {
831 ForkJoinWorkerThread w = (ForkJoinWorkerThread) Thread.currentThread();
832 if (!w.unpushTask(this) || !tryExec()) {
833 for (;;) {
834 ForkJoinTask<?> t;
835 if (status < 0)
836 return;
837 else if ((t = w.scanWhileJoining(this)) != null)
838 t.tryExec();
839 else if (status < 0)
840 return;
841 else if (w.pool.preBlockHelpingJoin(this)) {
842 while (status >= 0) { // variant of awaitDone
843 try {
844 synchronized(this) {
845 if (status >= 0)
846 wait();
847 else {
848 notifyAll();
849 break;
850 }
851 }
852 } catch (InterruptedException ie) {
853 cancelIfTerminating();
854 }
855 }
856 return;
857 }
858 }
859 }
860 }
861
862 /**
863 * Joins this task, without returning its result or throwing an
864 * exception. This method may be useful when processing
865 * collections of tasks when some have been cancelled or otherwise
866 * known to have aborted.
867 */
868 public final void quietlyJoin() {
869 Thread t = Thread.currentThread();
870 if (t instanceof ForkJoinWorkerThread) {
871 ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
872 if (!w.unpushTask(this) || !tryExec())
873 awaitDone(w);
874 }
875 else
876 externalAwaitDone();
877 }
878
879 /**
880 * Commences performing this task and awaits its completion if
881 * necessary, without returning its result or throwing an
882 * exception. This method may be useful when processing
883 * collections of tasks when some have been cancelled or otherwise
884 * known to have aborted.
885 */
886 public final void quietlyInvoke() {
887 if (!tryExec())
888 quietlyJoin();
889 }
890
891 /**
892 * Possibly executes tasks until the pool hosting the current task
893 * {@link ForkJoinPool#isQuiescent is quiescent}. This method may
894 * be of use in designs in which many tasks are forked, but none
895 * are explicitly joined, instead executing them until all are
896 * processed.
897 *
898 * <p>This method may be invoked only from within {@code
899 * ForkJoinTask} computations (as may be determined using method
900 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
901 * result in exceptions or errors, possibly including {@code
902 * ClassCastException}.
903 */
904 public static void helpQuiesce() {
905 ((ForkJoinWorkerThread) Thread.currentThread())
906 .helpQuiescePool();
907 }
908
909 /**
910 * Resets the internal bookkeeping state of this task, allowing a
911 * subsequent {@code fork}. This method allows repeated reuse of
912 * this task, but only if reuse occurs when this task has either
913 * never been forked, or has been forked, then completed and all
914 * outstanding joins of this task have also completed. Effects
915 * under any other usage conditions are not guaranteed.
916 * This method may be useful when executing
917 * pre-constructed trees of subtasks in loops.
918 */
919 public void reinitialize() {
920 if ((status & COMPLETION_MASK) == EXCEPTIONAL)
921 exceptionMap.remove(this);
922 status = 0;
923 }
924
925 /**
926 * Returns the pool hosting the current task execution, or null
927 * if this task is executing outside of any ForkJoinPool.
928 *
929 * @see #inForkJoinPool
930 * @return the pool, or {@code null} if none
931 */
932 public static ForkJoinPool getPool() {
933 Thread t = Thread.currentThread();
934 return (t instanceof ForkJoinWorkerThread) ?
935 ((ForkJoinWorkerThread) t).pool : null;
936 }
937
938 /**
939 * Returns {@code true} if the current thread is executing as a
940 * ForkJoinPool computation.
941 *
942 * @return {@code true} if the current thread is executing as a
943 * ForkJoinPool computation, or false otherwise
944 */
945 public static boolean inForkJoinPool() {
946 return Thread.currentThread() instanceof ForkJoinWorkerThread;
947 }
948
949 /**
950 * Tries to unschedule this task for execution. This method will
951 * typically succeed if this task is the most recently forked task
952 * by the current thread, and has not commenced executing in
953 * another thread. This method may be useful when arranging
954 * alternative local processing of tasks that could have been, but
955 * were not, stolen.
956 *
957 * <p>This method may be invoked only from within {@code
958 * ForkJoinTask} computations (as may be determined using method
959 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
960 * result in exceptions or errors, possibly including {@code
961 * ClassCastException}.
962 *
963 * @return {@code true} if unforked
964 */
965 public boolean tryUnfork() {
966 return ((ForkJoinWorkerThread) Thread.currentThread())
967 .unpushTask(this);
968 }
969
970 /**
971 * Returns an estimate of the number of tasks that have been
972 * forked by the current worker thread but not yet executed. This
973 * value may be useful for heuristic decisions about whether to
974 * fork other tasks.
975 *
976 * <p>This method may be invoked only from within {@code
977 * ForkJoinTask} computations (as may be determined using method
978 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
979 * result in exceptions or errors, possibly including {@code
980 * ClassCastException}.
981 *
982 * @return the number of tasks
983 */
984 public static int getQueuedTaskCount() {
985 return ((ForkJoinWorkerThread) Thread.currentThread())
986 .getQueueSize();
987 }
988
989 /**
990 * Returns an estimate of how many more locally queued tasks are
991 * held by the current worker thread than there are other worker
992 * threads that might steal them. This value may be useful for
993 * heuristic decisions about whether to fork other tasks. In many
994 * usages of ForkJoinTasks, at steady state, each worker should
995 * aim to maintain a small constant surplus (for example, 3) of
996 * tasks, and to process computations locally if this threshold is
997 * exceeded.
998 *
999 * <p>This method may be invoked only from within {@code
1000 * ForkJoinTask} computations (as may be determined using method
1001 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1002 * result in exceptions or errors, possibly including {@code
1003 * ClassCastException}.
1004 *
1005 * @return the surplus number of tasks, which may be negative
1006 */
1007 public static int getSurplusQueuedTaskCount() {
1008 return ((ForkJoinWorkerThread) Thread.currentThread())
1009 .getEstimatedSurplusTaskCount();
1010 }
1011
1012 // Extension methods
1013
1014 /**
1015 * Returns the result that would be returned by {@link #join}, even
1016 * if this task completed abnormally, or {@code null} if this task
1017 * is not known to have been completed. This method is designed
1018 * to aid debugging, as well as to support extensions. Its use in
1019 * any other context is discouraged.
1020 *
1021 * @return the result, or {@code null} if not completed
1022 */
1023 public abstract V getRawResult();
1024
1025 /**
1026 * Forces the given value to be returned as a result. This method
1027 * is designed to support extensions, and should not in general be
1028 * called otherwise.
1029 *
1030 * @param value the value
1031 */
1032 protected abstract void setRawResult(V value);
1033
1034 /**
1035 * Immediately performs the base action of this task. This method
1036 * is designed to support extensions, and should not in general be
1037 * called otherwise. The return value controls whether this task
1038 * is considered to be done normally. It may return false in
1039 * asynchronous actions that require explicit invocations of
1040 * {@link #complete} to become joinable. It may also throw an
1041 * (unchecked) exception to indicate abnormal exit.
1042 *
1043 * @return {@code true} if completed normally
1044 */
1045 protected abstract boolean exec();
1046
1047 /**
1048 * Returns, but does not unschedule or execute, a task queued by
1049 * the current thread but not yet executed, if one is immediately
1050 * available. There is no guarantee that this task will actually
1051 * be polled or executed next. Conversely, this method may return
1052 * null even if a task exists but cannot be accessed without
1053 * contention with other threads. This method is designed
1054 * primarily to support extensions, and is unlikely to be useful
1055 * otherwise.
1056 *
1057 * <p>This method may be invoked only from within {@code
1058 * ForkJoinTask} computations (as may be determined using method
1059 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1060 * result in exceptions or errors, possibly including {@code
1061 * ClassCastException}.
1062 *
1063 * @return the next task, or {@code null} if none are available
1064 */
1065 protected static ForkJoinTask<?> peekNextLocalTask() {
1066 return ((ForkJoinWorkerThread) Thread.currentThread())
1067 .peekTask();
1068 }
1069
1070 /**
1071 * Unschedules and returns, without executing, the next task
1072 * queued by the current thread but not yet executed. This method
1073 * is designed primarily to support extensions, and is unlikely to
1074 * be useful otherwise.
1075 *
1076 * <p>This method may be invoked only from within {@code
1077 * ForkJoinTask} computations (as may be determined using method
1078 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1079 * result in exceptions or errors, possibly including {@code
1080 * ClassCastException}.
1081 *
1082 * @return the next task, or {@code null} if none are available
1083 */
1084 protected static ForkJoinTask<?> pollNextLocalTask() {
1085 return ((ForkJoinWorkerThread) Thread.currentThread())
1086 .pollLocalTask();
1087 }
1088
1089 /**
1090 * Unschedules and returns, without executing, the next task
1091 * queued by the current thread but not yet executed, if one is
1092 * available, or if not available, a task that was forked by some
1093 * other thread, if available. Availability may be transient, so a
1094 * {@code null} result does not necessarily imply quiescence
1095 * of the pool this task is operating in. This method is designed
1096 * primarily to support extensions, and is unlikely to be useful
1097 * otherwise.
1098 *
1099 * <p>This method may be invoked only from within {@code
1100 * ForkJoinTask} computations (as may be determined using method
1101 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1102 * result in exceptions or errors, possibly including {@code
1103 * ClassCastException}.
1104 *
1105 * @return a task, or {@code null} if none are available
1106 */
1107 protected static ForkJoinTask<?> pollTask() {
1108 return ((ForkJoinWorkerThread) Thread.currentThread())
1109 .pollTask();
1110 }
1111
1112 /**
1113 * Adaptor for Runnables. This implements RunnableFuture
1114 * to be compliant with AbstractExecutorService constraints
1115 * when used in ForkJoinPool.
1116 */
1117 static final class AdaptedRunnable<T> extends ForkJoinTask<T>
1118 implements RunnableFuture<T> {
1119 final Runnable runnable;
1120 final T resultOnCompletion;
1121 T result;
1122 AdaptedRunnable(Runnable runnable, T result) {
1123 if (runnable == null) throw new NullPointerException();
1124 this.runnable = runnable;
1125 this.resultOnCompletion = result;
1126 }
1127 public T getRawResult() { return result; }
1128 public void setRawResult(T v) { result = v; }
1129 public boolean exec() {
1130 runnable.run();
1131 result = resultOnCompletion;
1132 return true;
1133 }
1134 public void run() { invoke(); }
1135 private static final long serialVersionUID = 5232453952276885070L;
1136 }
1137
1138 /**
1139 * Adaptor for Callables
1140 */
1141 static final class AdaptedCallable<T> extends ForkJoinTask<T>
1142 implements RunnableFuture<T> {
1143 final Callable<? extends T> callable;
1144 T result;
1145 AdaptedCallable(Callable<? extends T> callable) {
1146 if (callable == null) throw new NullPointerException();
1147 this.callable = callable;
1148 }
1149 public T getRawResult() { return result; }
1150 public void setRawResult(T v) { result = v; }
1151 public boolean exec() {
1152 try {
1153 result = callable.call();
1154 return true;
1155 } catch (Error err) {
1156 throw err;
1157 } catch (RuntimeException rex) {
1158 throw rex;
1159 } catch (Exception ex) {
1160 throw new RuntimeException(ex);
1161 }
1162 }
1163 public void run() { invoke(); }
1164 private static final long serialVersionUID = 2838392045355241008L;
1165 }
1166
1167 /**
1168 * Returns a new {@code ForkJoinTask} that performs the {@code run}
1169 * method of the given {@code Runnable} as its action, and returns
1170 * a null result upon {@link #join}.
1171 *
1172 * @param runnable the runnable action
1173 * @return the task
1174 */
1175 public static ForkJoinTask<?> adapt(Runnable runnable) {
1176 return new AdaptedRunnable<Void>(runnable, null);
1177 }
1178
1179 /**
1180 * Returns a new {@code ForkJoinTask} that performs the {@code run}
1181 * method of the given {@code Runnable} as its action, and returns
1182 * the given result upon {@link #join}.
1183 *
1184 * @param runnable the runnable action
1185 * @param result the result upon completion
1186 * @return the task
1187 */
1188 public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) {
1189 return new AdaptedRunnable<T>(runnable, result);
1190 }
1191
1192 /**
1193 * Returns a new {@code ForkJoinTask} that performs the {@code call}
1194 * method of the given {@code Callable} as its action, and returns
1195 * its result upon {@link #join}, translating any checked exceptions
1196 * encountered into {@code RuntimeException}.
1197 *
1198 * @param callable the callable action
1199 * @return the task
1200 */
1201 public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) {
1202 return new AdaptedCallable<T>(callable);
1203 }
1204
1205 // Serialization support
1206
1207 private static final long serialVersionUID = -7721805057305804111L;
1208
1209 /**
1210 * Saves the state to a stream.
1211 *
1212 * @serialData the current run status and the exception thrown
1213 * during execution, or {@code null} if none
1214 * @param s the stream
1215 */
1216 private void writeObject(java.io.ObjectOutputStream s)
1217 throws java.io.IOException {
1218 s.defaultWriteObject();
1219 s.writeObject(getException());
1220 }
1221
1222 /**
1223 * Reconstitutes the instance from a stream.
1224 *
1225 * @param s the stream
1226 */
1227 private void readObject(java.io.ObjectInputStream s)
1228 throws java.io.IOException, ClassNotFoundException {
1229 s.defaultReadObject();
1230 status &= ~INTERNAL_SIGNAL_MASK; // clear internal signal counts
1231 status |= EXTERNAL_SIGNAL; // conservatively set external signal
1232 Object ex = s.readObject();
1233 if (ex != null)
1234 setDoneExceptionally((Throwable) ex);
1235 }
1236
1237 // Unsafe mechanics
1238
1239 private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
1240 private static final long statusOffset =
1241 objectFieldOffset("status", ForkJoinTask.class);
1242
1243 private static long objectFieldOffset(String field, Class<?> klazz) {
1244 try {
1245 return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
1246 } catch (NoSuchFieldException e) {
1247 // Convert Exception to corresponding Error
1248 NoSuchFieldError error = new NoSuchFieldError(field);
1249 error.initCause(e);
1250 throw error;
1251 }
1252 }
1253 }