ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinTask.java
Revision: 1.16
Committed: Fri Jul 24 23:47:01 2009 UTC (14 years, 9 months ago) by jsr166
Branch: MAIN
Changes since 1.15: +23 -26 lines
Log Message:
Unsafe mechanics

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package jsr166y;
8 import java.io.Serializable;
9 import java.util.*;
10 import java.util.concurrent.*;
11 import java.util.concurrent.atomic.*;
12
13 /**
14 * Abstract base class for tasks that run within a {@link
15 * ForkJoinPool}. A ForkJoinTask is a thread-like entity that is much
16 * lighter weight than a normal thread. Huge numbers of tasks and
17 * subtasks may be hosted by a small number of actual threads in a
18 * ForkJoinPool, at the price of some usage limitations.
19 *
20 * <p> A "main" ForkJoinTask begins execution when submitted to a
21 * {@link ForkJoinPool}. Once started, it will usually in turn start
22 * other subtasks. As indicated by the name of this class, many
23 * programs using ForkJoinTasks employ only methods {@code fork}
24 * and {@code join}, or derivatives such as
25 * {@code invokeAll}. However, this class also provides a number
26 * of other methods that can come into play in advanced usages, as
27 * well as extension mechanics that allow support of new forms of
28 * fork/join processing.
29 *
30 * <p>A ForkJoinTask is a lightweight form of {@link Future}. The
31 * efficiency of ForkJoinTasks stems from a set of restrictions (that
32 * are only partially statically enforceable) reflecting their
33 * intended use as computational tasks calculating pure functions or
34 * operating on purely isolated objects. The primary coordination
35 * mechanisms are {@link #fork}, that arranges asynchronous execution,
36 * and {@link #join}, that doesn't proceed until the task's result has
37 * been computed. Computations should avoid {@code synchronized}
38 * methods or blocks, and should minimize other blocking
39 * synchronization apart from joining other tasks or using
40 * synchronizers such as Phasers that are advertised to cooperate with
41 * fork/join scheduling. Tasks should also not perform blocking IO,
42 * and should ideally access variables that are completely independent
43 * of those accessed by other running tasks. Minor breaches of these
44 * restrictions, for example using shared output streams, may be
45 * tolerable in practice, but frequent use may result in poor
46 * performance, and the potential to indefinitely stall if the number
47 * of threads not waiting for IO or other external synchronization
48 * becomes exhausted. This usage restriction is in part enforced by
49 * not permitting checked exceptions such as {@code IOExceptions}
50 * to be thrown. However, computations may still encounter unchecked
51 * exceptions, that are rethrown to callers attempting join
52 * them. These exceptions may additionally include
53 * RejectedExecutionExceptions stemming from internal resource
54 * exhaustion such as failure to allocate internal task queues.
55 *
56 * <p>The primary method for awaiting completion and extracting
57 * results of a task is {@link #join}, but there are several variants:
58 * The {@link Future#get} methods support interruptible and/or timed
59 * waits for completion and report results using {@code Future}
60 * conventions. Method {@link #helpJoin} enables callers to actively
61 * execute other tasks while awaiting joins, which is sometimes more
62 * efficient but only applies when all subtasks are known to be
63 * strictly tree-structured. Method {@link #invoke} is semantically
64 * equivalent to {@code fork(); join()} but always attempts to
65 * begin execution in the current thread. The "<em>quiet</em>" forms
66 * of these methods do not extract results or report exceptions. These
67 * may be useful when a set of tasks are being executed, and you need
68 * to delay processing of results or exceptions until all complete.
69 * Method {@code invokeAll} (available in multiple versions)
70 * performs the most common form of parallel invocation: forking a set
71 * of tasks and joining them all.
72 *
73 * <p> The ForkJoinTask class is not usually directly subclassed.
74 * Instead, you subclass one of the abstract classes that support a
75 * particular style of fork/join processing. Normally, a concrete
76 * ForkJoinTask subclass declares fields comprising its parameters,
77 * established in a constructor, and then defines a {@code compute}
78 * method that somehow uses the control methods supplied by this base
79 * class. While these methods have {@code public} access (to allow
80 * instances of different task subclasses to call each others
81 * methods), some of them may only be called from within other
82 * ForkJoinTasks (as may be determined using method {@link
83 * #inForkJoinPool}). Attempts to invoke them in other contexts
84 * result in exceptions or errors, possibly including
85 * ClassCastException.
86 *
87 * <p>Most base support methods are {@code final} because their
88 * implementations are intrinsically tied to the underlying
89 * lightweight task scheduling framework, and so cannot be overridden.
90 * Developers creating new basic styles of fork/join processing should
91 * minimally implement {@code protected} methods
92 * {@code exec}, {@code setRawResult}, and
93 * {@code getRawResult}, while also introducing an abstract
94 * computational method that can be implemented in its subclasses,
95 * possibly relying on other {@code protected} methods provided
96 * by this class.
97 *
98 * <p>ForkJoinTasks should perform relatively small amounts of
99 * computations, otherwise splitting into smaller tasks. As a very
100 * rough rule of thumb, a task should perform more than 100 and less
101 * than 10000 basic computational steps. If tasks are too big, then
102 * parallelism cannot improve throughput. If too small, then memory
103 * and internal task maintenance overhead may overwhelm processing.
104 *
105 * <p>ForkJoinTasks are {@code Serializable}, which enables them
106 * to be used in extensions such as remote execution frameworks. It is
107 * in general sensible to serialize tasks only before or after, but
108 * not during execution. Serialization is not relied on during
109 * execution itself.
110 *
111 * @since 1.7
112 * @author Doug Lea
113 */
114 public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
115
116 /**
117 * Run control status bits packed into a single int to minimize
118 * footprint and to ensure atomicity (via CAS). Status is
119 * initially zero, and takes on nonnegative values until
120 * completed, upon which status holds COMPLETED. CANCELLED, or
121 * EXCEPTIONAL, which use the top 3 bits. Tasks undergoing
122 * blocking waits by other threads have SIGNAL_MASK bits set --
123 * bit 15 for external (nonFJ) waits, and the rest a count of
124 * waiting FJ threads. (This representation relies on
125 * ForkJoinPool max thread limits). Completion of a stolen task
126 * with SIGNAL_MASK bits set awakens waiter via notifyAll. Even
127 * though suboptimal for some purposes, we use basic builtin
128 * wait/notify to take advantage of "monitor inflation" in JVMs
129 * that we would otherwise need to emulate to avoid adding further
130 * per-task bookkeeping overhead. Note that bits 16-28 are
131 * currently unused. Also value 0x80000000 is available as spare
132 * completion value.
133 */
134 volatile int status; // accessed directly by pool and workers
135
136 static final int COMPLETION_MASK = 0xe0000000;
137 static final int NORMAL = 0xe0000000; // == mask
138 static final int CANCELLED = 0xc0000000;
139 static final int EXCEPTIONAL = 0xa0000000;
140 static final int SIGNAL_MASK = 0x0000ffff;
141 static final int INTERNAL_SIGNAL_MASK = 0x00007fff;
142 static final int EXTERNAL_SIGNAL = 0x00008000; // top bit of low word
143
144 /**
145 * Table of exceptions thrown by tasks, to enable reporting by
146 * callers. Because exceptions are rare, we don't directly keep
147 * them with task objects, but instead use a weak ref table. Note
148 * that cancellation exceptions don't appear in the table, but are
149 * instead recorded as status values.
150 * TODO: Use ConcurrentReferenceHashMap
151 */
152 static final Map<ForkJoinTask<?>, Throwable> exceptionMap =
153 Collections.synchronizedMap
154 (new WeakHashMap<ForkJoinTask<?>, Throwable>());
155
156 // within-package utilities
157
158 /**
159 * Gets current worker thread, or null if not a worker thread.
160 */
161 static ForkJoinWorkerThread getWorker() {
162 Thread t = Thread.currentThread();
163 return ((t instanceof ForkJoinWorkerThread) ?
164 (ForkJoinWorkerThread) t : null);
165 }
166
167 final boolean casStatus(int cmp, int val) {
168 return UNSAFE.compareAndSwapInt(this, statusOffset, cmp, val);
169 }
170
171 /**
172 * Workaround for not being able to rethrow unchecked exceptions.
173 */
174 static void rethrowException(Throwable ex) {
175 if (ex != null)
176 UNSAFE.throwException(ex);
177 }
178
179 // Setting completion status
180
181 /**
182 * Marks completion and wakes up threads waiting to join this task.
183 *
184 * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
185 */
186 final void setCompletion(int completion) {
187 ForkJoinPool pool = getPool();
188 if (pool != null) {
189 int s; // Clear signal bits while setting completion status
190 do {} while ((s = status) >= 0 && !casStatus(s, completion));
191
192 if ((s & SIGNAL_MASK) != 0) {
193 if ((s &= INTERNAL_SIGNAL_MASK) != 0)
194 pool.updateRunningCount(s);
195 synchronized (this) { notifyAll(); }
196 }
197 }
198 else
199 externallySetCompletion(completion);
200 }
201
202 /**
203 * Version of setCompletion for non-FJ threads. Leaves signal
204 * bits for unblocked threads to adjust, and always notifies.
205 */
206 private void externallySetCompletion(int completion) {
207 int s;
208 do {} while ((s = status) >= 0 &&
209 !casStatus(s, (s & SIGNAL_MASK) | completion));
210 synchronized (this) { notifyAll(); }
211 }
212
213 /**
214 * Sets status to indicate normal completion.
215 */
216 final void setNormalCompletion() {
217 // Try typical fast case -- single CAS, no signal, not already done.
218 // Manually expand casStatus to improve chances of inlining it
219 if (!UNSAFE.compareAndSwapInt(this, statusOffset, 0, NORMAL))
220 setCompletion(NORMAL);
221 }
222
223 // internal waiting and notification
224
225 /**
226 * Performs the actual monitor wait for awaitDone.
227 */
228 private void doAwaitDone() {
229 // Minimize lock bias and in/de-flation effects by maximizing
230 // chances of waiting inside sync
231 try {
232 while (status >= 0)
233 synchronized (this) { if (status >= 0) wait(); }
234 } catch (InterruptedException ie) {
235 onInterruptedWait();
236 }
237 }
238
239 /**
240 * Performs the actual timed monitor wait for awaitDone.
241 */
242 private void doAwaitDone(long startTime, long nanos) {
243 synchronized (this) {
244 try {
245 while (status >= 0) {
246 long nt = nanos - System.nanoTime() - startTime;
247 if (nt <= 0)
248 break;
249 wait(nt / 1000000, (int) (nt % 1000000));
250 }
251 } catch (InterruptedException ie) {
252 onInterruptedWait();
253 }
254 }
255 }
256
257 // Awaiting completion
258
259 /**
260 * Sets status to indicate there is joiner, then waits for join,
261 * surrounded with pool notifications.
262 *
263 * @return status upon exit
264 */
265 private int awaitDone(ForkJoinWorkerThread w,
266 boolean maintainParallelism) {
267 ForkJoinPool pool = (w == null) ? null : w.pool;
268 int s;
269 while ((s = status) >= 0) {
270 if (casStatus(s, (pool == null) ? s|EXTERNAL_SIGNAL : s+1)) {
271 if (pool == null || !pool.preJoin(this, maintainParallelism))
272 doAwaitDone();
273 if (((s = status) & INTERNAL_SIGNAL_MASK) != 0)
274 adjustPoolCountsOnUnblock(pool);
275 break;
276 }
277 }
278 return s;
279 }
280
281 /**
282 * Timed version of awaitDone
283 *
284 * @return status upon exit
285 */
286 private int awaitDone(ForkJoinWorkerThread w, long nanos) {
287 ForkJoinPool pool = (w == null) ? null : w.pool;
288 int s;
289 while ((s = status) >= 0) {
290 if (casStatus(s, (pool == null) ? s|EXTERNAL_SIGNAL : s+1)) {
291 long startTime = System.nanoTime();
292 if (pool == null || !pool.preJoin(this, false))
293 doAwaitDone(startTime, nanos);
294 if ((s = status) >= 0) {
295 adjustPoolCountsOnCancelledWait(pool);
296 s = status;
297 }
298 if (s < 0 && (s & INTERNAL_SIGNAL_MASK) != 0)
299 adjustPoolCountsOnUnblock(pool);
300 break;
301 }
302 }
303 return s;
304 }
305
306 /**
307 * Notifies pool that thread is unblocked. Called by signalled
308 * threads when woken by non-FJ threads (which is atypical).
309 */
310 private void adjustPoolCountsOnUnblock(ForkJoinPool pool) {
311 int s;
312 do {} while ((s = status) < 0 && !casStatus(s, s & COMPLETION_MASK));
313 if (pool != null && (s &= INTERNAL_SIGNAL_MASK) != 0)
314 pool.updateRunningCount(s);
315 }
316
317 /**
318 * Notifies pool to adjust counts on cancelled or timed out wait.
319 */
320 private void adjustPoolCountsOnCancelledWait(ForkJoinPool pool) {
321 if (pool != null) {
322 int s;
323 while ((s = status) >= 0 && (s & INTERNAL_SIGNAL_MASK) != 0) {
324 if (casStatus(s, s - 1)) {
325 pool.updateRunningCount(1);
326 break;
327 }
328 }
329 }
330 }
331
332 /**
333 * Handles interruptions during waits.
334 */
335 private void onInterruptedWait() {
336 ForkJoinWorkerThread w = getWorker();
337 if (w == null)
338 Thread.currentThread().interrupt(); // re-interrupt
339 else if (w.isTerminating())
340 cancelIgnoringExceptions();
341 // else if FJworker, ignore interrupt
342 }
343
344 // Recording and reporting exceptions
345
346 private void setDoneExceptionally(Throwable rex) {
347 exceptionMap.put(this, rex);
348 setCompletion(EXCEPTIONAL);
349 }
350
351 /**
352 * Throws the exception associated with status s.
353 *
354 * @throws the exception
355 */
356 private void reportException(int s) {
357 if ((s &= COMPLETION_MASK) < NORMAL) {
358 if (s == CANCELLED)
359 throw new CancellationException();
360 else
361 rethrowException(exceptionMap.get(this));
362 }
363 }
364
365 /**
366 * Returns result or throws exception using j.u.c.Future conventions.
367 * Only call when {@code isDone} known to be true.
368 */
369 private V reportFutureResult()
370 throws ExecutionException, InterruptedException {
371 int s = status & COMPLETION_MASK;
372 if (s < NORMAL) {
373 Throwable ex;
374 if (s == CANCELLED)
375 throw new CancellationException();
376 if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
377 throw new ExecutionException(ex);
378 if (Thread.interrupted())
379 throw new InterruptedException();
380 }
381 return getRawResult();
382 }
383
384 /**
385 * Returns result or throws exception using j.u.c.Future conventions
386 * with timeouts.
387 */
388 private V reportTimedFutureResult()
389 throws InterruptedException, ExecutionException, TimeoutException {
390 Throwable ex;
391 int s = status & COMPLETION_MASK;
392 if (s == NORMAL)
393 return getRawResult();
394 if (s == CANCELLED)
395 throw new CancellationException();
396 if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
397 throw new ExecutionException(ex);
398 if (Thread.interrupted())
399 throw new InterruptedException();
400 throw new TimeoutException();
401 }
402
403 // internal execution methods
404
405 /**
406 * Calls exec, recording completion, and rethrowing exception if
407 * encountered. Caller should normally check status before calling.
408 *
409 * @return true if completed normally
410 */
411 private boolean tryExec() {
412 try { // try block must contain only call to exec
413 if (!exec())
414 return false;
415 } catch (Throwable rex) {
416 setDoneExceptionally(rex);
417 rethrowException(rex);
418 return false; // not reached
419 }
420 setNormalCompletion();
421 return true;
422 }
423
424 /**
425 * Main execution method used by worker threads. Invokes
426 * base computation unless already complete.
427 */
428 final void quietlyExec() {
429 if (status >= 0) {
430 try {
431 if (!exec())
432 return;
433 } catch (Throwable rex) {
434 setDoneExceptionally(rex);
435 return;
436 }
437 setNormalCompletion();
438 }
439 }
440
441 /**
442 * Calls exec(), recording but not rethrowing exception.
443 * Caller should normally check status before calling.
444 *
445 * @return true if completed normally
446 */
447 private boolean tryQuietlyInvoke() {
448 try {
449 if (!exec())
450 return false;
451 } catch (Throwable rex) {
452 setDoneExceptionally(rex);
453 return false;
454 }
455 setNormalCompletion();
456 return true;
457 }
458
459 /**
460 * Cancels, ignoring any exceptions it throws.
461 */
462 final void cancelIgnoringExceptions() {
463 try {
464 cancel(false);
465 } catch (Throwable ignore) {
466 }
467 }
468
469 /**
470 * Main implementation of helpJoin
471 */
472 private int busyJoin(ForkJoinWorkerThread w) {
473 int s;
474 ForkJoinTask<?> t;
475 while ((s = status) >= 0 && (t = w.scanWhileJoining(this)) != null)
476 t.quietlyExec();
477 return (s >= 0) ? awaitDone(w, false) : s; // block if no work
478 }
479
480 // public methods
481
482 /**
483 * Arranges to asynchronously execute this task. While it is not
484 * necessarily enforced, it is a usage error to fork a task more
485 * than once unless it has completed and been reinitialized. This
486 * method may be invoked only from within ForkJoinTask
487 * computations (as may be determined using method {@link
488 * #inForkJoinPool}). Attempts to invoke in other contexts result
489 * in exceptions or errors, possibly including ClassCastException.
490 */
491 public final void fork() {
492 ((ForkJoinWorkerThread) Thread.currentThread())
493 .pushTask(this);
494 }
495
496 /**
497 * Returns the result of the computation when it is ready.
498 * This method differs from {@code get} in that abnormal
499 * completion results in RuntimeExceptions or Errors, not
500 * ExecutionExceptions.
501 *
502 * @return the computed result
503 */
504 public final V join() {
505 ForkJoinWorkerThread w = getWorker();
506 if (w == null || status < 0 || !w.unpushTask(this) || !tryExec())
507 reportException(awaitDone(w, true));
508 return getRawResult();
509 }
510
511 /**
512 * Commences performing this task, awaits its completion if
513 * necessary, and return its result.
514 *
515 * @throws Throwable (a RuntimeException, Error, or unchecked
516 * exception) if the underlying computation did so
517 * @return the computed result
518 */
519 public final V invoke() {
520 if (status >= 0 && tryExec())
521 return getRawResult();
522 else
523 return join();
524 }
525
526 /**
527 * Forks both tasks, returning when {@code isDone} holds for
528 * both of them or an exception is encountered. This method may be
529 * invoked only from within ForkJoinTask computations (as may be
530 * determined using method {@link #inForkJoinPool}). Attempts to
531 * invoke in other contexts result in exceptions or errors,
532 * possibly including ClassCastException.
533 *
534 * @param t1 one task
535 * @param t2 the other task
536 * @throws NullPointerException if t1 or t2 are null
537 * @throws RuntimeException or Error if either task did so
538 */
539 public static void invokeAll(ForkJoinTask<?>t1, ForkJoinTask<?> t2) {
540 t2.fork();
541 t1.invoke();
542 t2.join();
543 }
544
545 /**
546 * Forks the given tasks, returning when {@code isDone} holds
547 * for all of them. If any task encounters an exception, others
548 * may be cancelled. This method may be invoked only from within
549 * ForkJoinTask computations (as may be determined using method
550 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
551 * result in exceptions or errors, possibly including
552 * ClassCastException.
553 *
554 * @param tasks the array of tasks
555 * @throws NullPointerException if tasks or any element are null
556 * @throws RuntimeException or Error if any task did so
557 */
558 public static void invokeAll(ForkJoinTask<?>... tasks) {
559 Throwable ex = null;
560 int last = tasks.length - 1;
561 for (int i = last; i >= 0; --i) {
562 ForkJoinTask<?> t = tasks[i];
563 if (t == null) {
564 if (ex == null)
565 ex = new NullPointerException();
566 }
567 else if (i != 0)
568 t.fork();
569 else {
570 t.quietlyInvoke();
571 if (ex == null)
572 ex = t.getException();
573 }
574 }
575 for (int i = 1; i <= last; ++i) {
576 ForkJoinTask<?> t = tasks[i];
577 if (t != null) {
578 if (ex != null)
579 t.cancel(false);
580 else {
581 t.quietlyJoin();
582 if (ex == null)
583 ex = t.getException();
584 }
585 }
586 }
587 if (ex != null)
588 rethrowException(ex);
589 }
590
591 /**
592 * Forks all tasks in the collection, returning when
593 * {@code isDone} holds for all of them. If any task
594 * encounters an exception, others may be cancelled. This method
595 * may be invoked only from within ForkJoinTask computations (as
596 * may be determined using method {@link
597 * #inForkJoinPool}). Attempts to invoke in other contexts result
598 * in exceptions or errors, possibly including ClassCastException.
599 *
600 * @param tasks the collection of tasks
601 * @throws NullPointerException if tasks or any element are null
602 * @throws RuntimeException or Error if any task did so
603 */
604 public static void invokeAll(Collection<? extends ForkJoinTask<?>> tasks) {
605 if (!(tasks instanceof List<?>)) {
606 invokeAll(tasks.toArray(new ForkJoinTask<?>[tasks.size()]));
607 return;
608 }
609 @SuppressWarnings("unchecked")
610 List<? extends ForkJoinTask<?>> ts =
611 (List<? extends ForkJoinTask<?>>) tasks;
612 Throwable ex = null;
613 int last = ts.size() - 1;
614 for (int i = last; i >= 0; --i) {
615 ForkJoinTask<?> t = ts.get(i);
616 if (t == null) {
617 if (ex == null)
618 ex = new NullPointerException();
619 }
620 else if (i != 0)
621 t.fork();
622 else {
623 t.quietlyInvoke();
624 if (ex == null)
625 ex = t.getException();
626 }
627 }
628 for (int i = 1; i <= last; ++i) {
629 ForkJoinTask<?> t = ts.get(i);
630 if (t != null) {
631 if (ex != null)
632 t.cancel(false);
633 else {
634 t.quietlyJoin();
635 if (ex == null)
636 ex = t.getException();
637 }
638 }
639 }
640 if (ex != null)
641 rethrowException(ex);
642 }
643
644 /**
645 * Returns true if the computation performed by this task has
646 * completed (or has been cancelled).
647 *
648 * @return true if this computation has completed
649 */
650 public final boolean isDone() {
651 return status < 0;
652 }
653
654 /**
655 * Returns true if this task was cancelled.
656 *
657 * @return true if this task was cancelled
658 */
659 public final boolean isCancelled() {
660 return (status & COMPLETION_MASK) == CANCELLED;
661 }
662
663 /**
664 * Asserts that the results of this task's computation will not be
665 * used. If a cancellation occurs before attempting to execute this
666 * task, then execution will be suppressed, {@code isCancelled}
667 * will report true, and {@code join} will result in a
668 * {@code CancellationException} being thrown. Otherwise, when
669 * cancellation races with completion, there are no guarantees
670 * about whether {@code isCancelled} will report true, whether
671 * {@code join} will return normally or via an exception, or
672 * whether these behaviors will remain consistent upon repeated
673 * invocation.
674 *
675 * <p>This method may be overridden in subclasses, but if so, must
676 * still ensure that these minimal properties hold. In particular,
677 * the cancel method itself must not throw exceptions.
678 *
679 * <p> This method is designed to be invoked by <em>other</em>
680 * tasks. To terminate the current task, you can just return or
681 * throw an unchecked exception from its computation method, or
682 * invoke {@code completeExceptionally}.
683 *
684 * @param mayInterruptIfRunning this value is ignored in the
685 * default implementation because tasks are not in general
686 * cancelled via interruption
687 *
688 * @return true if this task is now cancelled
689 */
690 public boolean cancel(boolean mayInterruptIfRunning) {
691 setCompletion(CANCELLED);
692 return (status & COMPLETION_MASK) == CANCELLED;
693 }
694
695 /**
696 * Returns true if this task threw an exception or was cancelled.
697 *
698 * @return true if this task threw an exception or was cancelled
699 */
700 public final boolean isCompletedAbnormally() {
701 return (status & COMPLETION_MASK) < NORMAL;
702 }
703
704 /**
705 * Returns the exception thrown by the base computation, or a
706 * CancellationException if cancelled, or null if none or if the
707 * method has not yet completed.
708 *
709 * @return the exception, or null if none
710 */
711 public final Throwable getException() {
712 int s = status & COMPLETION_MASK;
713 if (s >= NORMAL)
714 return null;
715 if (s == CANCELLED)
716 return new CancellationException();
717 return exceptionMap.get(this);
718 }
719
720 /**
721 * Completes this task abnormally, and if not already aborted or
722 * cancelled, causes it to throw the given exception upon
723 * {@code join} and related operations. This method may be used
724 * to induce exceptions in asynchronous tasks, or to force
725 * completion of tasks that would not otherwise complete. Its use
726 * in other situations is likely to be wrong. This method is
727 * overridable, but overridden versions must invoke {@code super}
728 * implementation to maintain guarantees.
729 *
730 * @param ex the exception to throw. If this exception is
731 * not a RuntimeException or Error, the actual exception thrown
732 * will be a RuntimeException with cause ex.
733 */
734 public void completeExceptionally(Throwable ex) {
735 setDoneExceptionally((ex instanceof RuntimeException) ||
736 (ex instanceof Error) ? ex :
737 new RuntimeException(ex));
738 }
739
740 /**
741 * Completes this task, and if not already aborted or cancelled,
742 * returning a {@code null} result upon {@code join} and related
743 * operations. This method may be used to provide results for
744 * asynchronous tasks, or to provide alternative handling for
745 * tasks that would not otherwise complete normally. Its use in
746 * other situations is likely to be wrong. This method is
747 * overridable, but overridden versions must invoke {@code super}
748 * implementation to maintain guarantees.
749 *
750 * @param value the result value for this task
751 */
752 public void complete(V value) {
753 try {
754 setRawResult(value);
755 } catch (Throwable rex) {
756 setDoneExceptionally(rex);
757 return;
758 }
759 setNormalCompletion();
760 }
761
762 public final V get() throws InterruptedException, ExecutionException {
763 ForkJoinWorkerThread w = getWorker();
764 if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
765 awaitDone(w, true);
766 return reportFutureResult();
767 }
768
769 public final V get(long timeout, TimeUnit unit)
770 throws InterruptedException, ExecutionException, TimeoutException {
771 ForkJoinWorkerThread w = getWorker();
772 if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
773 awaitDone(w, unit.toNanos(timeout));
774 return reportTimedFutureResult();
775 }
776
777 /**
778 * Possibly executes other tasks until this task is ready, then
779 * returns the result of the computation. This method may be more
780 * efficient than {@code join}, but is only applicable when
781 * there are no potential dependencies between continuation of the
782 * current task and that of any other task that might be executed
783 * while helping. (This usually holds for pure divide-and-conquer
784 * tasks). This method may be invoked only from within
785 * ForkJoinTask computations (as may be determined using method
786 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
787 * result in exceptions or errors, possibly including
788 * ClassCastException.
789 *
790 * @return the computed result
791 */
792 public final V helpJoin() {
793 ForkJoinWorkerThread w = (ForkJoinWorkerThread) Thread.currentThread();
794 if (status < 0 || !w.unpushTask(this) || !tryExec())
795 reportException(busyJoin(w));
796 return getRawResult();
797 }
798
799 /**
800 * Possibly executes other tasks until this task is ready. This
801 * method may be invoked only from within ForkJoinTask
802 * computations (as may be determined using method {@link
803 * #inForkJoinPool}). Attempts to invoke in other contexts result
804 * in exceptions or errors, possibly including ClassCastException.
805 */
806 public final void quietlyHelpJoin() {
807 if (status >= 0) {
808 ForkJoinWorkerThread w =
809 (ForkJoinWorkerThread) Thread.currentThread();
810 if (!w.unpushTask(this) || !tryQuietlyInvoke())
811 busyJoin(w);
812 }
813 }
814
815 /**
816 * Joins this task, without returning its result or throwing an
817 * exception. This method may be useful when processing
818 * collections of tasks when some have been cancelled or otherwise
819 * known to have aborted.
820 */
821 public final void quietlyJoin() {
822 if (status >= 0) {
823 ForkJoinWorkerThread w = getWorker();
824 if (w == null || !w.unpushTask(this) || !tryQuietlyInvoke())
825 awaitDone(w, true);
826 }
827 }
828
829 /**
830 * Commences performing this task and awaits its completion if
831 * necessary, without returning its result or throwing an
832 * exception. This method may be useful when processing
833 * collections of tasks when some have been cancelled or otherwise
834 * known to have aborted.
835 */
836 public final void quietlyInvoke() {
837 if (status >= 0 && !tryQuietlyInvoke())
838 quietlyJoin();
839 }
840
841 /**
842 * Possibly executes tasks until the pool hosting the current task
843 * {@link ForkJoinPool#isQuiescent}. This method may be of use in
844 * designs in which many tasks are forked, but none are explicitly
845 * joined, instead executing them until all are processed.
846 */
847 public static void helpQuiesce() {
848 ((ForkJoinWorkerThread) Thread.currentThread())
849 .helpQuiescePool();
850 }
851
852 /**
853 * Resets the internal bookkeeping state of this task, allowing a
854 * subsequent {@code fork}. This method allows repeated reuse of
855 * this task, but only if reuse occurs when this task has either
856 * never been forked, or has been forked, then completed and all
857 * outstanding joins of this task have also completed. Effects
858 * under any other usage conditions are not guaranteed, and are
859 * almost surely wrong. This method may be useful when executing
860 * pre-constructed trees of subtasks in loops.
861 */
862 public void reinitialize() {
863 if ((status & COMPLETION_MASK) == EXCEPTIONAL)
864 exceptionMap.remove(this);
865 status = 0;
866 }
867
868 /**
869 * Returns the pool hosting the current task execution, or null
870 * if this task is executing outside of any ForkJoinPool.
871 *
872 * @return the pool, or null if none
873 */
874 public static ForkJoinPool getPool() {
875 Thread t = Thread.currentThread();
876 return (t instanceof ForkJoinWorkerThread) ?
877 ((ForkJoinWorkerThread) t).pool : null;
878 }
879
880 /**
881 * Returns {@code true} if the current thread is executing as a
882 * ForkJoinPool computation.
883 *
884 * @return {@code true} if the current thread is executing as a
885 * ForkJoinPool computation, or false otherwise
886 */
887 public static boolean inForkJoinPool() {
888 return Thread.currentThread() instanceof ForkJoinWorkerThread;
889 }
890
891 /**
892 * Tries to unschedule this task for execution. This method will
893 * typically succeed if this task is the most recently forked task
894 * by the current thread, and has not commenced executing in
895 * another thread. This method may be useful when arranging
896 * alternative local processing of tasks that could have been, but
897 * were not, stolen. This method may be invoked only from within
898 * ForkJoinTask computations (as may be determined using method
899 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
900 * result in exceptions or errors, possibly including
901 * ClassCastException.
902 *
903 * @return true if unforked
904 */
905 public boolean tryUnfork() {
906 return ((ForkJoinWorkerThread) Thread.currentThread())
907 .unpushTask(this);
908 }
909
910 /**
911 * Returns an estimate of the number of tasks that have been
912 * forked by the current worker thread but not yet executed. This
913 * value may be useful for heuristic decisions about whether to
914 * fork other tasks.
915 *
916 * @return the number of tasks
917 */
918 public static int getQueuedTaskCount() {
919 return ((ForkJoinWorkerThread) Thread.currentThread())
920 .getQueueSize();
921 }
922
923 /**
924 * Returns an estimate of how many more locally queued tasks are
925 * held by the current worker thread than there are other worker
926 * threads that might steal them. This value may be useful for
927 * heuristic decisions about whether to fork other tasks. In many
928 * usages of ForkJoinTasks, at steady state, each worker should
929 * aim to maintain a small constant surplus (for example, 3) of
930 * tasks, and to process computations locally if this threshold is
931 * exceeded.
932 *
933 * @return the surplus number of tasks, which may be negative
934 */
935 public static int getSurplusQueuedTaskCount() {
936 return ((ForkJoinWorkerThread) Thread.currentThread())
937 .getEstimatedSurplusTaskCount();
938 }
939
940 // Extension methods
941
942 /**
943 * Returns the result that would be returned by {@code join},
944 * even if this task completed abnormally, or null if this task is
945 * not known to have been completed. This method is designed to
946 * aid debugging, as well as to support extensions. Its use in any
947 * other context is discouraged.
948 *
949 * @return the result, or null if not completed
950 */
951 public abstract V getRawResult();
952
953 /**
954 * Forces the given value to be returned as a result. This method
955 * is designed to support extensions, and should not in general be
956 * called otherwise.
957 *
958 * @param value the value
959 */
960 protected abstract void setRawResult(V value);
961
962 /**
963 * Immediately performs the base action of this task. This method
964 * is designed to support extensions, and should not in general be
965 * called otherwise. The return value controls whether this task
966 * is considered to be done normally. It may return false in
967 * asynchronous actions that require explicit invocations of
968 * {@code complete} to become joinable. It may throw exceptions
969 * to indicate abnormal exit.
970 *
971 * @return true if completed normally
972 * @throws Error or RuntimeException if encountered during computation
973 */
974 protected abstract boolean exec();
975
976 /**
977 * Returns, but does not unschedule or execute, the task queued by
978 * the current thread but not yet executed, if one is
979 * available. There is no guarantee that this task will actually
980 * be polled or executed next. This method is designed primarily
981 * to support extensions, and is unlikely to be useful otherwise.
982 * This method may be invoked only from within ForkJoinTask
983 * computations (as may be determined using method {@link
984 * #inForkJoinPool}). Attempts to invoke in other contexts result
985 * in exceptions or errors, possibly including ClassCastException.
986 *
987 * @return the next task, or null if none are available
988 */
989 protected static ForkJoinTask<?> peekNextLocalTask() {
990 return ((ForkJoinWorkerThread) Thread.currentThread())
991 .peekTask();
992 }
993
994 /**
995 * Unschedules and returns, without executing, the next task
996 * queued by the current thread but not yet executed. This method
997 * is designed primarily to support extensions, and is unlikely to
998 * be useful otherwise. This method may be invoked only from
999 * within ForkJoinTask computations (as may be determined using
1000 * method {@link #inForkJoinPool}). Attempts to invoke in other
1001 * contexts result in exceptions or errors, possibly including
1002 * ClassCastException.
1003 *
1004 * @return the next task, or null if none are available
1005 */
1006 protected static ForkJoinTask<?> pollNextLocalTask() {
1007 return ((ForkJoinWorkerThread) Thread.currentThread())
1008 .pollLocalTask();
1009 }
1010
1011 /**
1012 * Unschedules and returns, without executing, the next task
1013 * queued by the current thread but not yet executed, if one is
1014 * available, or if not available, a task that was forked by some
1015 * other thread, if available. Availability may be transient, so a
1016 * {@code null} result does not necessarily imply quiescence
1017 * of the pool this task is operating in. This method is designed
1018 * primarily to support extensions, and is unlikely to be useful
1019 * otherwise. This method may be invoked only from within
1020 * ForkJoinTask computations (as may be determined using method
1021 * {@link #inForkJoinPool}). Attempts to invoke in other contexts
1022 * result in exceptions or errors, possibly including
1023 * ClassCastException.
1024 *
1025 * @return a task, or null if none are available
1026 */
1027 protected static ForkJoinTask<?> pollTask() {
1028 return ((ForkJoinWorkerThread) Thread.currentThread())
1029 .pollTask();
1030 }
1031
1032 // Serialization support
1033
1034 private static final long serialVersionUID = -7721805057305804111L;
1035
1036 /**
1037 * Save the state to a stream.
1038 *
1039 * @serialData the current run status and the exception thrown
1040 * during execution, or null if none
1041 * @param s the stream
1042 */
1043 private void writeObject(java.io.ObjectOutputStream s)
1044 throws java.io.IOException {
1045 s.defaultWriteObject();
1046 s.writeObject(getException());
1047 }
1048
1049 /**
1050 * Reconstitute the instance from a stream.
1051 *
1052 * @param s the stream
1053 */
1054 private void readObject(java.io.ObjectInputStream s)
1055 throws java.io.IOException, ClassNotFoundException {
1056 s.defaultReadObject();
1057 status &= ~INTERNAL_SIGNAL_MASK; // clear internal signal counts
1058 status |= EXTERNAL_SIGNAL; // conservatively set external signal
1059 Object ex = s.readObject();
1060 if (ex != null)
1061 setDoneExceptionally((Throwable) ex);
1062 }
1063
1064 // Unsafe mechanics for jsr166y 3rd party package.
1065 private static sun.misc.Unsafe getUnsafe() {
1066 try {
1067 return sun.misc.Unsafe.getUnsafe();
1068 } catch (SecurityException se) {
1069 try {
1070 return java.security.AccessController.doPrivileged
1071 (new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() {
1072 public sun.misc.Unsafe run() throws Exception {
1073 return getUnsafeByReflection();
1074 }});
1075 } catch (java.security.PrivilegedActionException e) {
1076 throw new RuntimeException("Could not initialize intrinsics",
1077 e.getCause());
1078 }
1079 }
1080 }
1081
1082 private static sun.misc.Unsafe getUnsafeByReflection()
1083 throws NoSuchFieldException, IllegalAccessException {
1084 java.lang.reflect.Field f =
1085 sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
1086 f.setAccessible(true);
1087 return (sun.misc.Unsafe) f.get(null);
1088 }
1089
1090 private static long fieldOffset(String fieldName, Class<?> klazz) {
1091 try {
1092 return UNSAFE.objectFieldOffset(klazz.getDeclaredField(fieldName));
1093 } catch (NoSuchFieldException e) {
1094 // Convert Exception to Error
1095 NoSuchFieldError error = new NoSuchFieldError(fieldName);
1096 error.initCause(e);
1097 throw error;
1098 }
1099 }
1100
1101 private static final sun.misc.Unsafe UNSAFE = getUnsafe();
1102 static final long statusOffset =
1103 fieldOffset("status", ForkJoinTask.class);
1104
1105 }