ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinTask.java
Revision: 1.130
Committed: Mon Feb 3 21:07:49 2020 UTC (4 years, 3 months ago) by dl
Branch: MAIN
Changes since 1.129: +11 -0 lines
Log Message:
fix termination; add documentation

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.io.Serializable;
10 import java.lang.invoke.MethodHandles;
11 import java.lang.invoke.VarHandle;
12 import java.lang.ref.ReferenceQueue;
13 import java.lang.ref.WeakReference;
14 import java.lang.reflect.Constructor;
15 import java.util.Collection;
16 import java.util.List;
17 import java.util.RandomAccess;
18 import java.util.concurrent.locks.LockSupport;
19
20 /**
21 * Abstract base class for tasks that run within a {@link ForkJoinPool}.
22 * A {@code ForkJoinTask} is a thread-like entity that is much
23 * lighter weight than a normal thread. Huge numbers of tasks and
24 * subtasks may be hosted by a small number of actual threads in a
25 * ForkJoinPool, at the price of some usage limitations.
26 *
27 * <p>A "main" {@code ForkJoinTask} begins execution when it is
28 * explicitly submitted to a {@link ForkJoinPool}, or, if not already
29 * engaged in a ForkJoin computation, commenced in the {@link
30 * ForkJoinPool#commonPool()} via {@link #fork}, {@link #invoke}, or
31 * related methods. Once started, it will usually in turn start other
32 * subtasks. As indicated by the name of this class, many programs
33 * using {@code ForkJoinTask} employ only methods {@link #fork} and
34 * {@link #join}, or derivatives such as {@link
35 * #invokeAll(ForkJoinTask...) invokeAll}. However, this class also
36 * provides a number of other methods that can come into play in
37 * advanced usages, as well as extension mechanics that allow support
38 * of new forms of fork/join processing.
39 *
40 * <p>A {@code ForkJoinTask} is a lightweight form of {@link Future}.
41 * The efficiency of {@code ForkJoinTask}s stems from a set of
42 * restrictions (that are only partially statically enforceable)
43 * reflecting their main use as computational tasks calculating pure
44 * functions or operating on purely isolated objects. The primary
45 * coordination mechanisms are {@link #fork}, that arranges
46 * asynchronous execution, and {@link #join}, that doesn't proceed
47 * until the task's result has been computed. Computations should
48 * ideally avoid {@code synchronized} methods or blocks, and should
49 * minimize other blocking synchronization apart from joining other
50 * tasks or using synchronizers such as Phasers that are advertised to
51 * cooperate with fork/join scheduling. Subdividable tasks should also
52 * not perform blocking I/O, and should ideally access variables that
53 * are completely independent of those accessed by other running
54 * tasks. These guidelines are loosely enforced by not permitting
55 * checked exceptions such as {@code IOExceptions} to be
56 * thrown. However, computations may still encounter unchecked
57 * exceptions, that are rethrown to callers attempting to join
58 * them. These exceptions may additionally include {@link
59 * RejectedExecutionException} stemming from internal resource
60 * exhaustion, such as failure to allocate internal task
61 * queues. Rethrown exceptions behave in the same way as regular
62 * exceptions, but, when possible, contain stack traces (as displayed
63 * for example using {@code ex.printStackTrace()}) of both the thread
64 * that initiated the computation as well as the thread actually
65 * encountering the exception; minimally only the latter.
66 *
67 * <p>It is possible to define and use ForkJoinTasks that may block,
68 * but doing so requires three further considerations: (1) Completion
69 * of few if any <em>other</em> tasks should be dependent on a task
70 * that blocks on external synchronization or I/O. Event-style async
71 * tasks that are never joined (for example, those subclassing {@link
72 * CountedCompleter}) often fall into this category. (2) To minimize
73 * resource impact, tasks should be small; ideally performing only the
74 * (possibly) blocking action. (3) Unless the {@link
75 * ForkJoinPool.ManagedBlocker} API is used, or the number of possibly
76 * blocked tasks is known to be less than the pool's {@link
77 * ForkJoinPool#getParallelism} level, the pool cannot guarantee that
78 * enough threads will be available to ensure progress or good
79 * performance.
80 *
81 * <p>The primary method for awaiting completion and extracting
82 * results of a task is {@link #join}, but there are several variants:
83 * The {@link Future#get} methods support interruptible and/or timed
84 * waits for completion and report results using {@code Future}
85 * conventions. Method {@link #invoke} is semantically
86 * equivalent to {@code fork(); join()} but always attempts to begin
87 * execution in the current thread. The "<em>quiet</em>" forms of
88 * these methods do not extract results or report exceptions. These
89 * may be useful when a set of tasks are being executed, and you need
90 * to delay processing of results or exceptions until all complete.
91 * Method {@code invokeAll} (available in multiple versions)
92 * performs the most common form of parallel invocation: forking a set
93 * of tasks and joining them all.
94 *
95 * <p>In the most typical usages, a fork-join pair act like a call
96 * (fork) and return (join) from a parallel recursive function. As is
97 * the case with other forms of recursive calls, returns (joins)
98 * should be performed innermost-first. For example, {@code a.fork();
99 * b.fork(); b.join(); a.join();} is likely to be substantially more
100 * efficient than joining {@code a} before {@code b}.
101 *
102 * <p>The execution status of tasks may be queried at several levels
103 * of detail: {@link #isDone} is true if a task completed in any way
104 * (including the case where a task was cancelled without executing);
105 * {@link #isCompletedNormally} is true if a task completed without
106 * cancellation or encountering an exception; {@link #isCancelled} is
107 * true if the task was cancelled (in which case {@link #getException}
108 * returns a {@link CancellationException}); and
109 * {@link #isCompletedAbnormally} is true if a task was either
110 * cancelled or encountered an exception, in which case {@link
111 * #getException} will return either the encountered exception or
112 * {@link CancellationException}.
113 *
114 * <p>By default, method {@link #cancel} ignores its {@code
115 * mayInterruptIfRunning} argument, to separate task cancellation from
116 * thread status. However, the method is overridable. An adaptor
117 * (@link #adaptInterruptible) for Callables does so by tracking and
118 * interrupting the running thread upon {@code cancel(true)}. Usage
119 * requires care. A late interrupt issued by another thread after the
120 * task has completed may (inadvertently) interrupt some future task.
121 * When using interruptible tasks, method bodies of all task code
122 * should ignore stray interrupts. When applicable, {@code
123 * isCancelled} or {@code isDone} can be used to distinguish cases.
124 *
125 * <p>The ForkJoinTask class is not usually directly subclassed.
126 * Instead, you subclass one of the abstract classes that support a
127 * particular style of fork/join processing, typically {@link
128 * RecursiveAction} for most computations that do not return results,
129 * {@link RecursiveTask} for those that do, and {@link
130 * CountedCompleter} for those in which completed actions trigger
131 * other actions. Normally, a concrete ForkJoinTask subclass declares
132 * fields comprising its parameters, established in a constructor, and
133 * then defines a {@code compute} method that somehow uses the control
134 * methods supplied by this base class.
135 *
136 * <p>Method {@link #join} and its variants are appropriate for use
137 * only when completion dependencies are acyclic; that is, the
138 * parallel computation can be described as a directed acyclic graph
139 * (DAG). Otherwise, executions may encounter a form of deadlock as
140 * tasks cyclically wait for each other. However, this framework
141 * supports other methods and techniques (for example the use of
142 * {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that
143 * may be of use in constructing custom subclasses for problems that
144 * are not statically structured as DAGs. To support such usages, a
145 * ForkJoinTask may be atomically <em>tagged</em> with a {@code short}
146 * value using {@link #setForkJoinTaskTag} or {@link
147 * #compareAndSetForkJoinTaskTag} and checked using {@link
148 * #getForkJoinTaskTag}. The ForkJoinTask implementation does not use
149 * these {@code protected} methods or tags for any purpose, but they
150 * may be of use in the construction of specialized subclasses. For
151 * example, parallel graph traversals can use the supplied methods to
152 * avoid revisiting nodes/tasks that have already been processed.
153 * (Method names for tagging are bulky in part to encourage definition
154 * of methods that reflect their usage patterns.)
155 *
156 * <p>Most base support methods are {@code final}, to prevent
157 * overriding of implementations that are intrinsically tied to the
158 * underlying lightweight task scheduling framework. Developers
159 * creating new basic styles of fork/join processing should minimally
160 * implement {@code protected} methods {@link #exec}, {@link
161 * #setRawResult}, and {@link #getRawResult}, while also introducing
162 * an abstract computational method that can be implemented in its
163 * subclasses, possibly relying on other {@code protected} methods
164 * provided by this class.
165 *
166 * <p>ForkJoinTasks should perform relatively small amounts of
167 * computation. Large tasks should be split into smaller subtasks,
168 * usually via recursive decomposition. As a very rough rule of thumb,
169 * a task should perform more than 100 and less than 10000 basic
170 * computational steps, and should avoid indefinite looping. If tasks
171 * are too big, then parallelism cannot improve throughput. If too
172 * small, then memory and internal task maintenance overhead may
173 * overwhelm processing.
174 *
175 * <p>This class provides {@code adapt} methods for {@link Runnable}
176 * and {@link Callable}, that may be of use when mixing execution of
177 * {@code ForkJoinTasks} with other kinds of tasks. When all tasks are
178 * of this form, consider using a pool constructed in <em>asyncMode</em>.
179 *
180 * <p>ForkJoinTasks are {@code Serializable}, which enables them to be
181 * used in extensions such as remote execution frameworks. It is
182 * sensible to serialize tasks only before or after, but not during,
183 * execution. Serialization is not relied on during execution itself.
184 *
185 * @since 1.7
186 * @author Doug Lea
187 */
188 public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
189
190 /*
191 * See the internal documentation of class ForkJoinPool for a
192 * general implementation overview. ForkJoinTasks are mainly
193 * responsible for maintaining their "status" field amidst relays
194 * to methods in ForkJoinWorkerThread and ForkJoinPool.
195 *
196 * The methods of this class are more-or-less layered into
197 * (1) basic status maintenance
198 * (2) execution and awaiting completion
199 * (3) user-level methods that additionally report results.
200 * This is sometimes hard to see because this file orders exported
201 * methods in a way that flows well in javadocs.
202 *
203 * Revision notes: The use of "Aux" field replaces previous
204 * reliance on a table to hold exceptions and synchronized blocks
205 * and monitors to wait for completion.
206 */
207
208 /**
209 * Nodes for threads waiting for completion, or holding a thrown
210 * exception (never both). Waiting threads prepend nodes
211 * Treiber-stack-style. Signallers detach and unpark
212 * waiters. Cancelled waiters try to unsplice.
213 */
214 static final class Aux {
215 final Thread thread;
216 final Throwable ex; // null if a waiter
217 Aux next; // accessed only via memory-acquire chains
218 Aux(Thread thread, Throwable ex) {
219 this.thread = thread;
220 this.ex = ex;
221 }
222 final boolean casNext(Aux c, Aux v) { // used only in cancellation
223 return NEXT.compareAndSet(this, c, v);
224 }
225 private static final VarHandle NEXT;
226 static {
227 try {
228 NEXT = MethodHandles.lookup()
229 .findVarHandle(Aux.class, "next", Aux.class);
230 } catch (ReflectiveOperationException e) {
231 throw new ExceptionInInitializerError(e);
232 }
233 }
234 }
235
236 /*
237 * The status field holds bits packed into a single int to ensure
238 * atomicity. Status is initially zero, and takes on nonnegative
239 * values until completed, upon which it holds (sign bit) DONE,
240 * possibly with ABNORMAL (cancelled or exceptional) and THROWN
241 * (in which case an exception has been stored). A value of
242 * ABNORMAL without DONE signifies an interrupted wait. These
243 * control bits occupy only (some of) the upper half (16 bits) of
244 * status field. The lower bits are used for user-defined tags.
245 */
246 private static final int DONE = 1 << 31; // must be negative
247 private static final int ABNORMAL = 1 << 16;
248 private static final int THROWN = 1 << 17;
249 private static final int SMASK = 0xffff; // short bits for tags
250 // sentinels can be any positive upper half value:
251 static final int ADJUST = 1 << 16; // uncompensate after block
252
253 // Fields
254 volatile int status; // accessed directly by pool and workers
255 private transient volatile Aux aux; // either waiters or thrown Exception
256
257 // Support for atomic operations
258 private static final VarHandle STATUS;
259 private static final VarHandle AUX;
260 private int getAndBitwiseOrStatus(int v) {
261 return (int)STATUS.getAndBitwiseOr(this, v);
262 }
263 private boolean casStatus(int c, int v) {
264 return STATUS.weakCompareAndSet(this, c, v);
265 }
266 private boolean casAux(Aux c, Aux v) {
267 return AUX.compareAndSet(this, c, v);
268 }
269
270 /** Removes and unparks waiters */
271 private void signalWaiters() {
272 for (Aux a; (a = aux) != null && a.ex == null; ) {
273 if (casAux(a, null)) { // detach entire list
274 for (Thread t; a != null; a = a.next) {
275 if ((t = a.thread) != Thread.currentThread() && t != null)
276 LockSupport.unpark(t); // don't self-signal
277 }
278 break;
279 }
280 }
281 }
282
283 /**
284 * Possibly blocks until task is done or interrupted or timed out.
285 *
286 * @param interruptible true if wait can be cancelled by interrupt
287 * @param deadline if non-zero use timed waits and possibly timeout
288 * @param adjust if true, uncompensate pool after unblocking
289 * @param pool if nonull, current pool (possibly comonPool if unknown)
290 * @return status on exit, or ABNORMAL if interrupted while waiting
291 */
292 private int awaitDone(boolean interruptible, long deadline, boolean adjust,
293 ForkJoinPool pool) {
294 int s; Aux node = null; boolean interrupted = false, queued = false;
295 long nanos = 0L;
296 try {
297 for (Aux a;;) {
298 if ((s = status) < 0)
299 break;
300 else if (node == null)
301 node = new Aux(Thread.currentThread(), null);
302 else if (!queued) {
303 if ((a = aux) != null && a.ex != null)
304 Thread.onSpinWait(); // exception in progress
305 else if (queued = casAux(node.next = a, node))
306 LockSupport.setCurrentBlocker(this);
307 }
308 else if (Thread.interrupted()) {
309 if (interruptible) {
310 s = ABNORMAL;
311 break;
312 }
313 interrupted = true;
314 }
315 else if (pool != null && pool.isStopping())
316 casStatus(s, s | (DONE | ABNORMAL)); // help cancel
317 else if (deadline != 0L &&
318 (nanos = deadline - System.nanoTime()) <= 0L)
319 break; // timeout
320 else if ((s = status) < 0)
321 break; // recheck
322 else if (nanos > 0L)
323 LockSupport.parkNanos(nanos);
324 else
325 LockSupport.park();
326 }
327 } finally {
328 if (adjust && pool != null)
329 pool.uncompensate();
330 }
331 if (queued) {
332 LockSupport.setCurrentBlocker(null);
333 if (s >= 0) { // try to unsplice after cancellation
334 outer: for (Aux a; (a = aux) != null && a.ex == null; ) {
335 for (Aux trail = null;;) {
336 Aux next = a.next;
337 if (a == node) {
338 if (trail != null)
339 trail.casNext(trail, next);
340 else if (casAux(a, next))
341 break outer; // cannot be re-encountered
342 break; // restart
343 } else {
344 trail = a;
345 if ((a = next) == null)
346 break outer;
347 }
348 }
349 }
350 }
351 else {
352 signalWaiters(); // help clean or signal
353 if (interrupted)
354 Thread.currentThread().interrupt();
355 }
356 }
357 return s;
358 }
359
360 /**
361 * Sets DONE status and wakes up threads waiting to join this task.
362 * @return status on exit
363 */
364 private int setDone() {
365 int s = getAndBitwiseOrStatus(DONE) | DONE;
366 signalWaiters();
367 return s;
368 }
369
370 /**
371 * Sets ABNORMAL DONE status unless already done, and wakes up threads
372 * waiting to join this task.
373 * @return status on exit
374 */
375 private int trySetCancelled() {
376 int s;
377 do {} while ((s = status) >= 0 && !casStatus(s, s |= (DONE | ABNORMAL)));
378 signalWaiters();
379 return s;
380 }
381
382 /**
383 * Records exception and sets ABNORMAL THROWN DONE status unless
384 * already done, and wakes up threads waiting to join this task.
385 * If losing a race with setDone or trySetCancelled, the exception
386 * may be recorded but not reported.
387 *
388 * @return status on exit
389 */
390 final int trySetThrown(Throwable ex) {
391 Aux h = new Aux(Thread.currentThread(), ex), p = null;
392 boolean installed = false;
393 int s;
394 while ((s = status) >= 0) {
395 Aux a;
396 if (!installed && ((a = aux) == null || a.ex == null) &&
397 (installed = casAux(a, h)))
398 p = a; // list of waiters replaced by h
399 if (installed && casStatus(s, s |= (DONE | ABNORMAL | THROWN)))
400 break;
401 }
402 for (; p != null; p = p.next)
403 LockSupport.unpark(p.thread);
404 return s;
405 }
406
407 /**
408 * Records exception unless already done. Overridable in subclasses.
409 *
410 * @return status on exit
411 */
412 int trySetException(Throwable ex) {
413 return trySetThrown(ex);
414 }
415
416 static boolean isExceptionalStatus(int s) { // needed by subclasses
417 return (s & THROWN) != 0;
418 }
419
420 /**
421 * Unless done, calls exec and records status if completed, but
422 * doesn't wait for completion otherwise.
423 *
424 * @return status on exit from this method
425 */
426 final int doExec() {
427 int s; boolean completed;
428 if ((s = status) >= 0) {
429 try {
430 completed = exec();
431 } catch (Throwable rex) {
432 s = trySetException(rex);
433 completed = false;
434 }
435 if (completed)
436 s = setDone();
437 }
438 return s;
439 }
440
441 /**
442 * Helps and/or waits for completion from join, or async invoke if ran true.
443 *
444 * @param ran true if task known to have been exec'd
445 * @return status on exit
446 */
447 private int awaitJoin(boolean ran) {
448 boolean adjust = false, owned;
449 Thread t; ForkJoinWorkerThread wt;
450 ForkJoinPool p; ForkJoinPool.WorkQueue q; int s;
451 if (owned = ((t = Thread.currentThread())
452 instanceof ForkJoinWorkerThread)) {
453 p = (wt = (ForkJoinWorkerThread)t).pool;
454 q = wt.workQueue;
455 }
456 else {
457 p = ForkJoinPool.common;
458 q = ForkJoinPool.commonQueue();
459 }
460 if (q != null && p != null) {
461 if ((this instanceof CountedCompleter) ?
462 (s = p.helpComplete(this, q, owned)) < 0 :
463 (!ran && q.tryRemove(this, owned) && (s = doExec()) < 0))
464 return s;
465 else if (owned) {
466 if ((s = p.helpJoin(this, q)) < 0)
467 return s;
468 else if (s == ADJUST)
469 adjust = true;
470 }
471 }
472 return awaitDone(false, 0L, adjust, p);
473 }
474
475 /**
476 * Helps and/or waits for completion from get.
477 *
478 * @param timed if true use timed wait
479 * @param nanos wait time
480 * @return status on exit, or ABNORMAL if interruptible and interrupted
481 */
482 private int awaitGet(boolean timed, long nanos) {
483 boolean adjust = false, owned;
484 Thread t; ForkJoinWorkerThread wt;
485 ForkJoinPool p; ForkJoinPool.WorkQueue q; int s; long deadline;
486 if (owned = ((t = Thread.currentThread())
487 instanceof ForkJoinWorkerThread)) {
488 p = (wt = (ForkJoinWorkerThread)t).pool;
489 q = wt.workQueue;
490 }
491 else if (!Thread.interrupted()) {
492 p = ForkJoinPool.common;
493 q = ForkJoinPool.commonQueue();
494 }
495 else
496 return ABNORMAL;
497 if (!timed)
498 deadline = 0L;
499 else if (nanos <= 0L)
500 return 0;
501 else if ((deadline = nanos + System.nanoTime()) == 0L)
502 deadline = 1L;
503 if (q != null && p != null) {
504 if ((!timed || p.isSaturated()) &&
505 ((this instanceof CountedCompleter) ?
506 (s = p.helpComplete(this, q, owned)) < 0 :
507 (q.tryRemove(this, owned) && (s = doExec()) < 0)))
508 return s;
509 else if (owned) {
510 if ((s = p.helpJoin(this, q)) < 0)
511 return s;
512 else if (s == ADJUST)
513 adjust = true;
514 }
515 }
516 return awaitDone(!owned, deadline, adjust, p);
517 }
518
519 /**
520 * Cancels, ignoring any exceptions thrown by cancel. Cancel is
521 * spec'ed not to throw any exceptions, but if it does anyway, we
522 * have no recourse, so guard against this case.
523 */
524 static final void cancelIgnoringExceptions(Future<?> t) {
525 if (t != null) {
526 try {
527 t.cancel(true);
528 } catch (Throwable ignore) {
529 }
530 }
531 }
532
533 /**
534 * Returns a rethrowable exception for this task, if available.
535 * To provide accurate stack traces, if the exception was not
536 * thrown by the current thread, we try to create a new exception
537 * of the same type as the one thrown, but with the recorded
538 * exception as its cause. If there is no such constructor, we
539 * instead try to use a no-arg constructor, followed by initCause,
540 * to the same effect. If none of these apply, or any fail due to
541 * other exceptions, we return the recorded exception, which is
542 * still correct, although it may contain a misleading stack
543 * trace.
544 *
545 * @return the exception, or null if none
546 */
547 private Throwable getThrowableException() {
548 Throwable ex; Aux a;
549 if ((a = aux) == null)
550 ex = null;
551 else if ((ex = a.ex) != null && a.thread != Thread.currentThread()) {
552 try {
553 Constructor<?> noArgCtor = null, oneArgCtor = null;
554 for (Constructor<?> c : ex.getClass().getConstructors()) {
555 Class<?>[] ps = c.getParameterTypes();
556 if (ps.length == 0)
557 noArgCtor = c;
558 else if (ps.length == 1 && ps[0] == Throwable.class) {
559 oneArgCtor = c;
560 break;
561 }
562 }
563 if (oneArgCtor != null)
564 ex = (Throwable)oneArgCtor.newInstance(ex);
565 else if (noArgCtor != null) {
566 Throwable rx = (Throwable)noArgCtor.newInstance();
567 rx.initCause(ex);
568 ex = rx;
569 }
570 } catch (Exception ignore) {
571 }
572 }
573 return ex;
574 }
575
576 /**
577 * Returns exception associated with the given status, or null if none.
578 */
579 private Throwable getException(int s) {
580 Throwable ex = null;
581 if ((s & ABNORMAL) != 0 &&
582 ((s & THROWN) == 0 || (ex = getThrowableException()) == null))
583 ex = new CancellationException();
584 return ex;
585 }
586
587 /**
588 * Throws exception associated with the given status, or
589 * CancellationException if none recorded.
590 */
591 private void reportException(int s) {
592 ForkJoinTask.<RuntimeException>uncheckedThrow(
593 (s & THROWN) != 0 ? getThrowableException() : null);
594 }
595
596 /**
597 * Throws exception for (timed or untimed) get, wrapping if
598 * necessary in an ExecutionException.
599 */
600 private void reportExceptionForGet(int s) {
601 Throwable ex = null;
602 if (s == ABNORMAL)
603 ex = new InterruptedException();
604 else if (s >= 0)
605 ex = new TimeoutException();
606 else if ((s & THROWN) != 0 && (ex = getThrowableException()) != null)
607 ex = new ExecutionException(ex);
608 ForkJoinTask.<RuntimeException>uncheckedThrow(ex);
609 }
610
611 /**
612 * A version of "sneaky throw" to relay exceptions in other
613 * contexts.
614 */
615 static void rethrow(Throwable ex) {
616 ForkJoinTask.<RuntimeException>uncheckedThrow(ex);
617 }
618
619 /**
620 * The sneaky part of sneaky throw, relying on generics
621 * limitations to evade compiler complaints about rethrowing
622 * unchecked exceptions. If argument null, throws
623 * CancellationException.
624 */
625 @SuppressWarnings("unchecked") static <T extends Throwable>
626 void uncheckedThrow(Throwable t) throws T {
627 if (t == null)
628 t = new CancellationException();
629 throw (T)t; // rely on vacuous cast
630 }
631
632 // public methods
633
634 /**
635 * Arranges to asynchronously execute this task in the pool the
636 * current task is running in, if applicable, or using the {@link
637 * ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}. While
638 * it is not necessarily enforced, it is a usage error to fork a
639 * task more than once unless it has completed and been
640 * reinitialized. Subsequent modifications to the state of this
641 * task or any data it operates on are not necessarily
642 * consistently observable by any thread other than the one
643 * executing it unless preceded by a call to {@link #join} or
644 * related methods, or a call to {@link #isDone} returning {@code
645 * true}.
646 *
647 * @return {@code this}, to simplify usage
648 */
649 public final ForkJoinTask<V> fork() {
650 Thread t; ForkJoinWorkerThread w;
651 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
652 (w = (ForkJoinWorkerThread)t).workQueue.push(this, w.pool);
653 else
654 ForkJoinPool.common.externalPush(this);
655 return this;
656 }
657
658 /**
659 * Returns the result of the computation when it
660 * {@linkplain #isDone is done}.
661 * This method differs from {@link #get()} in that abnormal
662 * completion results in {@code RuntimeException} or {@code Error},
663 * not {@code ExecutionException}, and that interrupts of the
664 * calling thread do <em>not</em> cause the method to abruptly
665 * return by throwing {@code InterruptedException}.
666 *
667 * @return the computed result
668 */
669 public final V join() {
670 int s;
671 if ((s = status) >= 0)
672 s = awaitJoin(false);
673 if ((s & ABNORMAL) != 0)
674 reportException(s);
675 return getRawResult();
676 }
677
678 /**
679 * Commences performing this task, awaits its completion if
680 * necessary, and returns its result, or throws an (unchecked)
681 * {@code RuntimeException} or {@code Error} if the underlying
682 * computation did so.
683 *
684 * @return the computed result
685 */
686 public final V invoke() {
687 int s;
688 if ((s = doExec()) >= 0)
689 s = awaitJoin(true);
690 if ((s & ABNORMAL) != 0)
691 reportException(s);
692 return getRawResult();
693 }
694
695 /**
696 * Forks the given tasks, returning when {@code isDone} holds for
697 * each task or an (unchecked) exception is encountered, in which
698 * case the exception is rethrown. If more than one task
699 * encounters an exception, then this method throws any one of
700 * these exceptions. If any task encounters an exception, the
701 * other may be cancelled. However, the execution status of
702 * individual tasks is not guaranteed upon exceptional return. The
703 * status of each task may be obtained using {@link
704 * #getException()} and related methods to check if they have been
705 * cancelled, completed normally or exceptionally, or left
706 * unprocessed.
707 *
708 * @param t1 the first task
709 * @param t2 the second task
710 * @throws NullPointerException if any task is null
711 */
712 public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
713 int s1, s2;
714 if (t1 == null || t2 == null)
715 throw new NullPointerException();
716 t2.fork();
717 if ((s1 = t1.doExec()) >= 0)
718 s1 = t1.awaitJoin(true);
719 if ((s1 & ABNORMAL) != 0) {
720 cancelIgnoringExceptions(t2);
721 t1.reportException(s1);
722 }
723 else {
724 if ((s2 = t2.status) >= 0)
725 s2 = t2.awaitJoin(false);
726 if ((s2 & ABNORMAL) != 0)
727 t2.reportException(s2);
728 }
729 }
730
731 /**
732 * Forks the given tasks, returning when {@code isDone} holds for
733 * each task or an (unchecked) exception is encountered, in which
734 * case the exception is rethrown. If more than one task
735 * encounters an exception, then this method throws any one of
736 * these exceptions. If any task encounters an exception, others
737 * may be cancelled. However, the execution status of individual
738 * tasks is not guaranteed upon exceptional return. The status of
739 * each task may be obtained using {@link #getException()} and
740 * related methods to check if they have been cancelled, completed
741 * normally or exceptionally, or left unprocessed.
742 *
743 * @param tasks the tasks
744 * @throws NullPointerException if any task is null
745 */
746 public static void invokeAll(ForkJoinTask<?>... tasks) {
747 Throwable ex = null;
748 int last = tasks.length - 1;
749 for (int i = last, s; i >= 0; --i) {
750 ForkJoinTask<?> t;
751 if ((t = tasks[i]) == null) {
752 ex = new NullPointerException();
753 break;
754 }
755 if (i == 0) {
756 if ((s = t.doExec()) >= 0)
757 s = t.awaitJoin(true);
758 if ((s & ABNORMAL) != 0)
759 ex = t.getException(s);
760 break;
761 }
762 t.fork();
763 }
764 if (ex == null) {
765 for (int i = 1, s; i <= last; ++i) {
766 ForkJoinTask<?> t;
767 if ((t = tasks[i]) != null) {
768 if ((s = t.status) >= 0)
769 s = t.awaitJoin(false);
770 if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null)
771 break;
772 }
773 }
774 }
775 if (ex != null) {
776 for (int i = 1, s; i <= last; ++i)
777 cancelIgnoringExceptions(tasks[i]);
778 rethrow(ex);
779 }
780 }
781
782 /**
783 * Forks all tasks in the specified collection, returning when
784 * {@code isDone} holds for each task or an (unchecked) exception
785 * is encountered, in which case the exception is rethrown. If
786 * more than one task encounters an exception, then this method
787 * throws any one of these exceptions. If any task encounters an
788 * exception, others may be cancelled. However, the execution
789 * status of individual tasks is not guaranteed upon exceptional
790 * return. The status of each task may be obtained using {@link
791 * #getException()} and related methods to check if they have been
792 * cancelled, completed normally or exceptionally, or left
793 * unprocessed.
794 *
795 * @param tasks the collection of tasks
796 * @param <T> the type of the values returned from the tasks
797 * @return the tasks argument, to simplify usage
798 * @throws NullPointerException if tasks or any element are null
799 */
800 public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks) {
801 if (!(tasks instanceof RandomAccess) || !(tasks instanceof List<?>)) {
802 invokeAll(tasks.toArray(new ForkJoinTask<?>[0]));
803 return tasks;
804 }
805 @SuppressWarnings("unchecked")
806 List<? extends ForkJoinTask<?>> ts =
807 (List<? extends ForkJoinTask<?>>) tasks;
808 Throwable ex = null;
809 int last = ts.size() - 1; // nearly same as array version
810 for (int i = last, s; i >= 0; --i) {
811 ForkJoinTask<?> t;
812 if ((t = ts.get(i)) == null) {
813 ex = new NullPointerException();
814 break;
815 }
816 if (i == 0) {
817 if ((s = t.doExec()) >= 0)
818 s = t.awaitJoin(true);
819 if ((s & ABNORMAL) != 0)
820 ex = t.getException(s);
821 break;
822 }
823 t.fork();
824 }
825 if (ex == null) {
826 for (int i = 1, s; i <= last; ++i) {
827 ForkJoinTask<?> t;
828 if ((t = ts.get(i)) != null) {
829 if ((s = t.status) >= 0)
830 s = t.awaitJoin(false);
831 if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null)
832 break;
833 }
834 }
835 }
836 if (ex != null) {
837 for (int i = 1, s; i <= last; ++i)
838 cancelIgnoringExceptions(ts.get(i));
839 rethrow(ex);
840 }
841 return tasks;
842 }
843
844 /**
845 * Attempts to cancel execution of this task. This attempt will
846 * fail if the task has already completed or could not be
847 * cancelled for some other reason. If successful, and this task
848 * has not started when {@code cancel} is called, execution of
849 * this task is suppressed. After this method returns
850 * successfully, unless there is an intervening call to {@link
851 * #reinitialize}, subsequent calls to {@link #isCancelled},
852 * {@link #isDone}, and {@code cancel} will return {@code true}
853 * and calls to {@link #join} and related methods will result in
854 * {@code CancellationException}.
855 *
856 * <p>This method may be overridden in subclasses, but if so, must
857 * still ensure that these properties hold. In particular, the
858 * {@code cancel} method itself must not throw exceptions.
859 *
860 * <p>This method is designed to be invoked by <em>other</em>
861 * tasks. To terminate the current task, you can just return or
862 * throw an unchecked exception from its computation method, or
863 * invoke {@link #completeExceptionally(Throwable)}.
864 *
865 * @param mayInterruptIfRunning this value has no effect in the
866 * default implementation because interrupts are not used to
867 * control cancellation.
868 *
869 * @return {@code true} if this task is now cancelled
870 */
871 public boolean cancel(boolean mayInterruptIfRunning) {
872 return (trySetCancelled() & (ABNORMAL | THROWN)) == ABNORMAL;
873 }
874
875 public final boolean isDone() {
876 return status < 0;
877 }
878
879 public final boolean isCancelled() {
880 return (status & (ABNORMAL | THROWN)) == ABNORMAL;
881 }
882
883 /**
884 * Returns {@code true} if this task threw an exception or was cancelled.
885 *
886 * @return {@code true} if this task threw an exception or was cancelled
887 */
888 public final boolean isCompletedAbnormally() {
889 return (status & ABNORMAL) != 0;
890 }
891
892 /**
893 * Returns {@code true} if this task completed without throwing an
894 * exception and was not cancelled.
895 *
896 * @return {@code true} if this task completed without throwing an
897 * exception and was not cancelled
898 */
899 public final boolean isCompletedNormally() {
900 return (status & (DONE | ABNORMAL)) == DONE;
901 }
902
903 /**
904 * Returns the exception thrown by the base computation, or a
905 * {@code CancellationException} if cancelled, or {@code null} if
906 * none or if the method has not yet completed.
907 *
908 * @return the exception, or {@code null} if none
909 */
910 public final Throwable getException() {
911 return getException(status);
912 }
913
914 /**
915 * Completes this task abnormally, and if not already aborted or
916 * cancelled, causes it to throw the given exception upon
917 * {@code join} and related operations. This method may be used
918 * to induce exceptions in asynchronous tasks, or to force
919 * completion of tasks that would not otherwise complete. Its use
920 * in other situations is discouraged. This method is
921 * overridable, but overridden versions must invoke {@code super}
922 * implementation to maintain guarantees.
923 *
924 * @param ex the exception to throw. If this exception is not a
925 * {@code RuntimeException} or {@code Error}, the actual exception
926 * thrown will be a {@code RuntimeException} with cause {@code ex}.
927 */
928 public void completeExceptionally(Throwable ex) {
929 trySetException((ex instanceof RuntimeException) ||
930 (ex instanceof Error) ? ex :
931 new RuntimeException(ex));
932 }
933
934 /**
935 * Completes this task, and if not already aborted or cancelled,
936 * returning the given value as the result of subsequent
937 * invocations of {@code join} and related operations. This method
938 * may be used to provide results for asynchronous tasks, or to
939 * provide alternative handling for tasks that would not otherwise
940 * complete normally. Its use in other situations is
941 * discouraged. This method is overridable, but overridden
942 * versions must invoke {@code super} implementation to maintain
943 * guarantees.
944 *
945 * @param value the result value for this task
946 */
947 public void complete(V value) {
948 try {
949 setRawResult(value);
950 } catch (Throwable rex) {
951 trySetException(rex);
952 return;
953 }
954 setDone();
955 }
956
957 /**
958 * Completes this task normally without setting a value. The most
959 * recent value established by {@link #setRawResult} (or {@code
960 * null} by default) will be returned as the result of subsequent
961 * invocations of {@code join} and related operations.
962 *
963 * @since 1.8
964 */
965 public final void quietlyComplete() {
966 setDone();
967 }
968
969 /**
970 * Waits if necessary for the computation to complete, and then
971 * retrieves its result.
972 *
973 * @return the computed result
974 * @throws CancellationException if the computation was cancelled
975 * @throws ExecutionException if the computation threw an
976 * exception
977 * @throws InterruptedException if the current thread is not a
978 * member of a ForkJoinPool and was interrupted while waiting
979 */
980 public final V get() throws InterruptedException, ExecutionException {
981 int s;
982 if ((s = status) >= 0)
983 s = awaitGet(false, 0L);
984 if ((s & ABNORMAL) != 0)
985 reportExceptionForGet(s);
986 return getRawResult();
987 }
988
989 /**
990 * Waits if necessary for at most the given time for the computation
991 * to complete, and then retrieves its result, if available.
992 *
993 * @param timeout the maximum time to wait
994 * @param unit the time unit of the timeout argument
995 * @return the computed result
996 * @throws CancellationException if the computation was cancelled
997 * @throws ExecutionException if the computation threw an
998 * exception
999 * @throws InterruptedException if the current thread is not a
1000 * member of a ForkJoinPool and was interrupted while waiting
1001 * @throws TimeoutException if the wait timed out
1002 */
1003 public final V get(long timeout, TimeUnit unit)
1004 throws InterruptedException, ExecutionException, TimeoutException {
1005 int s;
1006 long nanos = unit.toNanos(timeout);
1007 if ((s = status) >= 0)
1008 s = awaitGet(true, nanos);
1009 if (s >= 0 || (s & ABNORMAL) != 0)
1010 reportExceptionForGet(s);
1011 return getRawResult();
1012 }
1013
1014 /**
1015 * Joins this task, without returning its result or throwing its
1016 * exception. This method may be useful when processing
1017 * collections of tasks when some have been cancelled or otherwise
1018 * known to have aborted.
1019 */
1020 public final void quietlyJoin() {
1021 if (status >= 0)
1022 awaitJoin(false);
1023 }
1024
1025 /**
1026 * Commences performing this task and awaits its completion if
1027 * necessary, without returning its result or throwing its
1028 * exception.
1029 */
1030 public final void quietlyInvoke() {
1031 if (doExec() >= 0)
1032 awaitJoin(true);
1033 }
1034
1035 /**
1036 * Possibly executes tasks until the pool hosting the current task
1037 * {@linkplain ForkJoinPool#isQuiescent is quiescent}. This
1038 * method may be of use in designs in which many tasks are forked,
1039 * but none are explicitly joined, instead executing them until
1040 * all are processed.
1041 */
1042 public static void helpQuiesce() {
1043 Thread t; ForkJoinWorkerThread w; ForkJoinPool p;
1044 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread &&
1045 (p = (w = (ForkJoinWorkerThread)t).pool) != null)
1046 p.helpQuiescePool(w.workQueue, Long.MAX_VALUE, false);
1047 else
1048 ForkJoinPool.common.externalHelpQuiescePool(Long.MAX_VALUE, false);
1049 }
1050
1051 /**
1052 * Resets the internal bookkeeping state of this task, allowing a
1053 * subsequent {@code fork}. This method allows repeated reuse of
1054 * this task, but only if reuse occurs when this task has either
1055 * never been forked, or has been forked, then completed and all
1056 * outstanding joins of this task have also completed. Effects
1057 * under any other usage conditions are not guaranteed.
1058 * This method may be useful when executing
1059 * pre-constructed trees of subtasks in loops.
1060 *
1061 * <p>Upon completion of this method, {@code isDone()} reports
1062 * {@code false}, and {@code getException()} reports {@code
1063 * null}. However, the value returned by {@code getRawResult} is
1064 * unaffected. To clear this value, you can invoke {@code
1065 * setRawResult(null)}.
1066 */
1067 public void reinitialize() {
1068 aux = null;
1069 status = 0;
1070 }
1071
1072 /**
1073 * Returns the pool hosting the current thread, or {@code null}
1074 * if the current thread is executing outside of any ForkJoinPool.
1075 *
1076 * <p>This method returns {@code null} if and only if {@link
1077 * #inForkJoinPool} returns {@code false}.
1078 *
1079 * @return the pool, or {@code null} if none
1080 */
1081 public static ForkJoinPool getPool() {
1082 Thread t;
1083 return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
1084 ((ForkJoinWorkerThread) t).pool : null);
1085 }
1086
1087 /**
1088 * Returns {@code true} if the current thread is a {@link
1089 * ForkJoinWorkerThread} executing as a ForkJoinPool computation.
1090 *
1091 * @return {@code true} if the current thread is a {@link
1092 * ForkJoinWorkerThread} executing as a ForkJoinPool computation,
1093 * or {@code false} otherwise
1094 */
1095 public static boolean inForkJoinPool() {
1096 return Thread.currentThread() instanceof ForkJoinWorkerThread;
1097 }
1098
1099 /**
1100 * Tries to unschedule this task for execution. This method will
1101 * typically (but is not guaranteed to) succeed if this task is
1102 * the most recently forked task by the current thread, and has
1103 * not commenced executing in another thread. This method may be
1104 * useful when arranging alternative local processing of tasks
1105 * that could have been, but were not, stolen.
1106 *
1107 * @return {@code true} if unforked
1108 */
1109 public boolean tryUnfork() {
1110 Thread t; boolean owned;
1111 ForkJoinPool.WorkQueue q = ((owned = (t = Thread.currentThread())
1112 instanceof ForkJoinWorkerThread) ?
1113 ((ForkJoinWorkerThread)t).workQueue :
1114 ForkJoinPool.commonQueue());
1115 return q != null && q.tryUnpush(this, owned);
1116 }
1117
1118 /**
1119 * Returns an estimate of the number of tasks that have been
1120 * forked by the current worker thread but not yet executed. This
1121 * value may be useful for heuristic decisions about whether to
1122 * fork other tasks.
1123 *
1124 * @return the number of tasks
1125 */
1126 public static int getQueuedTaskCount() {
1127 Thread t; ForkJoinPool.WorkQueue q;
1128 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
1129 q = ((ForkJoinWorkerThread)t).workQueue;
1130 else
1131 q = ForkJoinPool.commonQueue();
1132 return (q == null) ? 0 : q.queueSize();
1133 }
1134
1135 /**
1136 * Returns an estimate of how many more locally queued tasks are
1137 * held by the current worker thread than there are other worker
1138 * threads that might steal them, or zero if this thread is not
1139 * operating in a ForkJoinPool. This value may be useful for
1140 * heuristic decisions about whether to fork other tasks. In many
1141 * usages of ForkJoinTasks, at steady state, each worker should
1142 * aim to maintain a small constant surplus (for example, 3) of
1143 * tasks, and to process computations locally if this threshold is
1144 * exceeded.
1145 *
1146 * @return the surplus number of tasks, which may be negative
1147 */
1148 public static int getSurplusQueuedTaskCount() {
1149 return ForkJoinPool.getSurplusQueuedTaskCount();
1150 }
1151
1152 // Extension methods
1153
1154 /**
1155 * Returns the result that would be returned by {@link #join}, even
1156 * if this task completed abnormally, or {@code null} if this task
1157 * is not known to have been completed. This method is designed
1158 * to aid debugging, as well as to support extensions. Its use in
1159 * any other context is discouraged.
1160 *
1161 * @return the result, or {@code null} if not completed
1162 */
1163 public abstract V getRawResult();
1164
1165 /**
1166 * Forces the given value to be returned as a result. This method
1167 * is designed to support extensions, and should not in general be
1168 * called otherwise.
1169 *
1170 * @param value the value
1171 */
1172 protected abstract void setRawResult(V value);
1173
1174 /**
1175 * Immediately performs the base action of this task and returns
1176 * true if, upon return from this method, this task is guaranteed
1177 * to have completed. This method may return false otherwise, to
1178 * indicate that this task is not necessarily complete (or is not
1179 * known to be complete), for example in asynchronous actions that
1180 * require explicit invocations of completion methods. This method
1181 * may also throw an (unchecked) exception to indicate abnormal
1182 * exit. This method is designed to support extensions, and should
1183 * not in general be called otherwise.
1184 *
1185 * @return {@code true} if this task is known to have completed normally
1186 */
1187 protected abstract boolean exec();
1188
1189 /**
1190 * Returns, but does not unschedule or execute, a task queued by
1191 * the current thread but not yet executed, if one is immediately
1192 * available. There is no guarantee that this task will actually
1193 * be polled or executed next. Conversely, this method may return
1194 * null even if a task exists but cannot be accessed without
1195 * contention with other threads. This method is designed
1196 * primarily to support extensions, and is unlikely to be useful
1197 * otherwise.
1198 *
1199 * @return the next task, or {@code null} if none are available
1200 */
1201 protected static ForkJoinTask<?> peekNextLocalTask() {
1202 Thread t; ForkJoinPool.WorkQueue q;
1203 if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
1204 q = ((ForkJoinWorkerThread)t).workQueue;
1205 else
1206 q = ForkJoinPool.commonQueue();
1207 return (q == null) ? null : q.peek();
1208 }
1209
1210 /**
1211 * Unschedules and returns, without executing, the next task
1212 * queued by the current thread but not yet executed, if the
1213 * current thread is operating in a ForkJoinPool. This method is
1214 * designed primarily to support extensions, and is unlikely to be
1215 * useful otherwise.
1216 *
1217 * @return the next task, or {@code null} if none are available
1218 */
1219 protected static ForkJoinTask<?> pollNextLocalTask() {
1220 Thread t;
1221 return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
1222 ((ForkJoinWorkerThread)t).workQueue.nextLocalTask() : null);
1223 }
1224
1225 /**
1226 * If the current thread is operating in a ForkJoinPool,
1227 * unschedules and returns, without executing, the next task
1228 * queued by the current thread but not yet executed, if one is
1229 * available, or if not available, a task that was forked by some
1230 * other thread, if available. Availability may be transient, so a
1231 * {@code null} result does not necessarily imply quiescence of
1232 * the pool this task is operating in. This method is designed
1233 * primarily to support extensions, and is unlikely to be useful
1234 * otherwise.
1235 *
1236 * @return a task, or {@code null} if none are available
1237 */
1238 protected static ForkJoinTask<?> pollTask() {
1239 Thread t; ForkJoinWorkerThread w;
1240 return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
1241 (w = (ForkJoinWorkerThread)t).pool.nextTaskFor(w.workQueue) :
1242 null);
1243 }
1244
1245 /**
1246 * If the current thread is operating in a ForkJoinPool,
1247 * unschedules and returns, without executing, a task externally
1248 * submitted to the pool, if one is available. Availability may be
1249 * transient, so a {@code null} result does not necessarily imply
1250 * quiescence of the pool. This method is designed primarily to
1251 * support extensions, and is unlikely to be useful otherwise.
1252 *
1253 * @return a task, or {@code null} if none are available
1254 * @since 9
1255 */
1256 protected static ForkJoinTask<?> pollSubmission() {
1257 Thread t;
1258 return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
1259 ((ForkJoinWorkerThread)t).pool.pollSubmission() : null);
1260 }
1261
1262 // tag operations
1263
1264 /**
1265 * Returns the tag for this task.
1266 *
1267 * @return the tag for this task
1268 * @since 1.8
1269 */
1270 public final short getForkJoinTaskTag() {
1271 return (short)status;
1272 }
1273
1274 /**
1275 * Atomically sets the tag value for this task and returns the old value.
1276 *
1277 * @param newValue the new tag value
1278 * @return the previous value of the tag
1279 * @since 1.8
1280 */
1281 public final short setForkJoinTaskTag(short newValue) {
1282 for (int s;;) {
1283 if (casStatus(s = status, (s & ~SMASK) | (newValue & SMASK)))
1284 return (short)s;
1285 }
1286 }
1287
1288 /**
1289 * Atomically conditionally sets the tag value for this task.
1290 * Among other applications, tags can be used as visit markers
1291 * in tasks operating on graphs, as in methods that check: {@code
1292 * if (task.compareAndSetForkJoinTaskTag((short)0, (short)1))}
1293 * before processing, otherwise exiting because the node has
1294 * already been visited.
1295 *
1296 * @param expect the expected tag value
1297 * @param update the new tag value
1298 * @return {@code true} if successful; i.e., the current value was
1299 * equal to {@code expect} and was changed to {@code update}.
1300 * @since 1.8
1301 */
1302 public final boolean compareAndSetForkJoinTaskTag(short expect, short update) {
1303 for (int s;;) {
1304 if ((short)(s = status) != expect)
1305 return false;
1306 if (casStatus(s, (s & ~SMASK) | (update & SMASK)))
1307 return true;
1308 }
1309 }
1310
1311 /**
1312 * Adapter for Runnables. This implements RunnableFuture
1313 * to be compliant with AbstractExecutorService constraints
1314 * when used in ForkJoinPool.
1315 */
1316 static final class AdaptedRunnable<T> extends ForkJoinTask<T>
1317 implements RunnableFuture<T> {
1318 @SuppressWarnings("serial") // Conditionally serializable
1319 final Runnable runnable;
1320 @SuppressWarnings("serial") // Conditionally serializable
1321 T result;
1322 AdaptedRunnable(Runnable runnable, T result) {
1323 if (runnable == null) throw new NullPointerException();
1324 this.runnable = runnable;
1325 this.result = result; // OK to set this even before completion
1326 }
1327 public final T getRawResult() { return result; }
1328 public final void setRawResult(T v) { result = v; }
1329 public final boolean exec() { runnable.run(); return true; }
1330 public final void run() { invoke(); }
1331 public String toString() {
1332 return super.toString() + "[Wrapped task = " + runnable + "]";
1333 }
1334 private static final long serialVersionUID = 5232453952276885070L;
1335 }
1336
1337 /**
1338 * Adapter for Runnables without results.
1339 */
1340 static final class AdaptedRunnableAction extends ForkJoinTask<Void>
1341 implements RunnableFuture<Void> {
1342 @SuppressWarnings("serial") // Conditionally serializable
1343 final Runnable runnable;
1344 AdaptedRunnableAction(Runnable runnable) {
1345 if (runnable == null) throw new NullPointerException();
1346 this.runnable = runnable;
1347 }
1348 public final Void getRawResult() { return null; }
1349 public final void setRawResult(Void v) { }
1350 public final boolean exec() { runnable.run(); return true; }
1351 public final void run() { invoke(); }
1352 public String toString() {
1353 return super.toString() + "[Wrapped task = " + runnable + "]";
1354 }
1355 private static final long serialVersionUID = 5232453952276885070L;
1356 }
1357
1358 /**
1359 * Adapter for Runnables in which failure forces worker exception.
1360 */
1361 static final class RunnableExecuteAction extends ForkJoinTask<Void> {
1362 @SuppressWarnings("serial") // Conditionally serializable
1363 final Runnable runnable;
1364 RunnableExecuteAction(Runnable runnable) {
1365 if (runnable == null) throw new NullPointerException();
1366 this.runnable = runnable;
1367 }
1368 public final Void getRawResult() { return null; }
1369 public final void setRawResult(Void v) { }
1370 public final boolean exec() { runnable.run(); return true; }
1371 int trySetException(Throwable ex) {
1372 int s; // if runnable has a handler, invoke it
1373 if (isExceptionalStatus(s = trySetThrown(ex)) &&
1374 runnable instanceof java.lang.Thread.UncaughtExceptionHandler) {
1375 try {
1376 ((java.lang.Thread.UncaughtExceptionHandler)runnable).
1377 uncaughtException(Thread.currentThread(), ex);
1378 } catch (Throwable ignore) {
1379 }
1380 }
1381 return s;
1382 }
1383 private static final long serialVersionUID = 5232453952276885070L;
1384 }
1385
1386 /**
1387 * Adapter for Callables.
1388 */
1389 static final class AdaptedCallable<T> extends ForkJoinTask<T>
1390 implements RunnableFuture<T> {
1391 @SuppressWarnings("serial") // Conditionally serializable
1392 final Callable<? extends T> callable;
1393 @SuppressWarnings("serial") // Conditionally serializable
1394 T result;
1395 AdaptedCallable(Callable<? extends T> callable) {
1396 if (callable == null) throw new NullPointerException();
1397 this.callable = callable;
1398 }
1399 public final T getRawResult() { return result; }
1400 public final void setRawResult(T v) { result = v; }
1401 public final boolean exec() {
1402 try {
1403 result = callable.call();
1404 return true;
1405 } catch (RuntimeException rex) {
1406 throw rex;
1407 } catch (Exception ex) {
1408 throw new RuntimeException(ex);
1409 }
1410 }
1411 public final void run() { invoke(); }
1412 public String toString() {
1413 return super.toString() + "[Wrapped task = " + callable + "]";
1414 }
1415 private static final long serialVersionUID = 2838392045355241008L;
1416 }
1417
1418 static final class AdaptedInterruptibleCallable<T> extends ForkJoinTask<T>
1419 implements RunnableFuture<T> {
1420 @SuppressWarnings("serial") // Conditionally serializable
1421 final Callable<? extends T> callable;
1422 @SuppressWarnings("serial") // Conditionally serializable
1423 transient volatile Thread runner;
1424 T result;
1425 AdaptedInterruptibleCallable(Callable<? extends T> callable) {
1426 if (callable == null) throw new NullPointerException();
1427 this.callable = callable;
1428 }
1429 public final T getRawResult() { return result; }
1430 public final void setRawResult(T v) { result = v; }
1431 public final boolean exec() {
1432 Thread.interrupted();
1433 runner = Thread.currentThread();
1434 try {
1435 result = callable.call();
1436 return true;
1437 } catch (RuntimeException rex) {
1438 throw rex;
1439 } catch (Exception ex) {
1440 throw new RuntimeException(ex);
1441 } finally {
1442 runner = null;
1443 Thread.interrupted();
1444 }
1445 }
1446 public final void run() { invoke(); }
1447 public final boolean cancel(boolean mayInterruptIfRunning) {
1448 Thread t;
1449 boolean stat = super.cancel(false);
1450 if (mayInterruptIfRunning && (t = runner) != null) {
1451 try {
1452 t.interrupt();
1453 } catch (Throwable ignore) {
1454 }
1455 }
1456 return stat;
1457 }
1458 public String toString() {
1459 return super.toString() + "[Wrapped task = " + callable + "]";
1460 }
1461 private static final long serialVersionUID = 2838392045355241008L;
1462 }
1463
1464 /**
1465 * Returns a new {@code ForkJoinTask} that performs the {@code run}
1466 * method of the given {@code Runnable} as its action, and returns
1467 * a null result upon {@link #join}.
1468 *
1469 * @param runnable the runnable action
1470 * @return the task
1471 */
1472 public static ForkJoinTask<?> adapt(Runnable runnable) {
1473 return new AdaptedRunnableAction(runnable);
1474 }
1475
1476 /**
1477 * Returns a new {@code ForkJoinTask} that performs the {@code run}
1478 * method of the given {@code Runnable} as its action, and returns
1479 * the given result upon {@link #join}.
1480 *
1481 * @param runnable the runnable action
1482 * @param result the result upon completion
1483 * @param <T> the type of the result
1484 * @return the task
1485 */
1486 public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) {
1487 return new AdaptedRunnable<T>(runnable, result);
1488 }
1489
1490 /**
1491 * Returns a new {@code ForkJoinTask} that performs the {@code call}
1492 * method of the given {@code Callable} as its action, and returns
1493 * its result upon {@link #join}, translating any checked exceptions
1494 * encountered into {@code RuntimeException}.
1495 *
1496 * @param callable the callable action
1497 * @param <T> the type of the callable's result
1498 * @return the task
1499 */
1500 public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) {
1501 return new AdaptedCallable<T>(callable);
1502 }
1503
1504 /**
1505 * Returns a new {@code ForkJoinTask} that performs the {@code
1506 * call} method of the given {@code Callable} as its action, and
1507 * returns its result upon {@link #join}, translating any checked
1508 * exceptions encountered into {@code
1509 * RuntimeException}. Additionally, invocations of {@code cancel}
1510 * with {@code mayInterruptIfRunning true} will attempt to
1511 * interrupt the thread performing the task.
1512 *
1513 * @param callable the callable action
1514 * @param <T> the type of the callable's result
1515 * @return the task
1516 *
1517 * @since 1.15
1518 */
1519 public static <T> ForkJoinTask<T> adaptInterruptible(Callable<? extends T> callable) {
1520 return new AdaptedInterruptibleCallable<T>(callable);
1521 }
1522
1523 // Serialization support
1524
1525 private static final long serialVersionUID = -7721805057305804111L;
1526
1527 /**
1528 * Saves this task to a stream (that is, serializes it).
1529 *
1530 * @param s the stream
1531 * @throws java.io.IOException if an I/O error occurs
1532 * @serialData the current run status and the exception thrown
1533 * during execution, or {@code null} if none
1534 */
1535 private void writeObject(java.io.ObjectOutputStream s)
1536 throws java.io.IOException {
1537 Aux a;
1538 s.defaultWriteObject();
1539 s.writeObject((a = aux) == null ? null : a.ex);
1540 }
1541
1542 /**
1543 * Reconstitutes this task from a stream (that is, deserializes it).
1544 * @param s the stream
1545 * @throws ClassNotFoundException if the class of a serialized object
1546 * could not be found
1547 * @throws java.io.IOException if an I/O error occurs
1548 */
1549 private void readObject(java.io.ObjectInputStream s)
1550 throws java.io.IOException, ClassNotFoundException {
1551 s.defaultReadObject();
1552 Object ex = s.readObject();
1553 if (ex != null)
1554 trySetThrown((Throwable)ex);
1555 }
1556
1557 static {
1558 try {
1559 MethodHandles.Lookup l = MethodHandles.lookup();
1560 STATUS = l.findVarHandle(ForkJoinTask.class, "status", int.class);
1561 AUX = l.findVarHandle(ForkJoinTask.class, "aux", Aux.class);
1562 } catch (ReflectiveOperationException e) {
1563 throw new ExceptionInInitializerError(e);
1564 }
1565 }
1566
1567 }