ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinTask.java
Revision: 1.12
Committed: Wed Jul 22 01:36:51 2009 UTC (14 years, 9 months ago) by jsr166
Branch: MAIN
Changes since 1.11: +3 -0 lines
Log Message:
Add @since, @author tags

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package jsr166y;
8 import java.io.Serializable;
9 import java.util.*;
10 import java.util.concurrent.*;
11 import java.util.concurrent.atomic.*;
12 import sun.misc.Unsafe;
13 import java.lang.reflect.*;
14
15 /**
16 * Abstract base class for tasks that run within a {@link
17 * ForkJoinPool}. A ForkJoinTask is a thread-like entity that is much
18 * lighter weight than a normal thread. Huge numbers of tasks and
19 * subtasks may be hosted by a small number of actual threads in a
20 * ForkJoinPool, at the price of some usage limitations.
21 *
22 * <p> A "main" ForkJoinTask begins execution when submitted to a
23 * {@link ForkJoinPool}. Once started, it will usually in turn start
24 * other subtasks. As indicated by the name of this class, many
25 * programs using ForkJoinTasks employ only methods {@code fork}
26 * and {@code join}, or derivatives such as
27 * {@code invokeAll}. However, this class also provides a number
28 * of other methods that can come into play in advanced usages, as
29 * well as extension mechanics that allow support of new forms of
30 * fork/join processing.
31 *
32 * <p>A ForkJoinTask is a lightweight form of {@link Future}. The
33 * efficiency of ForkJoinTasks stems from a set of restrictions (that
34 * are only partially statically enforceable) reflecting their
35 * intended use as computational tasks calculating pure functions or
36 * operating on purely isolated objects. The primary coordination
37 * mechanisms are {@link #fork}, that arranges asynchronous execution,
38 * and {@link #join}, that doesn't proceed until the task's result has
39 * been computed. Computations should avoid {@code synchronized}
40 * methods or blocks, and should minimize other blocking
41 * synchronization apart from joining other tasks or using
42 * synchronizers such as Phasers that are advertised to cooperate with
43 * fork/join scheduling. Tasks should also not perform blocking IO,
44 * and should ideally access variables that are completely independent
45 * of those accessed by other running tasks. Minor breaches of these
46 * restrictions, for example using shared output streams, may be
47 * tolerable in practice, but frequent use may result in poor
48 * performance, and the potential to indefinitely stall if the number
49 * of threads not waiting for IO or other external synchronization
50 * becomes exhausted. This usage restriction is in part enforced by
51 * not permitting checked exceptions such as {@code IOExceptions}
52 * to be thrown. However, computations may still encounter unchecked
53 * exceptions, that are rethrown to callers attempting join
54 * them. These exceptions may additionally include
55 * RejectedExecutionExceptions stemming from internal resource
56 * exhaustion such as failure to allocate internal task queues.
57 *
58 * <p>The primary method for awaiting completion and extracting
59 * results of a task is {@link #join}, but there are several variants:
60 * The {@link Future#get} methods support interruptible and/or timed
61 * waits for completion and report results using {@code Future}
62 * conventions. Method {@link #helpJoin} enables callers to actively
63 * execute other tasks while awaiting joins, which is sometimes more
64 * efficient but only applies when all subtasks are known to be
65 * strictly tree-structured. Method {@link #invoke} is semantically
66 * equivalent to {@code fork(); join()} but always attempts to
67 * begin execution in the current thread. The "<em>quiet</em>" forms
68 * of these methods do not extract results or report exceptions. These
69 * may be useful when a set of tasks are being executed, and you need
70 * to delay processing of results or exceptions until all complete.
71 * Method {@code invokeAll} (available in multiple versions)
72 * performs the most common form of parallel invocation: forking a set
73 * of tasks and joining them all.
74 *
75 * <p> The ForkJoinTask class is not usually directly subclassed.
76 * Instead, you subclass one of the abstract classes that support a
77 * particular style of fork/join processing. Normally, a concrete
78 * ForkJoinTask subclass declares fields comprising its parameters,
79 * established in a constructor, and then defines a {@code compute}
80 * method that somehow uses the control methods supplied by this base
81 * class. While these methods have {@code public} access (to allow
82 * instances of different task subclasses to call each others
83 * methods), some of them may only be called from within other
84 * ForkJoinTasks. Attempts to invoke them in other contexts result in
85 * exceptions or errors possibly including ClassCastException.
86 *
87 * <p>Most base support methods are {@code final} because their
88 * implementations are intrinsically tied to the underlying
89 * lightweight task scheduling framework, and so cannot be overridden.
90 * Developers creating new basic styles of fork/join processing should
91 * minimally implement {@code protected} methods
92 * {@code exec}, {@code setRawResult}, and
93 * {@code getRawResult}, while also introducing an abstract
94 * computational method that can be implemented in its subclasses,
95 * possibly relying on other {@code protected} methods provided
96 * by this class.
97 *
98 * <p>ForkJoinTasks should perform relatively small amounts of
99 * computations, otherwise splitting into smaller tasks. As a very
100 * rough rule of thumb, a task should perform more than 100 and less
101 * than 10000 basic computational steps. If tasks are too big, then
102 * parallelism cannot improve throughput. If too small, then memory
103 * and internal task maintenance overhead may overwhelm processing.
104 *
105 * <p>ForkJoinTasks are {@code Serializable}, which enables them
106 * to be used in extensions such as remote execution frameworks. It is
107 * in general sensible to serialize tasks only before or after, but
108 * not during execution. Serialization is not relied on during
109 * execution itself.
110 *
111 * @since 1.7
112 * @author Doug Lea
113 */
114 public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
115
116 /**
117 * Run control status bits packed into a single int to minimize
118 * footprint and to ensure atomicity (via CAS). Status is
119 * initially zero, and takes on nonnegative values until
120 * completed, upon which status holds COMPLETED. CANCELLED, or
121 * EXCEPTIONAL, which use the top 3 bits. Tasks undergoing
122 * blocking waits by other threads have SIGNAL_MASK bits set --
123 * bit 15 for external (nonFJ) waits, and the rest a count of
124 * waiting FJ threads. (This representation relies on
125 * ForkJoinPool max thread limits). Completion of a stolen task
126 * with SIGNAL_MASK bits set awakens waiter via notifyAll. Even
127 * though suboptimal for some purposes, we use basic builtin
128 * wait/notify to take advantage of "monitor inflation" in JVMs
129 * that we would otherwise need to emulate to avoid adding further
130 * per-task bookkeeping overhead. Note that bits 16-28 are
131 * currently unused. Also value 0x80000000 is available as spare
132 * completion value.
133 */
134 volatile int status; // accessed directly by pool and workers
135
136 static final int COMPLETION_MASK = 0xe0000000;
137 static final int NORMAL = 0xe0000000; // == mask
138 static final int CANCELLED = 0xc0000000;
139 static final int EXCEPTIONAL = 0xa0000000;
140 static final int SIGNAL_MASK = 0x0000ffff;
141 static final int INTERNAL_SIGNAL_MASK = 0x00007fff;
142 static final int EXTERNAL_SIGNAL = 0x00008000; // top bit of low word
143
144 /**
145 * Table of exceptions thrown by tasks, to enable reporting by
146 * callers. Because exceptions are rare, we don't directly keep
147 * them with task objects, but instead use a weak ref table. Note
148 * that cancellation exceptions don't appear in the table, but are
149 * instead recorded as status values.
150 * TODO: Use ConcurrentReferenceHashMap
151 */
152 static final Map<ForkJoinTask<?>, Throwable> exceptionMap =
153 Collections.synchronizedMap
154 (new WeakHashMap<ForkJoinTask<?>, Throwable>());
155
156 // within-package utilities
157
158 /**
159 * Gets current worker thread, or null if not a worker thread.
160 */
161 static ForkJoinWorkerThread getWorker() {
162 Thread t = Thread.currentThread();
163 return ((t instanceof ForkJoinWorkerThread)?
164 (ForkJoinWorkerThread)t : null);
165 }
166
167 final boolean casStatus(int cmp, int val) {
168 return UNSAFE.compareAndSwapInt(this, statusOffset, cmp, val);
169 }
170
171 /**
172 * Workaround for not being able to rethrow unchecked exceptions.
173 */
174 static void rethrowException(Throwable ex) {
175 if (ex != null)
176 UNSAFE.throwException(ex);
177 }
178
179 // Setting completion status
180
181 /**
182 * Marks completion and wakes up threads waiting to join this task.
183 *
184 * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
185 */
186 final void setCompletion(int completion) {
187 ForkJoinPool pool = getPool();
188 if (pool != null) {
189 int s; // Clear signal bits while setting completion status
190 do;while ((s = status) >= 0 && !casStatus(s, completion));
191
192 if ((s & SIGNAL_MASK) != 0) {
193 if ((s &= INTERNAL_SIGNAL_MASK) != 0)
194 pool.updateRunningCount(s);
195 synchronized(this) { notifyAll(); }
196 }
197 }
198 else
199 externallySetCompletion(completion);
200 }
201
202 /**
203 * Version of setCompletion for non-FJ threads. Leaves signal
204 * bits for unblocked threads to adjust, and always notifies.
205 */
206 private void externallySetCompletion(int completion) {
207 int s;
208 do;while ((s = status) >= 0 &&
209 !casStatus(s, (s & SIGNAL_MASK) | completion));
210 synchronized(this) { notifyAll(); }
211 }
212
213 /**
214 * Sets status to indicate normal completion
215 */
216 final void setNormalCompletion() {
217 // Try typical fast case -- single CAS, no signal, not already done.
218 // Manually expand casStatus to improve chances of inlining it
219 if (!UNSAFE.compareAndSwapInt(this, statusOffset, 0, NORMAL))
220 setCompletion(NORMAL);
221 }
222
223 // internal waiting and notification
224
225 /**
226 * Performs the actual monitor wait for awaitDone
227 */
228 private void doAwaitDone() {
229 // Minimize lock bias and in/de-flation effects by maximizing
230 // chances of waiting inside sync
231 try {
232 while (status >= 0)
233 synchronized(this) { if (status >= 0) wait(); }
234 } catch (InterruptedException ie) {
235 onInterruptedWait();
236 }
237 }
238
239 /**
240 * Performs the actual monitor wait for awaitDone
241 */
242 private void doAwaitDone(long startTime, long nanos) {
243 synchronized(this) {
244 try {
245 while (status >= 0) {
246 long nt = nanos - System.nanoTime() - startTime;
247 if (nt <= 0)
248 break;
249 wait(nt / 1000000, (int)(nt % 1000000));
250 }
251 } catch (InterruptedException ie) {
252 onInterruptedWait();
253 }
254 }
255 }
256
257 // Awaiting completion
258
259 /**
260 * Sets status to indicate there is joiner, then waits for join,
261 * surrounded with pool notifications.
262 *
263 * @return status upon exit
264 */
265 private int awaitDone(ForkJoinWorkerThread w, boolean maintainParallelism) {
266 ForkJoinPool pool = w == null? null : w.pool;
267 int s;
268 while ((s = status) >= 0) {
269 if (casStatus(s, pool == null? s|EXTERNAL_SIGNAL : s+1)) {
270 if (pool == null || !pool.preJoin(this, maintainParallelism))
271 doAwaitDone();
272 if (((s = status) & INTERNAL_SIGNAL_MASK) != 0)
273 adjustPoolCountsOnUnblock(pool);
274 break;
275 }
276 }
277 return s;
278 }
279
280 /**
281 * Timed version of awaitDone
282 * @return status upon exit
283 */
284 private int awaitDone(ForkJoinWorkerThread w, long nanos) {
285 ForkJoinPool pool = w == null? null : w.pool;
286 int s;
287 while ((s = status) >= 0) {
288 if (casStatus(s, pool == null? s|EXTERNAL_SIGNAL : s+1)) {
289 long startTime = System.nanoTime();
290 if (pool == null || !pool.preJoin(this, false))
291 doAwaitDone(startTime, nanos);
292 if ((s = status) >= 0) {
293 adjustPoolCountsOnCancelledWait(pool);
294 s = status;
295 }
296 if (s < 0 && (s & INTERNAL_SIGNAL_MASK) != 0)
297 adjustPoolCountsOnUnblock(pool);
298 break;
299 }
300 }
301 return s;
302 }
303
304 /**
305 * Notifies pool that thread is unblocked. Called by signalled
306 * threads when woken by non-FJ threads (which is atypical).
307 */
308 private void adjustPoolCountsOnUnblock(ForkJoinPool pool) {
309 int s;
310 do;while ((s = status) < 0 && !casStatus(s, s & COMPLETION_MASK));
311 if (pool != null && (s &= INTERNAL_SIGNAL_MASK) != 0)
312 pool.updateRunningCount(s);
313 }
314
315 /**
316 * Notifies pool to adjust counts on cancelled or timed out wait.
317 */
318 private void adjustPoolCountsOnCancelledWait(ForkJoinPool pool) {
319 if (pool != null) {
320 int s;
321 while ((s = status) >= 0 && (s & INTERNAL_SIGNAL_MASK) != 0) {
322 if (casStatus(s, s - 1)) {
323 pool.updateRunningCount(1);
324 break;
325 }
326 }
327 }
328 }
329
330 /**
331 * Handles interruptions during waits.
332 */
333 private void onInterruptedWait() {
334 ForkJoinWorkerThread w = getWorker();
335 if (w == null)
336 Thread.currentThread().interrupt(); // re-interrupt
337 else if (w.isTerminating())
338 cancelIgnoringExceptions();
339 // else if FJworker, ignore interrupt
340 }
341
342 // Recording and reporting exceptions
343
344 private void setDoneExceptionally(Throwable rex) {
345 exceptionMap.put(this, rex);
346 setCompletion(EXCEPTIONAL);
347 }
348
349 /**
350 * Throws the exception associated with status s.
351 *
352 * @throws the exception
353 */
354 private void reportException(int s) {
355 if ((s &= COMPLETION_MASK) < NORMAL) {
356 if (s == CANCELLED)
357 throw new CancellationException();
358 else
359 rethrowException(exceptionMap.get(this));
360 }
361 }
362
363 /**
364 * Returns result or throws exception using j.u.c.Future conventions.
365 * Only call when isDone known to be true.
366 */
367 private V reportFutureResult()
368 throws ExecutionException, InterruptedException {
369 int s = status & COMPLETION_MASK;
370 if (s < NORMAL) {
371 Throwable ex;
372 if (s == CANCELLED)
373 throw new CancellationException();
374 if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
375 throw new ExecutionException(ex);
376 if (Thread.interrupted())
377 throw new InterruptedException();
378 }
379 return getRawResult();
380 }
381
382 /**
383 * Returns result or throws exception using j.u.c.Future conventions
384 * with timeouts.
385 */
386 private V reportTimedFutureResult()
387 throws InterruptedException, ExecutionException, TimeoutException {
388 Throwable ex;
389 int s = status & COMPLETION_MASK;
390 if (s == NORMAL)
391 return getRawResult();
392 if (s == CANCELLED)
393 throw new CancellationException();
394 if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
395 throw new ExecutionException(ex);
396 if (Thread.interrupted())
397 throw new InterruptedException();
398 throw new TimeoutException();
399 }
400
401 // internal execution methods
402
403 /**
404 * Calls exec, recording completion, and rethrowing exception if
405 * encountered. Caller should normally check status before calling.
406 *
407 * @return true if completed normally
408 */
409 private boolean tryExec() {
410 try { // try block must contain only call to exec
411 if (!exec())
412 return false;
413 } catch (Throwable rex) {
414 setDoneExceptionally(rex);
415 rethrowException(rex);
416 return false; // not reached
417 }
418 setNormalCompletion();
419 return true;
420 }
421
422 /**
423 * Main execution method used by worker threads. Invokes
424 * base computation unless already complete.
425 */
426 final void quietlyExec() {
427 if (status >= 0) {
428 try {
429 if (!exec())
430 return;
431 } catch(Throwable rex) {
432 setDoneExceptionally(rex);
433 return;
434 }
435 setNormalCompletion();
436 }
437 }
438
439 /**
440 * Calls exec(), recording but not rethrowing exception.
441 * Caller should normally check status before calling.
442 *
443 * @return true if completed normally
444 */
445 private boolean tryQuietlyInvoke() {
446 try {
447 if (!exec())
448 return false;
449 } catch (Throwable rex) {
450 setDoneExceptionally(rex);
451 return false;
452 }
453 setNormalCompletion();
454 return true;
455 }
456
457 /**
458 * Cancels, ignoring any exceptions it throws.
459 */
460 final void cancelIgnoringExceptions() {
461 try {
462 cancel(false);
463 } catch(Throwable ignore) {
464 }
465 }
466
467 /**
468 * Main implementation of helpJoin
469 */
470 private int busyJoin(ForkJoinWorkerThread w) {
471 int s;
472 ForkJoinTask<?> t;
473 while ((s = status) >= 0 && (t = w.scanWhileJoining(this)) != null)
474 t.quietlyExec();
475 return (s >= 0)? awaitDone(w, false) : s; // block if no work
476 }
477
478 // public methods
479
480 /**
481 * Arranges to asynchronously execute this task. While it is not
482 * necessarily enforced, it is a usage error to fork a task more
483 * than once unless it has completed and been reinitialized. This
484 * method may be invoked only from within ForkJoinTask
485 * computations. Attempts to invoke in other contexts result in
486 * exceptions or errors possibly including ClassCastException.
487 */
488 public final void fork() {
489 ((ForkJoinWorkerThread)(Thread.currentThread())).pushTask(this);
490 }
491
492 /**
493 * Returns the result of the computation when it is ready.
494 * This method differs from {@code get} in that abnormal
495 * completion results in RuntimeExceptions or Errors, not
496 * ExecutionExceptions.
497 *
498 * @return the computed result
499 */
500 public final V join() {
501 ForkJoinWorkerThread w = getWorker();
502 if (w == null || status < 0 || !w.unpushTask(this) || !tryExec())
503 reportException(awaitDone(w, true));
504 return getRawResult();
505 }
506
507 /**
508 * Commences performing this task, awaits its completion if
509 * necessary, and return its result.
510 *
511 * @throws Throwable (a RuntimeException, Error, or unchecked
512 * exception) if the underlying computation did so
513 * @return the computed result
514 */
515 public final V invoke() {
516 if (status >= 0 && tryExec())
517 return getRawResult();
518 else
519 return join();
520 }
521
522 /**
523 * Forks both tasks, returning when {@code isDone} holds for
524 * both of them or an exception is encountered. This method may be
525 * invoked only from within ForkJoinTask computations. Attempts to
526 * invoke in other contexts result in exceptions or errors
527 * possibly including ClassCastException.
528 *
529 * @param t1 one task
530 * @param t2 the other task
531 * @throws NullPointerException if t1 or t2 are null
532 * @throws RuntimeException or Error if either task did so
533 */
534 public static void invokeAll(ForkJoinTask<?>t1, ForkJoinTask<?> t2) {
535 t2.fork();
536 t1.invoke();
537 t2.join();
538 }
539
540 /**
541 * Forks the given tasks, returning when {@code isDone} holds
542 * for all of them. If any task encounters an exception, others
543 * may be cancelled. This method may be invoked only from within
544 * ForkJoinTask computations. Attempts to invoke in other contexts
545 * result in exceptions or errors possibly including ClassCastException.
546 *
547 * @param tasks the array of tasks
548 * @throws NullPointerException if tasks or any element are null
549 * @throws RuntimeException or Error if any task did so
550 */
551 public static void invokeAll(ForkJoinTask<?>... tasks) {
552 Throwable ex = null;
553 int last = tasks.length - 1;
554 for (int i = last; i >= 0; --i) {
555 ForkJoinTask<?> t = tasks[i];
556 if (t == null) {
557 if (ex == null)
558 ex = new NullPointerException();
559 }
560 else if (i != 0)
561 t.fork();
562 else {
563 t.quietlyInvoke();
564 if (ex == null)
565 ex = t.getException();
566 }
567 }
568 for (int i = 1; i <= last; ++i) {
569 ForkJoinTask<?> t = tasks[i];
570 if (t != null) {
571 if (ex != null)
572 t.cancel(false);
573 else {
574 t.quietlyJoin();
575 if (ex == null)
576 ex = t.getException();
577 }
578 }
579 }
580 if (ex != null)
581 rethrowException(ex);
582 }
583
584 /**
585 * Forks all tasks in the collection, returning when
586 * {@code isDone} holds for all of them. If any task
587 * encounters an exception, others may be cancelled. This method
588 * may be invoked only from within ForkJoinTask
589 * computations. Attempts to invoke in other contexts result in
590 * exceptions or errors possibly including ClassCastException.
591 *
592 * @param tasks the collection of tasks
593 * @throws NullPointerException if tasks or any element are null
594 * @throws RuntimeException or Error if any task did so
595 */
596 public static void invokeAll(Collection<? extends ForkJoinTask<?>> tasks) {
597 if (!(tasks instanceof List)) {
598 invokeAll(tasks.toArray(new ForkJoinTask[tasks.size()]));
599 return;
600 }
601 List<? extends ForkJoinTask<?>> ts =
602 (List<? extends ForkJoinTask<?>>)tasks;
603 Throwable ex = null;
604 int last = ts.size() - 1;
605 for (int i = last; i >= 0; --i) {
606 ForkJoinTask<?> t = ts.get(i);
607 if (t == null) {
608 if (ex == null)
609 ex = new NullPointerException();
610 }
611 else if (i != 0)
612 t.fork();
613 else {
614 t.quietlyInvoke();
615 if (ex == null)
616 ex = t.getException();
617 }
618 }
619 for (int i = 1; i <= last; ++i) {
620 ForkJoinTask<?> t = ts.get(i);
621 if (t != null) {
622 if (ex != null)
623 t.cancel(false);
624 else {
625 t.quietlyJoin();
626 if (ex == null)
627 ex = t.getException();
628 }
629 }
630 }
631 if (ex != null)
632 rethrowException(ex);
633 }
634
635 /**
636 * Returns true if the computation performed by this task has
637 * completed (or has been cancelled).
638 *
639 * @return true if this computation has completed
640 */
641 public final boolean isDone() {
642 return status < 0;
643 }
644
645 /**
646 * Returns true if this task was cancelled.
647 *
648 * @return true if this task was cancelled
649 */
650 public final boolean isCancelled() {
651 return (status & COMPLETION_MASK) == CANCELLED;
652 }
653
654 /**
655 * Asserts that the results of this task's computation will not be
656 * used. If a cancellation occurs before attempting to execute this
657 * task, then execution will be suppressed, {@code isCancelled}
658 * will report true, and {@code join} will result in a
659 * {@code CancellationException} being thrown. Otherwise, when
660 * cancellation races with completion, there are no guarantees
661 * about whether {@code isCancelled} will report true, whether
662 * {@code join} will return normally or via an exception, or
663 * whether these behaviors will remain consistent upon repeated
664 * invocation.
665 *
666 * <p>This method may be overridden in subclasses, but if so, must
667 * still ensure that these minimal properties hold. In particular,
668 * the cancel method itself must not throw exceptions.
669 *
670 * <p> This method is designed to be invoked by <em>other</em>
671 * tasks. To terminate the current task, you can just return or
672 * throw an unchecked exception from its computation method, or
673 * invoke {@code completeExceptionally}.
674 *
675 * @param mayInterruptIfRunning this value is ignored in the
676 * default implementation because tasks are not in general
677 * cancelled via interruption.
678 *
679 * @return true if this task is now cancelled
680 */
681 public boolean cancel(boolean mayInterruptIfRunning) {
682 setCompletion(CANCELLED);
683 return (status & COMPLETION_MASK) == CANCELLED;
684 }
685
686 /**
687 * Returns true if this task threw an exception or was cancelled.
688 *
689 * @return true if this task threw an exception or was cancelled
690 */
691 public final boolean isCompletedAbnormally() {
692 return (status & COMPLETION_MASK) < NORMAL;
693 }
694
695 /**
696 * Returns the exception thrown by the base computation, or a
697 * CancellationException if cancelled, or null if none or if the
698 * method has not yet completed.
699 *
700 * @return the exception, or null if none
701 */
702 public final Throwable getException() {
703 int s = status & COMPLETION_MASK;
704 if (s >= NORMAL)
705 return null;
706 if (s == CANCELLED)
707 return new CancellationException();
708 return exceptionMap.get(this);
709 }
710
711 /**
712 * Completes this task abnormally, and if not already aborted or
713 * cancelled, causes it to throw the given exception upon
714 * {@code join} and related operations. This method may be used
715 * to induce exceptions in asynchronous tasks, or to force
716 * completion of tasks that would not otherwise complete. Its use
717 * in other situations is likely to be wrong. This method is
718 * overridable, but overridden versions must invoke {@code super}
719 * implementation to maintain guarantees.
720 *
721 * @param ex the exception to throw. If this exception is
722 * not a RuntimeException or Error, the actual exception thrown
723 * will be a RuntimeException with cause ex.
724 */
725 public void completeExceptionally(Throwable ex) {
726 setDoneExceptionally((ex instanceof RuntimeException) ||
727 (ex instanceof Error)? ex :
728 new RuntimeException(ex));
729 }
730
731 /**
732 * Completes this task, and if not already aborted or cancelled,
733 * returning a {@code null} result upon {@code join} and related
734 * operations. This method may be used to provide results for
735 * asynchronous tasks, or to provide alternative handling for
736 * tasks that would not otherwise complete normally. Its use in
737 * other situations is likely to be wrong. This method is
738 * overridable, but overridden versions must invoke {@code super}
739 * implementation to maintain guarantees.
740 *
741 * @param value the result value for this task
742 */
743 public void complete(V value) {
744 try {
745 setRawResult(value);
746 } catch(Throwable rex) {
747 setDoneExceptionally(rex);
748 return;
749 }
750 setNormalCompletion();
751 }
752
753 public final V get() throws InterruptedException, ExecutionException {
754 ForkJoinWorkerThread w = getWorker();
755 if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
756 awaitDone(w, true);
757 return reportFutureResult();
758 }
759
760 public final V get(long timeout, TimeUnit unit)
761 throws InterruptedException, ExecutionException, TimeoutException {
762 ForkJoinWorkerThread w = getWorker();
763 if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
764 awaitDone(w, unit.toNanos(timeout));
765 return reportTimedFutureResult();
766 }
767
768 /**
769 * Possibly executes other tasks until this task is ready, then
770 * returns the result of the computation. This method may be more
771 * efficient than {@code join}, but is only applicable when
772 * there are no potential dependencies between continuation of the
773 * current task and that of any other task that might be executed
774 * while helping. (This usually holds for pure divide-and-conquer
775 * tasks). This method may be invoked only from within
776 * ForkJoinTask computations. Attempts to invoke in other contexts
777 * result in exceptions or errors possibly including ClassCastException.
778 *
779 * @return the computed result
780 */
781 public final V helpJoin() {
782 ForkJoinWorkerThread w = (ForkJoinWorkerThread)(Thread.currentThread());
783 if (status < 0 || !w.unpushTask(this) || !tryExec())
784 reportException(busyJoin(w));
785 return getRawResult();
786 }
787
788 /**
789 * Possibly executes other tasks until this task is ready. This
790 * method may be invoked only from within ForkJoinTask
791 * computations. Attempts to invoke in other contexts result in
792 * exceptions or errors possibly including ClassCastException.
793 */
794 public final void quietlyHelpJoin() {
795 if (status >= 0) {
796 ForkJoinWorkerThread w =
797 (ForkJoinWorkerThread)(Thread.currentThread());
798 if (!w.unpushTask(this) || !tryQuietlyInvoke())
799 busyJoin(w);
800 }
801 }
802
803 /**
804 * Joins this task, without returning its result or throwing an
805 * exception. This method may be useful when processing
806 * collections of tasks when some have been cancelled or otherwise
807 * known to have aborted.
808 */
809 public final void quietlyJoin() {
810 if (status >= 0) {
811 ForkJoinWorkerThread w = getWorker();
812 if (w == null || !w.unpushTask(this) || !tryQuietlyInvoke())
813 awaitDone(w, true);
814 }
815 }
816
817 /**
818 * Commences performing this task and awaits its completion if
819 * necessary, without returning its result or throwing an
820 * exception. This method may be useful when processing
821 * collections of tasks when some have been cancelled or otherwise
822 * known to have aborted.
823 */
824 public final void quietlyInvoke() {
825 if (status >= 0 && !tryQuietlyInvoke())
826 quietlyJoin();
827 }
828
829 /**
830 * Possibly executes tasks until the pool hosting the current task
831 * {@link ForkJoinPool#isQuiescent}. This method may be of use in
832 * designs in which many tasks are forked, but none are explicitly
833 * joined, instead executing them until all are processed.
834 */
835 public static void helpQuiesce() {
836 ((ForkJoinWorkerThread)(Thread.currentThread())).
837 helpQuiescePool();
838 }
839
840 /**
841 * Resets the internal bookkeeping state of this task, allowing a
842 * subsequent {@code fork}. This method allows repeated reuse of
843 * this task, but only if reuse occurs when this task has either
844 * never been forked, or has been forked, then completed and all
845 * outstanding joins of this task have also completed. Effects
846 * under any other usage conditions are not guaranteed, and are
847 * almost surely wrong. This method may be useful when executing
848 * pre-constructed trees of subtasks in loops.
849 */
850 public void reinitialize() {
851 if ((status & COMPLETION_MASK) == EXCEPTIONAL)
852 exceptionMap.remove(this);
853 status = 0;
854 }
855
856 /**
857 * Returns the pool hosting the current task execution, or null
858 * if this task is executing outside of any pool.
859 *
860 * @return the pool, or null if none
861 */
862 public static ForkJoinPool getPool() {
863 Thread t = Thread.currentThread();
864 return ((t instanceof ForkJoinWorkerThread)?
865 ((ForkJoinWorkerThread)t).pool : null);
866 }
867
868 /**
869 * Tries to unschedule this task for execution. This method will
870 * typically succeed if this task is the most recently forked task
871 * by the current thread, and has not commenced executing in
872 * another thread. This method may be useful when arranging
873 * alternative local processing of tasks that could have been, but
874 * were not, stolen. This method may be invoked only from within
875 * ForkJoinTask computations. Attempts to invoke in other contexts
876 * result in exceptions or errors possibly including ClassCastException.
877 *
878 * @return true if unforked
879 */
880 public boolean tryUnfork() {
881 return ((ForkJoinWorkerThread)(Thread.currentThread())).unpushTask(this);
882 }
883
884 /**
885 * Returns an estimate of the number of tasks that have been
886 * forked by the current worker thread but not yet executed. This
887 * value may be useful for heuristic decisions about whether to
888 * fork other tasks.
889 *
890 * @return the number of tasks
891 */
892 public static int getQueuedTaskCount() {
893 return ((ForkJoinWorkerThread)(Thread.currentThread())).
894 getQueueSize();
895 }
896
897 /**
898 * Returns an estimate of how many more locally queued tasks are
899 * held by the current worker thread than there are other worker
900 * threads that might steal them. This value may be useful for
901 * heuristic decisions about whether to fork other tasks. In many
902 * usages of ForkJoinTasks, at steady state, each worker should
903 * aim to maintain a small constant surplus (for example, 3) of
904 * tasks, and to process computations locally if this threshold is
905 * exceeded.
906 *
907 * @return the surplus number of tasks, which may be negative
908 */
909 public static int getSurplusQueuedTaskCount() {
910 return ((ForkJoinWorkerThread)(Thread.currentThread()))
911 .getEstimatedSurplusTaskCount();
912 }
913
914 // Extension methods
915
916 /**
917 * Returns the result that would be returned by {@code join},
918 * even if this task completed abnormally, or null if this task is
919 * not known to have been completed. This method is designed to
920 * aid debugging, as well as to support extensions. Its use in any
921 * other context is discouraged.
922 *
923 * @return the result, or null if not completed
924 */
925 public abstract V getRawResult();
926
927 /**
928 * Forces the given value to be returned as a result. This method
929 * is designed to support extensions, and should not in general be
930 * called otherwise.
931 *
932 * @param value the value
933 */
934 protected abstract void setRawResult(V value);
935
936 /**
937 * Immediately performs the base action of this task. This method
938 * is designed to support extensions, and should not in general be
939 * called otherwise. The return value controls whether this task
940 * is considered to be done normally. It may return false in
941 * asynchronous actions that require explicit invocations of
942 * {@code complete} to become joinable. It may throw exceptions
943 * to indicate abnormal exit.
944 *
945 * @return true if completed normally
946 * @throws Error or RuntimeException if encountered during computation
947 */
948 protected abstract boolean exec();
949
950 /**
951 * Returns, but does not unschedule or execute, the task queued by
952 * the current thread but not yet executed, if one is
953 * available. There is no guarantee that this task will actually
954 * be polled or executed next. This method is designed primarily
955 * to support extensions, and is unlikely to be useful otherwise.
956 * This method may be invoked only from within ForkJoinTask
957 * computations. Attempts to invoke in other contexts result in
958 * exceptions or errors possibly including ClassCastException.
959 *
960 * @return the next task, or null if none are available
961 */
962 protected static ForkJoinTask<?> peekNextLocalTask() {
963 return ((ForkJoinWorkerThread)(Thread.currentThread())).peekTask();
964 }
965
966 /**
967 * Unschedules and returns, without executing, the next task
968 * queued by the current thread but not yet executed. This method
969 * is designed primarily to support extensions, and is unlikely to
970 * be useful otherwise. This method may be invoked only from
971 * within ForkJoinTask computations. Attempts to invoke in other
972 * contexts result in exceptions or errors possibly including
973 * ClassCastException.
974 *
975 * @return the next task, or null if none are available
976 */
977 protected static ForkJoinTask<?> pollNextLocalTask() {
978 return ((ForkJoinWorkerThread)(Thread.currentThread())).pollLocalTask();
979 }
980
981 /**
982 * Unschedules and returns, without executing, the next task
983 * queued by the current thread but not yet executed, if one is
984 * available, or if not available, a task that was forked by some
985 * other thread, if available. Availability may be transient, so a
986 * {@code null} result does not necessarily imply quiescence
987 * of the pool this task is operating in. This method is designed
988 * primarily to support extensions, and is unlikely to be useful
989 * otherwise. This method may be invoked only from within
990 * ForkJoinTask computations. Attempts to invoke in other contexts
991 * result in exceptions or errors possibly including
992 * ClassCastException.
993 *
994 * @return a task, or null if none are available
995 */
996 protected static ForkJoinTask<?> pollTask() {
997 return ((ForkJoinWorkerThread)(Thread.currentThread())).
998 pollTask();
999 }
1000
1001 // Serialization support
1002
1003 private static final long serialVersionUID = -7721805057305804111L;
1004
1005 /**
1006 * Save the state to a stream.
1007 *
1008 * @serialData the current run status and the exception thrown
1009 * during execution, or null if none
1010 * @param s the stream
1011 */
1012 private void writeObject(java.io.ObjectOutputStream s)
1013 throws java.io.IOException {
1014 s.defaultWriteObject();
1015 s.writeObject(getException());
1016 }
1017
1018 /**
1019 * Reconstitute the instance from a stream.
1020 *
1021 * @param s the stream
1022 */
1023 private void readObject(java.io.ObjectInputStream s)
1024 throws java.io.IOException, ClassNotFoundException {
1025 s.defaultReadObject();
1026 status &= ~INTERNAL_SIGNAL_MASK; // clear internal signal counts
1027 status |= EXTERNAL_SIGNAL; // conservatively set external signal
1028 Object ex = s.readObject();
1029 if (ex != null)
1030 setDoneExceptionally((Throwable)ex);
1031 }
1032
1033 // Temporary Unsafe mechanics for preliminary release
1034 private static Unsafe getUnsafe() throws Throwable {
1035 try {
1036 return Unsafe.getUnsafe();
1037 } catch (SecurityException se) {
1038 try {
1039 return java.security.AccessController.doPrivileged
1040 (new java.security.PrivilegedExceptionAction<Unsafe>() {
1041 public Unsafe run() throws Exception {
1042 return getUnsafePrivileged();
1043 }});
1044 } catch (java.security.PrivilegedActionException e) {
1045 throw e.getCause();
1046 }
1047 }
1048 }
1049
1050 private static Unsafe getUnsafePrivileged()
1051 throws NoSuchFieldException, IllegalAccessException {
1052 Field f = Unsafe.class.getDeclaredField("theUnsafe");
1053 f.setAccessible(true);
1054 return (Unsafe) f.get(null);
1055 }
1056
1057 private static long fieldOffset(String fieldName)
1058 throws NoSuchFieldException {
1059 return UNSAFE.objectFieldOffset
1060 (ForkJoinTask.class.getDeclaredField(fieldName));
1061 }
1062
1063 static final Unsafe UNSAFE;
1064 static final long statusOffset;
1065
1066 static {
1067 try {
1068 UNSAFE = getUnsafe();
1069 statusOffset = fieldOffset("status");
1070 } catch (Throwable e) {
1071 throw new RuntimeException("Could not initialize intrinsics", e);
1072 }
1073 }
1074
1075 }