ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinTask.java
Revision: 1.154
Committed: Sun Nov 14 16:19:13 2021 UTC (2 years, 6 months ago) by dl
Branch: MAIN
Changes since 1.153: +24 -26 lines
Log Message:
Reduce static initialization; unify termination checks

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.io.Serializable;
10 import java.lang.reflect.Constructor;
11 import java.util.Collection;
12 import java.util.List;
13 import java.util.RandomAccess;
14 import java.util.concurrent.locks.LockSupport;
15 import jdk.internal.misc.Unsafe;
16
17 /**
18 * Abstract base class for tasks that run within a {@link ForkJoinPool}.
19 * A {@code ForkJoinTask} is a thread-like entity that is much
20 * lighter weight than a normal thread. Huge numbers of tasks and
21 * subtasks may be hosted by a small number of actual threads in a
22 * ForkJoinPool, at the price of some usage limitations.
23 *
24 * <p>A "main" {@code ForkJoinTask} begins execution when it is
25 * explicitly submitted to a {@link ForkJoinPool}, or, if not already
26 * engaged in a ForkJoin computation, commenced in the {@link
27 * ForkJoinPool#commonPool()} via {@link #fork}, {@link #invoke}, or
28 * related methods. Once started, it will usually in turn start other
29 * subtasks. As indicated by the name of this class, many programs
30 * using {@code ForkJoinTask} employ only methods {@link #fork} and
31 * {@link #join}, or derivatives such as {@link
32 * #invokeAll(ForkJoinTask...) invokeAll}. However, this class also
33 * provides a number of other methods that can come into play in
34 * advanced usages, as well as extension mechanics that allow support
35 * of new forms of fork/join processing.
36 *
37 * <p>A {@code ForkJoinTask} is a lightweight form of {@link Future}.
38 * The efficiency of {@code ForkJoinTask}s stems from a set of
39 * restrictions (that are only partially statically enforceable)
40 * reflecting their main use as computational tasks calculating pure
41 * functions or operating on purely isolated objects. The primary
42 * coordination mechanisms are {@link #fork}, that arranges
43 * asynchronous execution, and {@link #join}, that doesn't proceed
44 * until the task's result has been computed. Computations should
45 * ideally avoid {@code synchronized} methods or blocks, and should
46 * minimize other blocking synchronization apart from joining other
47 * tasks or using synchronizers such as Phasers that are advertised to
48 * cooperate with fork/join scheduling. Subdividable tasks should also
49 * not perform blocking I/O, and should ideally access variables that
50 * are completely independent of those accessed by other running
51 * tasks. These guidelines are loosely enforced by not permitting
52 * checked exceptions such as {@code IOExceptions} to be
53 * thrown. However, computations may still encounter unchecked
54 * exceptions, that are rethrown to callers attempting to join
55 * them. These exceptions may additionally include {@link
56 * RejectedExecutionException} stemming from internal resource
57 * exhaustion, such as failure to allocate internal task
58 * queues. Rethrown exceptions behave in the same way as regular
59 * exceptions, but, when possible, contain stack traces (as displayed
60 * for example using {@code ex.printStackTrace()}) of both the thread
61 * that initiated the computation as well as the thread actually
62 * encountering the exception; minimally only the latter.
63 *
64 * <p>It is possible to define and use ForkJoinTasks that may block,
65 * but doing so requires three further considerations: (1) Completion
66 * of few if any <em>other</em> tasks should be dependent on a task
67 * that blocks on external synchronization or I/O. Event-style async
68 * tasks that are never joined (for example, those subclassing {@link
69 * CountedCompleter}) often fall into this category. (2) To minimize
70 * resource impact, tasks should be small; ideally performing only the
71 * (possibly) blocking action. (3) Unless the {@link
72 * ForkJoinPool.ManagedBlocker} API is used, or the number of possibly
73 * blocked tasks is known to be less than the pool's {@link
74 * ForkJoinPool#getParallelism} level, the pool cannot guarantee that
75 * enough threads will be available to ensure progress or good
76 * performance.
77 *
78 * <p>The primary method for awaiting completion and extracting
79 * results of a task is {@link #join}, but there are several variants:
80 * The {@link Future#get} methods support interruptible and/or timed
81 * waits for completion and report results using {@code Future}
82 * conventions. Method {@link #invoke} is semantically
83 * equivalent to {@code fork(); join()} but always attempts to begin
84 * execution in the current thread. The "<em>quiet</em>" forms of
85 * these methods do not extract results or report exceptions. These
86 * may be useful when a set of tasks are being executed, and you need
87 * to delay processing of results or exceptions until all complete.
88 * Method {@code invokeAll} (available in multiple versions)
89 * performs the most common form of parallel invocation: forking a set
90 * of tasks and joining them all.
91 *
92 * <p>In the most typical usages, a fork-join pair act like a call
93 * (fork) and return (join) from a parallel recursive function. As is
94 * the case with other forms of recursive calls, returns (joins)
95 * should be performed innermost-first. For example, {@code a.fork();
96 * b.fork(); b.join(); a.join();} is likely to be substantially more
97 * efficient than joining {@code a} before {@code b}.
98 *
99 * <p>The execution status of tasks may be queried at several levels
100 * of detail: {@link #isDone} is true if a task completed in any way
101 * (including the case where a task was cancelled without executing);
102 * {@link #isCompletedNormally} is true if a task completed without
103 * cancellation or encountering an exception; {@link #isCancelled} is
104 * true if the task was cancelled (in which case {@link #getException}
105 * returns a {@link CancellationException}); and
106 * {@link #isCompletedAbnormally} is true if a task was either
107 * cancelled or encountered an exception, in which case {@link
108 * #getException} will return either the encountered exception or
109 * {@link CancellationException}.
110 *
111 * <p>The ForkJoinTask class is not usually directly subclassed.
112 * Instead, you subclass one of the abstract classes that support a
113 * particular style of fork/join processing, typically {@link
114 * RecursiveAction} for most computations that do not return results,
115 * {@link RecursiveTask} for those that do, and {@link
116 * CountedCompleter} for those in which completed actions trigger
117 * other actions. Normally, a concrete ForkJoinTask subclass declares
118 * fields comprising its parameters, established in a constructor, and
119 * then defines a {@code compute} method that somehow uses the control
120 * methods supplied by this base class.
121 *
122 * <p>Method {@link #join} and its variants are appropriate for use
123 * only when completion dependencies are acyclic; that is, the
124 * parallel computation can be described as a directed acyclic graph
125 * (DAG). Otherwise, executions may encounter a form of deadlock as
126 * tasks cyclically wait for each other. However, this framework
127 * supports other methods and techniques (for example the use of
128 * {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that
129 * may be of use in constructing custom subclasses for problems that
130 * are not statically structured as DAGs. To support such usages, a
131 * ForkJoinTask may be atomically <em>tagged</em> with a {@code short}
132 * value using {@link #setForkJoinTaskTag} or {@link
133 * #compareAndSetForkJoinTaskTag} and checked using {@link
134 * #getForkJoinTaskTag}. The ForkJoinTask implementation does not use
135 * these {@code protected} methods or tags for any purpose, but they
136 * may be of use in the construction of specialized subclasses. For
137 * example, parallel graph traversals can use the supplied methods to
138 * avoid revisiting nodes/tasks that have already been processed.
139 * (Method names for tagging are bulky in part to encourage definition
140 * of methods that reflect their usage patterns.)
141 *
142 * <p>Most base support methods are {@code final}, to prevent
143 * overriding of implementations that are intrinsically tied to the
144 * underlying lightweight task scheduling framework. Developers
145 * creating new basic styles of fork/join processing should minimally
146 * implement {@code protected} methods {@link #exec}, {@link
147 * #setRawResult}, and {@link #getRawResult}, while also introducing
148 * an abstract computational method that can be implemented in its
149 * subclasses, possibly relying on other {@code protected} methods
150 * provided by this class.
151 *
152 * <p>ForkJoinTasks should perform relatively small amounts of
153 * computation. Large tasks should be split into smaller subtasks,
154 * usually via recursive decomposition. As a very rough rule of thumb,
155 * a task should perform more than 100 and less than 10000 basic
156 * computational steps, and should avoid indefinite looping. If tasks
157 * are too big, then parallelism cannot improve throughput. If too
158 * small, then memory and internal task maintenance overhead may
159 * overwhelm processing.
160 *
161 * <p>This class provides {@code adapt} methods for {@link Runnable}
162 * and {@link Callable}, that may be of use when mixing execution of
163 * {@code ForkJoinTasks} with other kinds of tasks. When all tasks are
164 * of this form, consider using a pool constructed in <em>asyncMode</em>.
165 *
166 * <p>ForkJoinTasks are {@code Serializable}, which enables them to be
167 * used in extensions such as remote execution frameworks. It is
168 * sensible to serialize tasks only before or after, but not during,
169 * execution. Serialization is not relied on during execution itself.
170 *
171 * @since 1.7
172 * @author Doug Lea
173 */
174 public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
175
176 /*
177 * See the internal documentation of class ForkJoinPool for a
178 * general implementation overview. ForkJoinTasks are mainly
179 * responsible for maintaining their "status" field amidst relays
180 * to methods in ForkJoinWorkerThread and ForkJoinPool.
181 *
182 * The methods of this class are more-or-less layered into
183 * (1) basic status maintenance
184 * (2) execution and awaiting completion
185 * (3) user-level methods that additionally report results.
186 * This is sometimes hard to see because this file orders exported
187 * methods in a way that flows well in javadocs.
188 *
189 * Revision notes: The use of "Aux" field replaces previous
190 * reliance on a table to hold exceptions and synchronized blocks
191 * and monitors to wait for completion. This class uses
192 * jdk-internal Unsafe for atomics and special memory modes,
193 * rather than VarHandles, to avoid initialization dependencies in
194 * other jdk components that require early parallelism.
195 */
196
197 /**
198 * Nodes for threads waiting for completion, or holding a thrown
199 * exception (never both). Waiting threads prepend nodes
200 * Treiber-stack-style. Signallers detach and unpark
201 * waiters. Cancelled waiters try to unsplice.
202 */
203 static final class Aux {
204 final Thread thread;
205 final Throwable ex; // null if a waiter
206 Aux next; // accessed only via memory-acquire chains
207 Aux(Thread thread, Throwable ex) {
208 this.thread = thread;
209 this.ex = ex;
210 }
211 @SuppressWarnings("removal")
212 final boolean casNext(Aux c, Aux v) { // used only in cancellation
213 return U.compareAndSetObject(this, NEXT, c, v);
214 }
215 private static final Unsafe U;
216 private static final long NEXT;
217 static {
218 U = Unsafe.getUnsafe();
219 NEXT = U.objectFieldOffset(Aux.class, "next");
220 }
221 }
222
223 /*
224 * The status field holds bits packed into a single int to ensure
225 * atomicity. Status is initially zero, and takes on nonnegative
226 * values until completed, upon which it holds (sign bit) DONE,
227 * possibly with ABNORMAL (cancelled or exceptional) and THROWN
228 * (in which case an exception has been stored). A value of
229 * ABNORMAL without DONE signifies an interrupted wait. These
230 * control bits occupy only (some of) the upper half (16 bits) of
231 * status field. The lower bits are used for user-defined tags.
232 */
233 private static final int DONE = 1 << 31; // must be negative
234 private static final int ABNORMAL = 1 << 16;
235 private static final int THROWN = 1 << 17;
236 private static final int SMASK = 0xffff; // short bits for tags
237 private static final int UNCOMPENSATE = 1 << 16; // helpJoin return sentinel
238
239 // Fields
240 volatile int status; // accessed directly by pool and workers
241 private transient volatile Aux aux; // either waiters or thrown Exception
242
243 // Support for atomic operations
244 private static final Unsafe U;
245 private static final long STATUS;
246 private static final long AUX;
247 private int getAndBitwiseOrStatus(int v) {
248 return U.getAndBitwiseOrInt(this, STATUS, v);
249 }
250 private boolean casStatus(int c, int v) {
251 return U.compareAndSetInt(this, STATUS, c, v);
252 }
253 @SuppressWarnings("removal")
254 private boolean casAux(Aux c, Aux v) {
255 return U.compareAndSetObject(this, AUX, c, v);
256 }
257
258 /** Removes and unparks waiters */
259 private void signalWaiters() {
260 for (Aux a; (a = aux) != null && a.ex == null; ) {
261 if (casAux(a, null)) { // detach entire list
262 for (Thread t; a != null; a = a.next) {
263 if ((t = a.thread) != Thread.currentThread() && t != null)
264 LockSupport.unpark(t); // don't self-signal
265 }
266 break;
267 }
268 }
269 }
270
271 /**
272 * Sets DONE status and wakes up threads waiting to join this task.
273 * @return status on exit
274 */
275 private int setDone() {
276 int s = getAndBitwiseOrStatus(DONE) | DONE;
277 signalWaiters();
278 return s;
279 }
280
281 /**
282 * Sets ABNORMAL DONE status unless already done, and wakes up threads
283 * waiting to join this task.
284 * @return status on exit
285 */
286 private int trySetCancelled() {
287 int s;
288 do {} while ((s = status) >= 0 && !casStatus(s, s |= (DONE | ABNORMAL)));
289 signalWaiters();
290 return s;
291 }
292
293 /**
294 * Records exception and sets ABNORMAL THROWN DONE status unless
295 * already done, and wakes up threads waiting to join this task.
296 * If losing a race with setDone or trySetCancelled, the exception
297 * may be recorded but not reported.
298 *
299 * @return status on exit
300 */
301 final int trySetThrown(Throwable ex) {
302 Aux h = new Aux(Thread.currentThread(), ex), p = null;
303 boolean installed = false;
304 int s;
305 while ((s = status) >= 0) {
306 Aux a;
307 if (!installed && ((a = aux) == null || a.ex == null) &&
308 (installed = casAux(a, h)))
309 p = a; // list of waiters replaced by h
310 if (installed && casStatus(s, s |= (DONE | ABNORMAL | THROWN)))
311 break;
312 }
313 for (; p != null; p = p.next)
314 LockSupport.unpark(p.thread);
315 return s;
316 }
317
318 /**
319 * Records exception unless already done. Overridable in subclasses.
320 *
321 * @return status on exit
322 */
323 int trySetException(Throwable ex) {
324 return trySetThrown(ex);
325 }
326
327 /**
328 * Constructor for subclasses to call.
329 */
330 public ForkJoinTask() {}
331
332 static boolean isExceptionalStatus(int s) { // needed by subclasses
333 return (s & THROWN) != 0;
334 }
335
336 /**
337 * Unless done, calls exec and records status if completed, but
338 * doesn't wait for completion otherwise.
339 *
340 * @return status on exit from this method
341 */
342 final int doExec() {
343 int s; boolean completed;
344 if ((s = status) >= 0) {
345 try {
346 completed = exec();
347 } catch (Throwable rex) {
348 s = trySetException(rex);
349 completed = false;
350 }
351 if (completed)
352 s = setDone();
353 }
354 return s;
355 }
356
357 /**
358 * Helps and/or waits for completion from join, get, or invoke;
359 * called from either internal or external threads.
360 *
361 * @param pool if nonnull, known submitted pool, else assumes current pool
362 * @param ran true if task known to have been exec'd
363 * @param interruptible true if park interruptibly when external
364 * @param timed true if use timed wait
365 * @param nanos if timed, timeout value
366 * @return ABNORMAL if interrupted, else status on exit
367 */
368 private int awaitDone(ForkJoinPool pool, boolean ran,
369 boolean interruptible, boolean timed,
370 long nanos) {
371 ForkJoinPool p; boolean internal; int s; Thread t;
372 ForkJoinPool.WorkQueue q = null;
373 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
374 ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
375 p = wt.pool;
376 if (pool == null)
377 pool = p;
378 if (internal = (pool == p))
379 q = wt.workQueue;
380 }
381 else {
382 internal = false;
383 p = ForkJoinPool.commonPool();
384 if (pool == null)
385 pool = p;
386 if (pool == p && p != null)
387 q = p.externalQueue();
388 }
389 if (interruptible && Thread.interrupted())
390 return ABNORMAL;
391 if ((s = status) < 0)
392 return s;
393 long deadline = 0L;
394 if (timed) {
395 if (nanos <= 0L)
396 return 0;
397 else if ((deadline = nanos + System.nanoTime()) == 0L)
398 deadline = 1L;
399 }
400 boolean uncompensate = false;
401 if (q != null && p != null) { // try helping
402 // help even in timed mode if pool has no parallelism
403 boolean canHelp = !timed || (p.mode & SMASK) == 0;
404 if (canHelp) {
405 if ((this instanceof CountedCompleter) &&
406 (s = p.helpComplete(this, q, internal)) < 0)
407 return s;
408 if (!ran && ((!internal && q.externalTryUnpush(this)) ||
409 q.tryRemove(this, internal)) && (s = doExec()) < 0)
410 return s;
411 }
412 if (internal) {
413 if ((s = p.helpJoin(this, q, canHelp)) < 0)
414 return s;
415 if (s == UNCOMPENSATE)
416 uncompensate = true;
417 }
418 }
419 // block until done or cancelled wait
420 boolean interrupted = false, queued = false;
421 boolean parked = false, fail = false;
422 Aux node = null;
423 while ((s = status) >= 0) {
424 Aux a; long ns;
425 if (fail || (fail = (pool != null && pool.mode < 0)))
426 casStatus(s, s | (DONE | ABNORMAL)); // try to cancel
427 else if (parked && Thread.interrupted()) {
428 if (interruptible) {
429 s = ABNORMAL;
430 break;
431 }
432 interrupted = true;
433 }
434 else if (queued) {
435 if (deadline != 0L) {
436 if ((ns = deadline - System.nanoTime()) <= 0L)
437 break;
438 LockSupport.parkNanos(ns);
439 }
440 else
441 LockSupport.park();
442 parked = true;
443 }
444 else if (node != null) {
445 if ((a = aux) != null && a.ex != null)
446 Thread.onSpinWait(); // exception in progress
447 else if (queued = casAux(node.next = a, node))
448 LockSupport.setCurrentBlocker(this);
449 }
450 else {
451 try {
452 node = new Aux(Thread.currentThread(), null);
453 } catch (Throwable ex) { // cannot create
454 fail = true;
455 }
456 }
457 }
458 if (pool != null && uncompensate)
459 pool.uncompensate();
460
461 if (queued) {
462 LockSupport.setCurrentBlocker(null);
463 if (s >= 0) { // cancellation similar to AbstractQueuedSynchronizer
464 outer: for (Aux a; (a = aux) != null && a.ex == null; ) {
465 for (Aux trail = null;;) {
466 Aux next = a.next;
467 if (a == node) {
468 if (trail != null)
469 trail.casNext(trail, next);
470 else if (casAux(a, next))
471 break outer; // cannot be re-encountered
472 break; // restart
473 } else {
474 trail = a;
475 if ((a = next) == null)
476 break outer;
477 }
478 }
479 }
480 }
481 else {
482 signalWaiters(); // help clean or signal
483 if (interrupted)
484 Thread.currentThread().interrupt();
485 }
486 }
487 return s;
488 }
489
490 /**
491 * Cancels, ignoring any exceptions thrown by cancel. Cancel is
492 * spec'ed not to throw any exceptions, but if it does anyway, we
493 * have no recourse, so guard against this case.
494 */
495 static final void cancelIgnoringExceptions(Future<?> t) {
496 if (t != null) {
497 try {
498 t.cancel(true);
499 } catch (Throwable ignore) {
500 }
501 }
502 }
503
504 /**
505 * Returns a rethrowable exception for this task, if available.
506 * To provide accurate stack traces, if the exception was not
507 * thrown by the current thread, we try to create a new exception
508 * of the same type as the one thrown, but with the recorded
509 * exception as its cause. If there is no such constructor, we
510 * instead try to use a no-arg constructor, followed by initCause,
511 * to the same effect. If none of these apply, or any fail due to
512 * other exceptions, we return the recorded exception, which is
513 * still correct, although it may contain a misleading stack
514 * trace.
515 *
516 * @return the exception, or null if none
517 */
518 private Throwable getThrowableException() {
519 Throwable ex; Aux a;
520 if ((a = aux) == null)
521 ex = null;
522 else if ((ex = a.ex) != null && a.thread != Thread.currentThread()) {
523 try {
524 Constructor<?> noArgCtor = null, oneArgCtor = null;
525 for (Constructor<?> c : ex.getClass().getConstructors()) {
526 Class<?>[] ps = c.getParameterTypes();
527 if (ps.length == 0)
528 noArgCtor = c;
529 else if (ps.length == 1 && ps[0] == Throwable.class) {
530 oneArgCtor = c;
531 break;
532 }
533 }
534 if (oneArgCtor != null)
535 ex = (Throwable)oneArgCtor.newInstance(ex);
536 else if (noArgCtor != null) {
537 Throwable rx = (Throwable)noArgCtor.newInstance();
538 rx.initCause(ex);
539 ex = rx;
540 }
541 } catch (Exception ignore) {
542 }
543 }
544 return ex;
545 }
546
547 /**
548 * Returns exception associated with the given status, or null if none.
549 */
550 private Throwable getException(int s) {
551 Throwable ex = null;
552 if ((s & ABNORMAL) != 0 &&
553 ((s & THROWN) == 0 || (ex = getThrowableException()) == null))
554 ex = new CancellationException();
555 return ex;
556 }
557
558 /**
559 * Throws exception associated with the given status, or
560 * CancellationException if none recorded.
561 */
562 private void reportException(int s) {
563 ForkJoinTask.<RuntimeException>uncheckedThrow(
564 (s & THROWN) != 0 ? getThrowableException() : null);
565 }
566
567 /**
568 * Throws exception for (timed or untimed) get, wrapping if
569 * necessary in an ExecutionException.
570 */
571 private void reportExecutionException(int s) {
572 Throwable ex = null;
573 if (s == ABNORMAL)
574 ex = new InterruptedException();
575 else if (s >= 0)
576 ex = new TimeoutException();
577 else if ((s & THROWN) != 0 && (ex = getThrowableException()) != null)
578 ex = new ExecutionException(ex);
579 ForkJoinTask.<RuntimeException>uncheckedThrow(ex);
580 }
581
582 /**
583 * A version of "sneaky throw" to relay exceptions in other
584 * contexts.
585 */
586 static void rethrow(Throwable ex) {
587 ForkJoinTask.<RuntimeException>uncheckedThrow(ex);
588 }
589
590 /**
591 * The sneaky part of sneaky throw, relying on generics
592 * limitations to evade compiler complaints about rethrowing
593 * unchecked exceptions. If argument null, throws
594 * CancellationException.
595 */
596 @SuppressWarnings("unchecked") static <T extends Throwable>
597 void uncheckedThrow(Throwable t) throws T {
598 if (t == null)
599 t = new CancellationException();
600 throw (T)t; // rely on vacuous cast
601 }
602
603 // public methods
604
605 /**
606 * Arranges to asynchronously execute this task in the pool the
607 * current task is running in, if applicable, or using the {@link
608 * ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}. While
609 * it is not necessarily enforced, it is a usage error to fork a
610 * task more than once unless it has completed and been
611 * reinitialized. Subsequent modifications to the state of this
612 * task or any data it operates on are not necessarily
613 * consistently observable by any thread other than the one
614 * executing it unless preceded by a call to {@link #join} or
615 * related methods, or a call to {@link #isDone} returning {@code
616 * true}.
617 *
618 * @return {@code this}, to simplify usage
619 */
620 public final ForkJoinTask<V> fork() {
621 Thread t; ForkJoinWorkerThread w;
622 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
623 (w = (ForkJoinWorkerThread)t).workQueue.push(this, w.pool);
624 else
625 ForkJoinPool.externalPushCommon(this);
626 return this;
627 }
628
629 /**
630 * Returns the result of the computation when it
631 * {@linkplain #isDone is done}.
632 * This method differs from {@link #get()} in that abnormal
633 * completion results in {@code RuntimeException} or {@code Error},
634 * not {@code ExecutionException}, and that interrupts of the
635 * calling thread do <em>not</em> cause the method to abruptly
636 * return by throwing {@code InterruptedException}.
637 *
638 * @return the computed result
639 */
640 public final V join() {
641 int s;
642 if ((s = status) >= 0)
643 s = awaitDone(null, false, false, false, 0L);
644 if ((s & ABNORMAL) != 0)
645 reportException(s);
646 return getRawResult();
647 }
648
649 /**
650 * Commences performing this task, awaits its completion if
651 * necessary, and returns its result, or throws an (unchecked)
652 * {@code RuntimeException} or {@code Error} if the underlying
653 * computation did so.
654 *
655 * @return the computed result
656 */
657 public final V invoke() {
658 int s;
659 if ((s = doExec()) >= 0)
660 s = awaitDone(null, true, false, false, 0L);
661 if ((s & ABNORMAL) != 0)
662 reportException(s);
663 return getRawResult();
664 }
665
666 /**
667 * Forks the given tasks, returning when {@code isDone} holds for
668 * each task or an (unchecked) exception is encountered, in which
669 * case the exception is rethrown. If more than one task
670 * encounters an exception, then this method throws any one of
671 * these exceptions. If any task encounters an exception, the
672 * other may be cancelled. However, the execution status of
673 * individual tasks is not guaranteed upon exceptional return. The
674 * status of each task may be obtained using {@link
675 * #getException()} and related methods to check if they have been
676 * cancelled, completed normally or exceptionally, or left
677 * unprocessed.
678 *
679 * @param t1 the first task
680 * @param t2 the second task
681 * @throws NullPointerException if any task is null
682 */
683 public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
684 int s1, s2;
685 if (t1 == null || t2 == null)
686 throw new NullPointerException();
687 t2.fork();
688 if ((s1 = t1.doExec()) >= 0)
689 s1 = t1.awaitDone(null, true, false, false, 0L);
690 if ((s1 & ABNORMAL) != 0) {
691 cancelIgnoringExceptions(t2);
692 t1.reportException(s1);
693 }
694 else if (((s2 = t2.awaitDone(null, false, false, false, 0L)) & ABNORMAL) != 0)
695 t2.reportException(s2);
696 }
697
698 /**
699 * Forks the given tasks, returning when {@code isDone} holds for
700 * each task or an (unchecked) exception is encountered, in which
701 * case the exception is rethrown. If more than one task
702 * encounters an exception, then this method throws any one of
703 * these exceptions. If any task encounters an exception, others
704 * may be cancelled. However, the execution status of individual
705 * tasks is not guaranteed upon exceptional return. The status of
706 * each task may be obtained using {@link #getException()} and
707 * related methods to check if they have been cancelled, completed
708 * normally or exceptionally, or left unprocessed.
709 *
710 * @param tasks the tasks
711 * @throws NullPointerException if any task is null
712 */
713 public static void invokeAll(ForkJoinTask<?>... tasks) {
714 Throwable ex = null;
715 int last = tasks.length - 1;
716 for (int i = last; i >= 0; --i) {
717 ForkJoinTask<?> t;
718 if ((t = tasks[i]) == null) {
719 ex = new NullPointerException();
720 break;
721 }
722 if (i == 0) {
723 int s;
724 if ((s = t.doExec()) >= 0)
725 s = t.awaitDone(null, true, false, false, 0L);
726 if ((s & ABNORMAL) != 0)
727 ex = t.getException(s);
728 break;
729 }
730 t.fork();
731 }
732 if (ex == null) {
733 for (int i = 1; i <= last; ++i) {
734 ForkJoinTask<?> t;
735 if ((t = tasks[i]) != null) {
736 int s;
737 if ((s = t.status) >= 0)
738 s = t.awaitDone(null, false, false, false, 0L);
739 if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null)
740 break;
741 }
742 }
743 }
744 if (ex != null) {
745 for (int i = 1; i <= last; ++i)
746 cancelIgnoringExceptions(tasks[i]);
747 rethrow(ex);
748 }
749 }
750
751 /**
752 * Forks all tasks in the specified collection, returning when
753 * {@code isDone} holds for each task or an (unchecked) exception
754 * is encountered, in which case the exception is rethrown. If
755 * more than one task encounters an exception, then this method
756 * throws any one of these exceptions. If any task encounters an
757 * exception, others may be cancelled. However, the execution
758 * status of individual tasks is not guaranteed upon exceptional
759 * return. The status of each task may be obtained using {@link
760 * #getException()} and related methods to check if they have been
761 * cancelled, completed normally or exceptionally, or left
762 * unprocessed.
763 *
764 * @param tasks the collection of tasks
765 * @param <T> the type of the values returned from the tasks
766 * @return the tasks argument, to simplify usage
767 * @throws NullPointerException if tasks or any element are null
768 */
769 public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks) {
770 if (!(tasks instanceof RandomAccess) || !(tasks instanceof List<?>)) {
771 invokeAll(tasks.toArray(new ForkJoinTask<?>[0]));
772 return tasks;
773 }
774 @SuppressWarnings("unchecked")
775 List<? extends ForkJoinTask<?>> ts =
776 (List<? extends ForkJoinTask<?>>) tasks;
777 Throwable ex = null;
778 int last = ts.size() - 1; // nearly same as array version
779 for (int i = last; i >= 0; --i) {
780 ForkJoinTask<?> t;
781 if ((t = ts.get(i)) == null) {
782 ex = new NullPointerException();
783 break;
784 }
785 if (i == 0) {
786 int s;
787 if ((s = t.doExec()) >= 0)
788 s = t.awaitDone(null, true, false, false, 0L);
789 if ((s & ABNORMAL) != 0)
790 ex = t.getException(s);
791 break;
792 }
793 t.fork();
794 }
795 if (ex == null) {
796 for (int i = 1; i <= last; ++i) {
797 ForkJoinTask<?> t;
798 if ((t = ts.get(i)) != null) {
799 int s;
800 if ((s = t.status) >= 0)
801 s = t.awaitDone(null, false, false, false, 0L);
802 if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null)
803 break;
804 }
805 }
806 }
807 if (ex != null) {
808 for (int i = 1; i <= last; ++i)
809 cancelIgnoringExceptions(ts.get(i));
810 rethrow(ex);
811 }
812 return tasks;
813 }
814
815 /**
816 * Attempts to cancel execution of this task. This attempt will
817 * fail if the task has already completed or could not be
818 * cancelled for some other reason. If successful, and this task
819 * has not started when {@code cancel} is called, execution of
820 * this task is suppressed. After this method returns
821 * successfully, unless there is an intervening call to {@link
822 * #reinitialize}, subsequent calls to {@link #isCancelled},
823 * {@link #isDone}, and {@code cancel} will return {@code true}
824 * and calls to {@link #join} and related methods will result in
825 * {@code CancellationException}.
826 *
827 * <p>This method may be overridden in subclasses, but if so, must
828 * still ensure that these properties hold. In particular, the
829 * {@code cancel} method itself must not throw exceptions.
830 *
831 * <p>This method is designed to be invoked by <em>other</em>
832 * tasks. To terminate the current task, you can just return or
833 * throw an unchecked exception from its computation method, or
834 * invoke {@link #completeExceptionally(Throwable)}.
835 *
836 * @param mayInterruptIfRunning this value has no effect in the
837 * default implementation because interrupts are not used to
838 * control cancellation.
839 *
840 * @return {@code true} if this task is now cancelled
841 */
842 public boolean cancel(boolean mayInterruptIfRunning) {
843 return (trySetCancelled() & (ABNORMAL | THROWN)) == ABNORMAL;
844 }
845
846 public final boolean isDone() {
847 return status < 0;
848 }
849
850 public final boolean isCancelled() {
851 return (status & (ABNORMAL | THROWN)) == ABNORMAL;
852 }
853
854 /**
855 * Returns {@code true} if this task threw an exception or was cancelled.
856 *
857 * @return {@code true} if this task threw an exception or was cancelled
858 */
859 public final boolean isCompletedAbnormally() {
860 return (status & ABNORMAL) != 0;
861 }
862
863 /**
864 * Returns {@code true} if this task completed without throwing an
865 * exception and was not cancelled.
866 *
867 * @return {@code true} if this task completed without throwing an
868 * exception and was not cancelled
869 */
870 public final boolean isCompletedNormally() {
871 return (status & (DONE | ABNORMAL)) == DONE;
872 }
873
874 /**
875 * Returns the exception thrown by the base computation, or a
876 * {@code CancellationException} if cancelled, or {@code null} if
877 * none or if the method has not yet completed.
878 *
879 * @return the exception, or {@code null} if none
880 */
881 public final Throwable getException() {
882 return getException(status);
883 }
884
885 /**
886 * Completes this task abnormally, and if not already aborted or
887 * cancelled, causes it to throw the given exception upon
888 * {@code join} and related operations. This method may be used
889 * to induce exceptions in asynchronous tasks, or to force
890 * completion of tasks that would not otherwise complete. Its use
891 * in other situations is discouraged. This method is
892 * overridable, but overridden versions must invoke {@code super}
893 * implementation to maintain guarantees.
894 *
895 * @param ex the exception to throw. If this exception is not a
896 * {@code RuntimeException} or {@code Error}, the actual exception
897 * thrown will be a {@code RuntimeException} with cause {@code ex}.
898 */
899 public void completeExceptionally(Throwable ex) {
900 trySetException((ex instanceof RuntimeException) ||
901 (ex instanceof Error) ? ex :
902 new RuntimeException(ex));
903 }
904
905 /**
906 * Completes this task, and if not already aborted or cancelled,
907 * returning the given value as the result of subsequent
908 * invocations of {@code join} and related operations. This method
909 * may be used to provide results for asynchronous tasks, or to
910 * provide alternative handling for tasks that would not otherwise
911 * complete normally. Its use in other situations is
912 * discouraged. This method is overridable, but overridden
913 * versions must invoke {@code super} implementation to maintain
914 * guarantees.
915 *
916 * @param value the result value for this task
917 */
918 public void complete(V value) {
919 try {
920 setRawResult(value);
921 } catch (Throwable rex) {
922 trySetException(rex);
923 return;
924 }
925 setDone();
926 }
927
928 /**
929 * Completes this task normally without setting a value. The most
930 * recent value established by {@link #setRawResult} (or {@code
931 * null} by default) will be returned as the result of subsequent
932 * invocations of {@code join} and related operations.
933 *
934 * @since 1.8
935 */
936 public final void quietlyComplete() {
937 setDone();
938 }
939
940 /**
941 * Waits if necessary for the computation to complete, and then
942 * retrieves its result.
943 *
944 * @return the computed result
945 * @throws CancellationException if the computation was cancelled
946 * @throws ExecutionException if the computation threw an
947 * exception
948 * @throws InterruptedException if the current thread is not a
949 * member of a ForkJoinPool and was interrupted while waiting
950 */
951 public final V get() throws InterruptedException, ExecutionException {
952 int s = awaitDone(null, false, true, false, 0L);
953 if ((s & ABNORMAL) != 0)
954 reportExecutionException(s);
955 return getRawResult();
956 }
957
958 /**
959 * Waits if necessary for at most the given time for the computation
960 * to complete, and then retrieves its result, if available.
961 *
962 * @param timeout the maximum time to wait
963 * @param unit the time unit of the timeout argument
964 * @return the computed result
965 * @throws CancellationException if the computation was cancelled
966 * @throws ExecutionException if the computation threw an
967 * exception
968 * @throws InterruptedException if the current thread is not a
969 * member of a ForkJoinPool and was interrupted while waiting
970 * @throws TimeoutException if the wait timed out
971 */
972 public final V get(long timeout, TimeUnit unit)
973 throws InterruptedException, ExecutionException, TimeoutException {
974 long nanos = unit.toNanos(timeout);
975 int s = awaitDone(null, false, true, true, nanos);
976 if (s >= 0 || (s & ABNORMAL) != 0)
977 reportExecutionException(s);
978 return getRawResult();
979 }
980
981 /**
982 * Joins this task, without returning its result or throwing its
983 * exception. This method may be useful when processing
984 * collections of tasks when some have been cancelled or otherwise
985 * known to have aborted.
986 */
987 public final void quietlyJoin() {
988 if (status >= 0)
989 awaitDone(null, false, false, false, 0L);
990 }
991
992
993 /**
994 * Commences performing this task and awaits its completion if
995 * necessary, without returning its result or throwing its
996 * exception.
997 */
998 public final void quietlyInvoke() {
999 if (doExec() >= 0)
1000 awaitDone(null, true, false, false, 0L);
1001 }
1002
1003 // Versions of join/get for pool.invoke* methods that use external,
1004 // possibly-non-commonPool submits
1005
1006 final void awaitPoolInvoke(ForkJoinPool pool) {
1007 awaitDone(pool, false, false, false, 0L);
1008 }
1009 final void awaitPoolInvoke(ForkJoinPool pool, long nanos) {
1010 awaitDone(pool, false, true, true, nanos);
1011 }
1012 final V joinForPoolInvoke(ForkJoinPool pool) {
1013 int s = awaitDone(pool, false, false, false, 0L);
1014 if ((s & ABNORMAL) != 0)
1015 reportException(s);
1016 return getRawResult();
1017 }
1018 final V getForPoolInvoke(ForkJoinPool pool)
1019 throws InterruptedException, ExecutionException {
1020 int s = awaitDone(pool, false, true, false, 0L);
1021 if ((s & ABNORMAL) != 0)
1022 reportExecutionException(s);
1023 return getRawResult();
1024 }
1025 final V getForPoolInvoke(ForkJoinPool pool, long nanos)
1026 throws InterruptedException, ExecutionException, TimeoutException {
1027 int s = awaitDone(pool, false, true, true, nanos);
1028 if (s >= 0 || (s & ABNORMAL) != 0)
1029 reportExecutionException(s);
1030 return getRawResult();
1031 }
1032
1033 /**
1034 * Possibly executes tasks until the pool hosting the current task
1035 * {@linkplain ForkJoinPool#isQuiescent is quiescent}. This
1036 * method may be of use in designs in which many tasks are forked,
1037 * but none are explicitly joined, instead executing them until
1038 * all are processed.
1039 */
1040 public static void helpQuiesce() {
1041 Thread t; ForkJoinWorkerThread w; ForkJoinPool p;
1042 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread &&
1043 (p = (w = (ForkJoinWorkerThread)t).pool) != null)
1044 p.helpQuiescePool(w.workQueue, Long.MAX_VALUE, false);
1045 else
1046 ForkJoinPool.commonPool().externalHelpQuiescePool(Long.MAX_VALUE, false);
1047 }
1048
1049 /**
1050 * Resets the internal bookkeeping state of this task, allowing a
1051 * subsequent {@code fork}. This method allows repeated reuse of
1052 * this task, but only if reuse occurs when this task has either
1053 * never been forked, or has been forked, then completed and all
1054 * outstanding joins of this task have also completed. Effects
1055 * under any other usage conditions are not guaranteed.
1056 * This method may be useful when executing
1057 * pre-constructed trees of subtasks in loops.
1058 *
1059 * <p>Upon completion of this method, {@code isDone()} reports
1060 * {@code false}, and {@code getException()} reports {@code
1061 * null}. However, the value returned by {@code getRawResult} is
1062 * unaffected. To clear this value, you can invoke {@code
1063 * setRawResult(null)}.
1064 */
1065 public void reinitialize() {
1066 aux = null;
1067 status = 0;
1068 }
1069
1070 /**
1071 * Returns the pool hosting the current thread, or {@code null}
1072 * if the current thread is executing outside of any ForkJoinPool.
1073 *
1074 * <p>This method returns {@code null} if and only if {@link
1075 * #inForkJoinPool} returns {@code false}.
1076 *
1077 * @return the pool, or {@code null} if none
1078 */
1079 public static ForkJoinPool getPool() {
1080 Thread t;
1081 return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
1082 ((ForkJoinWorkerThread) t).pool : null);
1083 }
1084
1085 /**
1086 * Returns {@code true} if the current thread is a {@link
1087 * ForkJoinWorkerThread} executing as a ForkJoinPool computation.
1088 *
1089 * @return {@code true} if the current thread is a {@link
1090 * ForkJoinWorkerThread} executing as a ForkJoinPool computation,
1091 * or {@code false} otherwise
1092 */
1093 public static boolean inForkJoinPool() {
1094 return Thread.currentThread() instanceof ForkJoinWorkerThread;
1095 }
1096
1097 /**
1098 * Tries to unschedule this task for execution. This method will
1099 * typically (but is not guaranteed to) succeed if this task is
1100 * the most recently forked task by the current thread, and has
1101 * not commenced executing in another thread. This method may be
1102 * useful when arranging alternative local processing of tasks
1103 * that could have been, but were not, stolen.
1104 *
1105 * @return {@code true} if unforked
1106 */
1107 public boolean tryUnfork() {
1108 Thread t; ForkJoinPool.WorkQueue q;
1109 return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
1110 ? (q = ((ForkJoinWorkerThread)t).workQueue) != null
1111 && q.tryUnpush(this)
1112 : (q = ForkJoinPool.commonQueue()) != null
1113 && q.externalTryUnpush(this);
1114 }
1115
1116 /**
1117 * Returns an estimate of the number of tasks that have been
1118 * forked by the current worker thread but not yet executed. This
1119 * value may be useful for heuristic decisions about whether to
1120 * fork other tasks.
1121 *
1122 * @return the number of tasks
1123 */
1124 public static int getQueuedTaskCount() {
1125 Thread t; ForkJoinPool.WorkQueue q;
1126 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
1127 q = ((ForkJoinWorkerThread)t).workQueue;
1128 else
1129 q = ForkJoinPool.commonQueue();
1130 return (q == null) ? 0 : q.queueSize();
1131 }
1132
1133 /**
1134 * Returns an estimate of how many more locally queued tasks are
1135 * held by the current worker thread than there are other worker
1136 * threads that might steal them, or zero if this thread is not
1137 * operating in a ForkJoinPool. This value may be useful for
1138 * heuristic decisions about whether to fork other tasks. In many
1139 * usages of ForkJoinTasks, at steady state, each worker should
1140 * aim to maintain a small constant surplus (for example, 3) of
1141 * tasks, and to process computations locally if this threshold is
1142 * exceeded.
1143 *
1144 * @return the surplus number of tasks, which may be negative
1145 */
1146 public static int getSurplusQueuedTaskCount() {
1147 return ForkJoinPool.getSurplusQueuedTaskCount();
1148 }
1149
1150 // Extension methods
1151
1152 /**
1153 * Returns the result that would be returned by {@link #join}, even
1154 * if this task completed abnormally, or {@code null} if this task
1155 * is not known to have been completed. This method is designed
1156 * to aid debugging, as well as to support extensions. Its use in
1157 * any other context is discouraged.
1158 *
1159 * @return the result, or {@code null} if not completed
1160 */
1161 public abstract V getRawResult();
1162
1163 /**
1164 * Forces the given value to be returned as a result. This method
1165 * is designed to support extensions, and should not in general be
1166 * called otherwise.
1167 *
1168 * @param value the value
1169 */
1170 protected abstract void setRawResult(V value);
1171
1172 /**
1173 * Immediately performs the base action of this task and returns
1174 * true if, upon return from this method, this task is guaranteed
1175 * to have completed. This method may return false otherwise, to
1176 * indicate that this task is not necessarily complete (or is not
1177 * known to be complete), for example in asynchronous actions that
1178 * require explicit invocations of completion methods. This method
1179 * may also throw an (unchecked) exception to indicate abnormal
1180 * exit. This method is designed to support extensions, and should
1181 * not in general be called otherwise.
1182 *
1183 * @return {@code true} if this task is known to have completed normally
1184 */
1185 protected abstract boolean exec();
1186
1187 /**
1188 * Returns, but does not unschedule or execute, a task queued by
1189 * the current thread but not yet executed, if one is immediately
1190 * available. There is no guarantee that this task will actually
1191 * be polled or executed next. Conversely, this method may return
1192 * null even if a task exists but cannot be accessed without
1193 * contention with other threads. This method is designed
1194 * primarily to support extensions, and is unlikely to be useful
1195 * otherwise.
1196 *
1197 * @return the next task, or {@code null} if none are available
1198 */
1199 protected static ForkJoinTask<?> peekNextLocalTask() {
1200 Thread t; ForkJoinPool.WorkQueue q;
1201 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
1202 q = ((ForkJoinWorkerThread)t).workQueue;
1203 else
1204 q = ForkJoinPool.commonQueue();
1205 return (q == null) ? null : q.peek();
1206 }
1207
1208 /**
1209 * Unschedules and returns, without executing, the next task
1210 * queued by the current thread but not yet executed, if the
1211 * current thread is operating in a ForkJoinPool. This method is
1212 * designed primarily to support extensions, and is unlikely to be
1213 * useful otherwise.
1214 *
1215 * @return the next task, or {@code null} if none are available
1216 */
1217 protected static ForkJoinTask<?> pollNextLocalTask() {
1218 Thread t;
1219 return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
1220 ((ForkJoinWorkerThread)t).workQueue.nextLocalTask() : null);
1221 }
1222
1223 /**
1224 * If the current thread is operating in a ForkJoinPool,
1225 * unschedules and returns, without executing, the next task
1226 * queued by the current thread but not yet executed, if one is
1227 * available, or if not available, a task that was forked by some
1228 * other thread, if available. Availability may be transient, so a
1229 * {@code null} result does not necessarily imply quiescence of
1230 * the pool this task is operating in. This method is designed
1231 * primarily to support extensions, and is unlikely to be useful
1232 * otherwise.
1233 *
1234 * @return a task, or {@code null} if none are available
1235 */
1236 protected static ForkJoinTask<?> pollTask() {
1237 Thread t; ForkJoinWorkerThread w;
1238 return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
1239 (w = (ForkJoinWorkerThread)t).pool.nextTaskFor(w.workQueue) :
1240 null);
1241 }
1242
1243 /**
1244 * If the current thread is operating in a ForkJoinPool,
1245 * unschedules and returns, without executing, a task externally
1246 * submitted to the pool, if one is available. Availability may be
1247 * transient, so a {@code null} result does not necessarily imply
1248 * quiescence of the pool. This method is designed primarily to
1249 * support extensions, and is unlikely to be useful otherwise.
1250 *
1251 * @return a task, or {@code null} if none are available
1252 * @since 9
1253 */
1254 protected static ForkJoinTask<?> pollSubmission() {
1255 Thread t;
1256 return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
1257 ((ForkJoinWorkerThread)t).pool.pollSubmission() : null);
1258 }
1259
1260 // tag operations
1261
1262 /**
1263 * Returns the tag for this task.
1264 *
1265 * @return the tag for this task
1266 * @since 1.8
1267 */
1268 public final short getForkJoinTaskTag() {
1269 return (short)status;
1270 }
1271
1272 /**
1273 * Atomically sets the tag value for this task and returns the old value.
1274 *
1275 * @param newValue the new tag value
1276 * @return the previous value of the tag
1277 * @since 1.8
1278 */
1279 public final short setForkJoinTaskTag(short newValue) {
1280 for (int s;;) {
1281 if (casStatus(s = status, (s & ~SMASK) | (newValue & SMASK)))
1282 return (short)s;
1283 }
1284 }
1285
1286 /**
1287 * Atomically conditionally sets the tag value for this task.
1288 * Among other applications, tags can be used as visit markers
1289 * in tasks operating on graphs, as in methods that check: {@code
1290 * if (task.compareAndSetForkJoinTaskTag((short)0, (short)1))}
1291 * before processing, otherwise exiting because the node has
1292 * already been visited.
1293 *
1294 * @param expect the expected tag value
1295 * @param update the new tag value
1296 * @return {@code true} if successful; i.e., the current value was
1297 * equal to {@code expect} and was changed to {@code update}.
1298 * @since 1.8
1299 */
1300 public final boolean compareAndSetForkJoinTaskTag(short expect, short update) {
1301 for (int s;;) {
1302 if ((short)(s = status) != expect)
1303 return false;
1304 if (casStatus(s, (s & ~SMASK) | (update & SMASK)))
1305 return true;
1306 }
1307 }
1308
1309 /**
1310 * Adapter for Runnables. This implements RunnableFuture
1311 * to be compliant with AbstractExecutorService constraints
1312 * when used in ForkJoinPool.
1313 */
1314 static final class AdaptedRunnable<T> extends ForkJoinTask<T>
1315 implements RunnableFuture<T> {
1316 @SuppressWarnings("serial") // Conditionally serializable
1317 final Runnable runnable;
1318 @SuppressWarnings("serial") // Conditionally serializable
1319 T result;
1320 AdaptedRunnable(Runnable runnable, T result) {
1321 if (runnable == null) throw new NullPointerException();
1322 this.runnable = runnable;
1323 this.result = result; // OK to set this even before completion
1324 }
1325 public final T getRawResult() { return result; }
1326 public final void setRawResult(T v) { result = v; }
1327 public final boolean exec() { runnable.run(); return true; }
1328 public final void run() { invoke(); }
1329 public String toString() {
1330 return super.toString() + "[Wrapped task = " + runnable + "]";
1331 }
1332 private static final long serialVersionUID = 5232453952276885070L;
1333 }
1334
1335 /**
1336 * Adapter for Runnables without results.
1337 */
1338 static final class AdaptedRunnableAction extends ForkJoinTask<Void>
1339 implements RunnableFuture<Void> {
1340 @SuppressWarnings("serial") // Conditionally serializable
1341 final Runnable runnable;
1342 AdaptedRunnableAction(Runnable runnable) {
1343 if (runnable == null) throw new NullPointerException();
1344 this.runnable = runnable;
1345 }
1346 public final Void getRawResult() { return null; }
1347 public final void setRawResult(Void v) { }
1348 public final boolean exec() { runnable.run(); return true; }
1349 public final void run() { invoke(); }
1350 public String toString() {
1351 return super.toString() + "[Wrapped task = " + runnable + "]";
1352 }
1353 private static final long serialVersionUID = 5232453952276885070L;
1354 }
1355
1356 /**
1357 * Adapter for Runnables in which failure forces worker exception.
1358 */
1359 static final class RunnableExecuteAction extends ForkJoinTask<Void> {
1360 @SuppressWarnings("serial") // Conditionally serializable
1361 final Runnable runnable;
1362 RunnableExecuteAction(Runnable runnable) {
1363 if (runnable == null) throw new NullPointerException();
1364 this.runnable = runnable;
1365 }
1366 public final Void getRawResult() { return null; }
1367 public final void setRawResult(Void v) { }
1368 public final boolean exec() { runnable.run(); return true; }
1369 int trySetException(Throwable ex) { // if a handler, invoke it
1370 int s; Thread t; java.lang.Thread.UncaughtExceptionHandler h;
1371 if (isExceptionalStatus(s = trySetThrown(ex)) &&
1372 (h = ((t = Thread.currentThread()).
1373 getUncaughtExceptionHandler())) != null) {
1374 try {
1375 h.uncaughtException(t, ex);
1376 } catch (Throwable ignore) {
1377 }
1378 }
1379 return s;
1380 }
1381 private static final long serialVersionUID = 5232453952276885070L;
1382 }
1383
1384 /**
1385 * Adapter for Callables.
1386 */
1387 static final class AdaptedCallable<T> extends ForkJoinTask<T>
1388 implements RunnableFuture<T> {
1389 @SuppressWarnings("serial") // Conditionally serializable
1390 final Callable<? extends T> callable;
1391 @SuppressWarnings("serial") // Conditionally serializable
1392 T result;
1393 AdaptedCallable(Callable<? extends T> callable) {
1394 if (callable == null) throw new NullPointerException();
1395 this.callable = callable;
1396 }
1397 public final T getRawResult() { return result; }
1398 public final void setRawResult(T v) { result = v; }
1399 public final boolean exec() {
1400 try {
1401 result = callable.call();
1402 return true;
1403 } catch (RuntimeException rex) {
1404 throw rex;
1405 } catch (Exception ex) {
1406 throw new RuntimeException(ex);
1407 }
1408 }
1409 public final void run() { invoke(); }
1410 public String toString() {
1411 return super.toString() + "[Wrapped task = " + callable + "]";
1412 }
1413 private static final long serialVersionUID = 2838392045355241008L;
1414 }
1415
1416 static final class AdaptedInterruptibleCallable<T> extends ForkJoinTask<T>
1417 implements RunnableFuture<T> {
1418 @SuppressWarnings("serial") // Conditionally serializable
1419 final Callable<? extends T> callable;
1420 transient volatile Thread runner;
1421 @SuppressWarnings("serial") // Conditionally serializable
1422 T result;
1423 AdaptedInterruptibleCallable(Callable<? extends T> callable) {
1424 if (callable == null) throw new NullPointerException();
1425 this.callable = callable;
1426 }
1427 public final T getRawResult() { return result; }
1428 public final void setRawResult(T v) { result = v; }
1429 public final boolean exec() {
1430 Thread.interrupted();
1431 runner = Thread.currentThread();
1432 try {
1433 if (!isDone()) // recheck
1434 result = callable.call();
1435 return true;
1436 } catch (RuntimeException rex) {
1437 throw rex;
1438 } catch (Exception ex) {
1439 throw new RuntimeException(ex);
1440 } finally {
1441 runner = null;
1442 Thread.interrupted();
1443 }
1444 }
1445 public final void run() { invoke(); }
1446 public final boolean cancel(boolean mayInterruptIfRunning) {
1447 Thread t;
1448 boolean stat = super.cancel(false);
1449 if (mayInterruptIfRunning && (t = runner) != null) {
1450 try {
1451 t.interrupt();
1452 } catch (Throwable ignore) {
1453 }
1454 }
1455 return stat;
1456 }
1457 public String toString() {
1458 return super.toString() + "[Wrapped task = " + callable + "]";
1459 }
1460 private static final long serialVersionUID = 2838392045355241008L;
1461 }
1462
1463 /**
1464 * Returns a new {@code ForkJoinTask} that performs the {@code run}
1465 * method of the given {@code Runnable} as its action, and returns
1466 * a null result upon {@link #join}.
1467 *
1468 * @param runnable the runnable action
1469 * @return the task
1470 */
1471 public static ForkJoinTask<?> adapt(Runnable runnable) {
1472 return new AdaptedRunnableAction(runnable);
1473 }
1474
1475 /**
1476 * Returns a new {@code ForkJoinTask} that performs the {@code run}
1477 * method of the given {@code Runnable} as its action, and returns
1478 * the given result upon {@link #join}.
1479 *
1480 * @param runnable the runnable action
1481 * @param result the result upon completion
1482 * @param <T> the type of the result
1483 * @return the task
1484 */
1485 public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) {
1486 return new AdaptedRunnable<T>(runnable, result);
1487 }
1488
1489 /**
1490 * Returns a new {@code ForkJoinTask} that performs the {@code call}
1491 * method of the given {@code Callable} as its action, and returns
1492 * its result upon {@link #join}, translating any checked exceptions
1493 * encountered into {@code RuntimeException}.
1494 *
1495 * @param callable the callable action
1496 * @param <T> the type of the callable's result
1497 * @return the task
1498 */
1499 public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) {
1500 return new AdaptedCallable<T>(callable);
1501 }
1502
1503 /**
1504 * Returns a new {@code ForkJoinTask} that performs the {@code call}
1505 * method of the given {@code Callable} as its action, and returns
1506 * its result upon {@link #join}, translating any checked exceptions
1507 * encountered into {@code RuntimeException}. Additionally,
1508 * invocations of {@code cancel} with {@code mayInterruptIfRunning
1509 * true} will attempt to interrupt the thread performing the task.
1510 *
1511 * @param callable the callable action
1512 * @param <T> the type of the callable's result
1513 * @return the task
1514 *
1515 * @since 17
1516 */
1517 // adaptInterruptible deferred to its own independent change
1518 // https://bugs.openjdk.java.net/browse/JDK-8246587
1519 /* TODO: public */ private static <T> ForkJoinTask<T> adaptInterruptible(Callable<? extends T> callable) {
1520 return new AdaptedInterruptibleCallable<T>(callable);
1521 }
1522
1523 // Serialization support
1524
1525 private static final long serialVersionUID = -7721805057305804111L;
1526
1527 /**
1528 * Saves this task to a stream (that is, serializes it).
1529 *
1530 * @param s the stream
1531 * @throws java.io.IOException if an I/O error occurs
1532 * @serialData the current run status and the exception thrown
1533 * during execution, or {@code null} if none
1534 */
1535 private void writeObject(java.io.ObjectOutputStream s)
1536 throws java.io.IOException {
1537 Aux a;
1538 s.defaultWriteObject();
1539 s.writeObject((a = aux) == null ? null : a.ex);
1540 }
1541
1542 /**
1543 * Reconstitutes this task from a stream (that is, deserializes it).
1544 * @param s the stream
1545 * @throws ClassNotFoundException if the class of a serialized object
1546 * could not be found
1547 * @throws java.io.IOException if an I/O error occurs
1548 */
1549 private void readObject(java.io.ObjectInputStream s)
1550 throws java.io.IOException, ClassNotFoundException {
1551 s.defaultReadObject();
1552 Object ex = s.readObject();
1553 if (ex != null)
1554 trySetThrown((Throwable)ex);
1555 }
1556
1557 static {
1558 U = Unsafe.getUnsafe();
1559 STATUS = U.objectFieldOffset(ForkJoinTask.class, "status");
1560 AUX = U.objectFieldOffset(ForkJoinTask.class, "aux");
1561 }
1562
1563 }