root/jsr166/jsr166/src/jsr166y/ForkJoinTask.java
Revision: 1.4
Committed: Mon Jan 12 17:16:18 2009 UTC (15 years, 3 months ago) by dl
Branch: MAIN
Changes since 1.3: +22 -13 lines
Log Message:
Split out ThreadLocalRandom; internal refactoring pass

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package jsr166y;
8 import java.io.Serializable;
9 import java.util.*;
10 import java.util.concurrent.*;
11 import java.util.concurrent.atomic.*;
12 import sun.misc.Unsafe;
13 import java.lang.reflect.*;
14
15 /**
16 * Abstract base class for tasks that run within a {@link
17 * ForkJoinPool}. A ForkJoinTask is a thread-like entity that is much
18 * lighter weight than a normal thread. Huge numbers of tasks and
19 * subtasks may be hosted by a small number of actual threads in a
20 * ForkJoinPool, at the price of some usage limitations.
21 *
22 * <p> A "main" ForkJoinTask begins execution when submitted to a
23 * {@link ForkJoinPool}. Once started, it will usually in turn start
24 * other subtasks. As indicated by the name of this class, many
25 * programs using ForkJoinTasks employ only methods <code>fork</code>
26 * and <code>join</code>, or derivatives such as
27 * <code>invokeAll</code>. However, this class also provides a number
28 * of other methods that can come into play in advanced usages, as
29 * well as extension mechanics that allow support of new forms of
30 * fork/join processing.
31 *
32 * <p>A ForkJoinTask is a lightweight form of {@link Future}. The
33 * efficiency of ForkJoinTasks stems from a set of restrictions (that
34 * are only partially statically enforceable) reflecting their
35 * intended use as computational tasks calculating pure functions or
36 * operating on purely isolated objects. The primary coordination
37 * mechanisms are {@link #fork}, that arranges asynchronous execution,
38 * and {@link #join}, that doesn't proceed until the task's result has
39 * been computed. Computations should avoid <code>synchronized</code>
40 * methods or blocks, and should minimize other blocking
41 * synchronization apart from joining other tasks or using
42 * synchronizers such as Phasers that are advertised to cooperate with
43 * fork/join scheduling. Tasks should also not perform blocking IO,
44 * and should ideally access variables that are completely independent
45 * of those accessed by other running tasks. Minor breaches of these
46 * restrictions, for example using shared output streams, may be
47 * tolerable in practice, but frequent use may result in poor
48 * performance, and the potential to indefinitely stall if the number
49 * of threads not waiting for IO or other external synchronization
50 * becomes exhausted. This usage restriction is in part enforced by
51 * not permitting checked exceptions such as <code>IOExceptions</code>
52 * to be thrown. However, computations may still encounter unchecked
53 * exceptions, that are rethrown to callers attempting to join
54 * them. These exceptions may additionally include
55 * RejectedExecutionExceptions stemming from internal resource
56 * exhaustion such as failure to allocate internal task queues.
57 *
58 * <p>The primary method for awaiting completion and extracting
59 * results of a task is {@link #join}, but there are several variants:
60 * The {@link Future#get} methods support interruptible and/or timed
61 * waits for completion and report results using <code>Future</code>
62 * conventions. Method {@link #helpJoin} enables callers to actively
63 * execute other tasks while awaiting joins, which is sometimes more
64 * efficient but only applies when all subtasks are known to be
65 * strictly tree-structured. Method {@link #invoke} is semantically
66 * equivalent to <code>fork(); join()</code> but always attempts to
67 * begin execution in the current thread. The "<em>quiet</em>" forms
68 * of these methods do not extract results or report exceptions. These
69 * may be useful when a set of tasks are being executed, and you need
70 * to delay processing of results or exceptions until all complete.
71 * Method <code>invokeAll</code> (available in multiple versions)
72 * performs the most common form of parallel invocation: forking a set
73 * of tasks and joining them all.
74 *
75 * <p> The ForkJoinTask class is not usually directly subclassed.
76 * Instead, you subclass one of the abstract classes that support a
77 * particular style of fork/join processing. Normally, a concrete
78 * ForkJoinTask subclass declares fields comprising its parameters,
79 * established in a constructor, and then defines a <code>compute</code>
80 * method that somehow uses the control methods supplied by this base
81 * class. While these methods have <code>public</code> access (to allow
82 * instances of different task subclasses to call each other's
83 * methods), some of them may only be called from within other
84 * ForkJoinTasks. Attempts to invoke them in other contexts result in
85 * exceptions or errors possibly including ClassCastException.
86 *
87 * <p>Most base support methods are <code>final</code> because their
88 * implementations are intrinsically tied to the underlying
89 * lightweight task scheduling framework, and so cannot be overridden.
90 * Developers creating new basic styles of fork/join processing should
91 * minimally implement <code>protected</code> methods
92 * <code>exec</code>, <code>setRawResult</code>, and
93 * <code>getRawResult</code>, while also introducing an abstract
94 * computational method that can be implemented in its subclasses,
95 * possibly relying on other <code>protected</code> methods provided
96 * by this class.
97 *
98 * <p>ForkJoinTasks should perform relatively small amounts of
99 * computations, otherwise splitting into smaller tasks. As a very
100 * rough rule of thumb, a task should perform more than 100 and less
101 * than 10000 basic computational steps. If tasks are too big, then
102 * parallelism cannot improve throughput. If too small, then memory
103 * and internal task maintenance overhead may overwhelm processing.
104 *
105 * <p>ForkJoinTasks are <code>Serializable</code>, which enables them
106 * to be used in extensions such as remote execution frameworks. It is
107 * in general sensible to serialize tasks only before or after, but
108 * not during execution. Serialization is not relied on during
109 * execution itself.
110 */
111 public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
112
113 /**
114 * Run control status bits packed into a single int to minimize
115 * footprint and to ensure atomicity (via CAS). Status is
116 * initially zero, and takes on nonnegative values until
117 * completed, upon which status holds NORMAL, CANCELLED, or
118 * EXCEPTIONAL, which use the top 3 bits. Tasks undergoing
119 * blocking waits by other threads have SIGNAL_MASK bits set --
120 * bit 15 for external (nonFJ) waits, and the rest a count of
121 * waiting FJ threads. (This representation relies on
122 * ForkJoinPool max thread limits). Completion of a stolen task
123 * with SIGNAL_MASK bits set awakens waiter via notifyAll. Even
124 * though suboptimal for some purposes, we use basic builtin
125 * wait/notify to take advantage of "monitor inflation" in JVMs
126 * that we would otherwise need to emulate to avoid adding further
127 * per-task bookkeeping overhead. Note that bits 16-28 are
128 * currently unused. Also value 0x80000000 is available as spare
129 * completion value.
130 */
131 volatile int status; // accessed directly by pool and workers
132
133 static final int COMPLETION_MASK = 0xe0000000;
134 static final int NORMAL = 0xe0000000; // == mask
135 static final int CANCELLED = 0xc0000000;
136 static final int EXCEPTIONAL = 0xa0000000;
137 static final int SIGNAL_MASK = 0x0000ffff;
138 static final int INTERNAL_SIGNAL_MASK = 0x00007fff;
139 static final int EXTERNAL_SIGNAL = 0x00008000; // top bit of low word
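/*
 * Illustrative sketch, not code from this revision. The status encoding
 * described above can be tried in isolation. The class StatusWordSketch and
 * its methods addWaiter and complete are hypothetical names, and
 * java.util.concurrent.atomic.AtomicInteger is assumed as a stand-in for the
 * Unsafe-based casStatus. Pool bookkeeping and monitor notification are
 * deliberately left out.
 *
```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the packed status field of a task.
final class StatusWordSketch {
    static final int COMPLETION_MASK      = 0xe0000000;
    static final int NORMAL               = 0xe0000000; // == mask
    static final int CANCELLED            = 0xc0000000;
    static final int EXCEPTIONAL          = 0xa0000000;
    static final int SIGNAL_MASK          = 0x0000ffff;
    static final int INTERNAL_SIGNAL_MASK = 0x00007fff;
    static final int EXTERNAL_SIGNAL      = 0x00008000;

    final AtomicInteger status = new AtomicInteger(); // initially zero

    // Registers a waiter: bumps the count for a worker, sets bit 15 otherwise.
    // Returns false if the task had already completed.
    boolean addWaiter(boolean external) {
        int s;
        while ((s = status.get()) >= 0) {
            if (status.compareAndSet(s, external ? s | EXTERNAL_SIGNAL : s + 1))
                return true;
        }
        return false;
    }

    // Sets a completion value and clears the signal bits in one CAS.
    // Returns the signal bits that were set, or -1 if already complete.
    int complete(int completion) {
        int s;
        do {
            s = status.get();
            if (s < 0)
                return -1;
        } while (!status.compareAndSet(s, completion));
        return s & SIGNAL_MASK;
    }

    boolean isDone()                { return status.get() < 0; }
    boolean isCancelled()           { return (status.get() & COMPLETION_MASK) == CANCELLED; }
    boolean isCompletedAbnormally() { return (status.get() & COMPLETION_MASK) < NORMAL; }

    public static void main(String[] args) {
        StatusWordSketch t = new StatusWordSketch();
        t.addWaiter(false);
        t.addWaiter(false);
        t.addWaiter(true);
        int signals = t.complete(CANCELLED);
        System.out.println("internal waiters = " + (signals & INTERNAL_SIGNAL_MASK));
        System.out.println("external waiter  = " + ((signals & EXTERNAL_SIGNAL) != 0));
        System.out.println("cancelled        = " + t.isCancelled());
    }
}
```
 *
 * All three completion values have the sign bit set, so a single "status < 0"
 * test answers isDone, and CANCELLED and EXCEPTIONAL both compare below
 * NORMAL as signed ints, which is what isCompletedAbnormally relies on.
 */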
140
141 /**
142 * Table of exceptions thrown by tasks, to enable reporting by
143 * callers. Because exceptions are rare, we don't directly keep
144 * them with task objects, but instead use a weak ref table. Note
145 * that cancellation exceptions don't appear in the table, but are
146 * instead recorded as status values.
147 * Todo: Use ConcurrentReferenceHashMap
148 */
149 static final Map<ForkJoinTask<?>, Throwable> exceptionMap =
150 Collections.synchronizedMap
151 (new WeakHashMap<ForkJoinTask<?>, Throwable>());
152
153 // within-package utilities
154
155 /**
156 * Get current worker thread, or null if not a worker thread
157 */
158 static ForkJoinWorkerThread getWorker() {
159 Thread t = Thread.currentThread();
160 return ((t instanceof ForkJoinWorkerThread)?
161 (ForkJoinWorkerThread)t : null);
162 }
163
164 final boolean casStatus(int cmp, int val) {
165 return _unsafe.compareAndSwapInt(this, statusOffset, cmp, val);
166 }
167
168 /**
169 * Workaround for not being able to rethrow unchecked exceptions.
170 */
171 static void rethrowException(Throwable ex) {
172 if (ex != null)
173 _unsafe.throwException(ex);
174 }
175
176 // Setting completion status
177
178 /**
179 * Mark completion and wake up threads waiting to join this task.
180 * @param completion one of NORMAL, CANCELLED, EXCEPTIONAL
181 */
182 final void setCompletion(int completion) {
183 ForkJoinPool pool = getPool();
184 if (pool != null) {
185 int s; // Clear signal bits while setting completion status
186 do;while ((s = status) >= 0 && !casStatus(s, completion));
187
188 if ((s & SIGNAL_MASK) != 0) {
189 if ((s &= INTERNAL_SIGNAL_MASK) != 0)
190 pool.updateRunningCount(s);
191 synchronized(this) { notifyAll(); }
192 }
193 }
194 else
195 externallySetCompletion(completion);
196 }
197
198 /**
199 * Version of setCompletion for non-FJ threads. Leaves signal
200 * bits for unblocked threads to adjust, and always notifies.
201 */
202 private void externallySetCompletion(int completion) {
203 int s;
204 do;while ((s = status) >= 0 &&
205 !casStatus(s, (s & SIGNAL_MASK) | completion));
206 synchronized(this) { notifyAll(); }
207 }
208
209 /**
210 * Sets status to indicate normal completion
211 */
212 final void setNormalCompletion() {
213 // Try typical fast case -- single CAS, no signal, not already done.
214 // Manually expand casStatus to improve chances of inlining it
215 if (!_unsafe.compareAndSwapInt(this, statusOffset, 0, NORMAL))
216 setCompletion(NORMAL);
217 }
218
219 // internal waiting and notification
220
221 /**
222 * Performs the actual monitor wait for awaitDone
223 */
224 private void doAwaitDone() {
225 // Minimize lock bias and in/de-flation effects by maximizing
226 // chances of waiting inside sync
227 try {
228 while (status >= 0)
229 synchronized(this) { if (status >= 0) wait(); }
230 } catch (InterruptedException ie) {
231 onInterruptedWait();
232 }
233 }
234
235 /**
236 * Performs the actual timed monitor wait for timed awaitDone
237 */
238 private void doAwaitDone(long startTime, long nanos) {
239 synchronized(this) {
240 try {
241 while (status >= 0) {
242 long nt = nanos - (System.nanoTime() - startTime);
243 if (nt <= 0)
244 break;
245 wait(nt / 1000000, (int)(nt % 1000000));
246 }
247 } catch (InterruptedException ie) {
248 onInterruptedWait();
249 }
250 }
251 }
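/*
 * Illustrative sketch, not code from this revision. A timed monitor wait of
 * this shape can be exercised standalone. The class TimedWaitSketch, its
 * boolean "done" flag, and the methods markDone and awaitDone are
 * hypothetical. The sketch assumes the remaining wait is the time budget
 * minus the time elapsed since the start, and it re-interrupts the caller on
 * interruption, as onInterruptedWait does for non-worker threads.
 *
```java
// Hypothetical holder of a completion flag guarded by its own monitor.
final class TimedWaitSketch {
    private boolean done; // guarded by "this"

    synchronized void markDone() {
        done = true;
        notifyAll();
    }

    // Waits at most "nanos" nanoseconds for markDone(); returns the flag.
    synchronized boolean awaitDone(long nanos) {
        long startTime = System.nanoTime();
        try {
            while (!done) {
                // remaining = budget - elapsed; recomputed after every wakeup
                long remaining = nanos - (System.nanoTime() - startTime);
                if (remaining <= 0)
                    break;
                wait(remaining / 1_000_000L, (int) (remaining % 1_000_000L));
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // preserve interrupt status
        }
        return done;
    }

    public static void main(String[] args) throws InterruptedException {
        TimedWaitSketch s = new TimedWaitSketch();
        System.out.println("signalled in time: " + s.awaitDone(20_000_000L));
        Thread signaller = new Thread(s::markDone);
        signaller.start();
        System.out.println("signalled in time: " + s.awaitDone(5_000_000_000L));
        signaller.join();
    }
}
```
 *
 * Recomputing the remaining time inside the loop keeps spurious or early
 * wakeups from extending the total wait beyond the requested budget.
 */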
252
253 // Awaiting completion
254
255 /**
256 * Sets status to indicate there is a joiner, then waits for join,
257 * surrounded with pool notifications.
258 * @return status upon exit
259 */
260 private int awaitDone(ForkJoinWorkerThread w, boolean maintainParallelism) {
261 ForkJoinPool pool = w == null? null : w.pool;
262 int s;
263 while ((s = status) >= 0) {
264 if (casStatus(s, pool == null? s|EXTERNAL_SIGNAL : s+1)) {
265 if (pool == null || !pool.preJoin(this, maintainParallelism))
266 doAwaitDone();
267 if (((s = status) & INTERNAL_SIGNAL_MASK) != 0)
268 adjustPoolCountsOnUnblock(pool);
269 break;
270 }
271 }
272 return s;
273 }
274
275 /**
276 * Timed version of awaitDone
277 * @return status upon exit
278 */
279 private int awaitDone(ForkJoinWorkerThread w, long nanos) {
280 ForkJoinPool pool = w == null? null : w.pool;
281 int s;
282 while ((s = status) >= 0) {
283 if (casStatus(s, pool == null? s|EXTERNAL_SIGNAL : s+1)) {
284 long startTime = System.nanoTime();
285 if (pool == null || !pool.preJoin(this, false))
286 doAwaitDone(startTime, nanos);
287 if ((s = status) >= 0) {
288 adjustPoolCountsOnCancelledWait(pool);
289 s = status;
290 }
291 if (s < 0 && (s & INTERNAL_SIGNAL_MASK) != 0)
292 adjustPoolCountsOnUnblock(pool);
293 break;
294 }
295 }
296 return s;
297 }
298
299 /**
300 * Notify pool that thread is unblocked. Called by signalled
301 * threads when woken by non-FJ threads (which is atypical).
302 */
303 private void adjustPoolCountsOnUnblock(ForkJoinPool pool) {
304 int s;
305 do;while ((s = status) < 0 && !casStatus(s, s & COMPLETION_MASK));
306 if (pool != null && (s &= INTERNAL_SIGNAL_MASK) != 0)
307 pool.updateRunningCount(s);
308 }
309
310 /**
311 * Notify pool to adjust counts on cancelled or timed out wait
312 */
313 private void adjustPoolCountsOnCancelledWait(ForkJoinPool pool) {
314 if (pool != null) {
315 int s;
316 while ((s = status) >= 0 && (s & INTERNAL_SIGNAL_MASK) != 0) {
317 if (casStatus(s, s - 1)) {
318 pool.updateRunningCount(1);
319 break;
320 }
321 }
322 }
323 }
324
325 /**
326 * Handle interruptions during waits.
327 */
328 private void onInterruptedWait() {
329 ForkJoinWorkerThread w = getWorker();
330 if (w == null)
331 Thread.currentThread().interrupt(); // re-interrupt
332 else if (w.isTerminating())
333 cancelIgnoringExceptions();
334 // else if FJworker, ignore interrupt
335 }
336
337 // Recording and reporting exceptions
338
339 private void setDoneExceptionally(Throwable rex) {
340 exceptionMap.put(this, rex);
341 setCompletion(EXCEPTIONAL);
342 }
343
344 /**
345 * Throws the exception associated with status s.
346 * @throws the exception
347 */
348 private void reportException(int s) {
349 if ((s &= COMPLETION_MASK) < NORMAL) {
350 if (s == CANCELLED)
351 throw new CancellationException();
352 else
353 rethrowException(exceptionMap.get(this));
354 }
355 }
356
357 /**
358 * Returns result or throws exception using j.u.c.Future conventions
359 * Only call when isDone known to be true.
360 */
361 private V reportFutureResult()
362 throws ExecutionException, InterruptedException {
363 int s = status & COMPLETION_MASK;
364 if (s < NORMAL) {
365 Throwable ex;
366 if (s == CANCELLED)
367 throw new CancellationException();
368 if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
369 throw new ExecutionException(ex);
370 if (Thread.interrupted())
371 throw new InterruptedException();
372 }
373 return getRawResult();
374 }
375
376 /**
377 * Returns result or throws exception using j.u.c.Future conventions
378 * with timeouts
379 */
380 private V reportTimedFutureResult()
381 throws InterruptedException, ExecutionException, TimeoutException {
382 Throwable ex;
383 int s = status & COMPLETION_MASK;
384 if (s == NORMAL)
385 return getRawResult();
386 if (s == CANCELLED)
387 throw new CancellationException();
388 if (s == EXCEPTIONAL && (ex = exceptionMap.get(this)) != null)
389 throw new ExecutionException(ex);
390 if (Thread.interrupted())
391 throw new InterruptedException();
392 throw new TimeoutException();
393 }
394
395 // internal execution methods
396
397 /**
398 * Calls exec, recording completion, and rethrowing exception if
399 * encountered. Caller should normally check status before calling
400 * @return true if completed normally
401 */
402 private boolean tryExec() {
403 try { // try block must contain only call to exec
404 if (!exec())
405 return false;
406 } catch (Throwable rex) {
407 setDoneExceptionally(rex);
408 rethrowException(rex);
409 return false; // not reached
410 }
411 setNormalCompletion();
412 return true;
413 }
414
415 /**
416 * Main execution method used by worker threads. Invokes
417 * base computation unless already complete
418 */
419 final void quietlyExec() {
420 if (status >= 0) {
421 try {
422 if (!exec())
423 return;
424 } catch(Throwable rex) {
425 setDoneExceptionally(rex);
426 return;
427 }
428 setNormalCompletion();
429 }
430 }
431
432 /**
433 * Calls exec, recording but not rethrowing exception
434 * Caller should normally check status before calling
435 * @return true if completed normally
436 */
437 private boolean tryQuietlyInvoke() {
438 try {
439 if (!exec())
440 return false;
441 } catch (Throwable rex) {
442 setDoneExceptionally(rex);
443 return false;
444 }
445 setNormalCompletion();
446 return true;
447 }
448
449 /**
450 * Cancel, ignoring any exceptions it throws
451 */
452 final void cancelIgnoringExceptions() {
453 try {
454 cancel(false);
455 } catch(Throwable ignore) {
456 }
457 }
458
459 /**
460 * Main implementation of helpJoin
461 */
462 private int busyJoin(ForkJoinWorkerThread w) {
463 int s;
464 ForkJoinTask<?> t;
465 while ((s = status) >= 0 && (t = w.scanWhileJoining(this)) != null)
466 t.quietlyExec();
467 return (s >= 0)? awaitDone(w, false) : s; // block if no work
468 }
469
470 // public methods
471
472 /**
473 * Arranges to asynchronously execute this task. While it is not
474 * necessarily enforced, it is a usage error to fork a task more
475 * than once unless it has completed and been reinitialized. This
476 * method may be invoked only from within ForkJoinTask
477 * computations. Attempts to invoke in other contexts result in
478 * exceptions or errors possibly including ClassCastException.
479 */
480 public final void fork() {
481 ((ForkJoinWorkerThread)(Thread.currentThread())).pushTask(this);
482 }
483
484 /**
485 * Returns the result of the computation when it is ready.
486 * This method differs from <code>get</code> in that abnormal
487 * completion results in RuntimeExceptions or Errors, not
488 * ExecutionExceptions.
489 *
490 * @return the computed result
491 */
492 public final V join() {
493 ForkJoinWorkerThread w = getWorker();
494 if (w == null || status < 0 || !w.unpushTask(this) || !tryExec())
495 reportException(awaitDone(w, true));
496 return getRawResult();
497 }
498
499 /**
500 * Commences performing this task, awaits its completion if
501 * necessary, and returns its result.
502 * @throws Throwable (a RuntimeException, Error, or unchecked
503 * exception) if the underlying computation did so.
504 * @return the computed result
505 */
506 public final V invoke() {
507 if (status >= 0 && tryExec())
508 return getRawResult();
509 else
510 return join();
511 }
512
513 /**
514 * Forks both tasks, returning when <code>isDone</code> holds for
515 * both of them or an exception is encountered. This method may be
516 * invoked only from within ForkJoinTask computations. Attempts to
517 * invoke in other contexts result in exceptions or errors
518 * possibly including ClassCastException.
519 * @param t1 one task
520 * @param t2 the other task
521 * @throws NullPointerException if t1 or t2 are null
522 * @throws RuntimeException or Error if either task did so.
523 */
524 public static void invokeAll(ForkJoinTask<?>t1, ForkJoinTask<?> t2) {
525 t2.fork();
526 t1.invoke();
527 t2.join();
528 }
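/*
 * Illustrative sketch, not code from this revision. The fork/invoke/join
 * pattern used by the two-task invokeAll can be seen in a small
 * divide-and-conquer task. The sketch assumes the java.util.concurrent
 * classes ForkJoinPool and RecursiveTask (the later standardized form of this
 * package) in place of jsr166y. SumSketch, THRESHOLD and sum are hypothetical
 * names.
 *
```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Sums a long[] by recursive splitting.
final class SumSketch extends RecursiveTask<Long> {
    static final int THRESHOLD = 1_000; // illustrative sequential cutoff

    private final long[] data;
    private final int lo, hi; // half-open range [lo, hi)

    SumSketch(long[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) {
            long sum = 0;
            for (int i = lo; i < hi; i++)
                sum += data[i];
            return sum;
        }
        int mid = (lo + hi) >>> 1;
        SumSketch left = new SumSketch(data, lo, mid);
        SumSketch right = new SumSketch(data, mid, hi);
        right.fork();                  // like t2.fork(): schedule asynchronously
        long leftSum = left.invoke();  // like t1.invoke(): run in this thread
        return leftSum + right.join(); // like t2.join(): wait for the forked half
    }

    // Convenience entry point that owns a short-lived pool.
    static long sum(long[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SumSketch(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] data = new long[100_000];
        for (int i = 0; i < data.length; i++)
            data[i] = i + 1;
        System.out.println(sum(data));
    }
}
```
 *
 * The cutoff of 1000 elements is only a placeholder chosen to sit inside the
 * rough 100 to 10000 step range suggested in the class documentation.
 */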
529
530 /**
531 * Forks the given tasks, returning when <code>isDone</code> holds
532 * for all of them. If any task encounters an exception, others
533 * may be cancelled. This method may be invoked only from within
534 * ForkJoinTask computations. Attempts to invoke in other contexts
535 * result in exceptions or errors possibly including ClassCastException.
536 * @param tasks the array of tasks
537 * @throws NullPointerException if tasks or any element are null.
538 * @throws RuntimeException or Error if any task did so.
539 */
540 public static void invokeAll(ForkJoinTask<?>... tasks) {
541 Throwable ex = null;
542 int last = tasks.length - 1;
543 for (int i = last; i >= 0; --i) {
544 ForkJoinTask<?> t = tasks[i];
545 if (t == null) {
546 if (ex == null)
547 ex = new NullPointerException();
548 }
549 else if (i != 0)
550 t.fork();
551 else {
552 t.quietlyInvoke();
553 if (ex == null)
554 ex = t.getException();
555 }
556 }
557 for (int i = 1; i <= last; ++i) {
558 ForkJoinTask<?> t = tasks[i];
559 if (t != null) {
560 if (ex != null)
561 t.cancel(false);
562 else {
563 t.quietlyJoin();
564 if (ex == null)
565 ex = t.getException();
566 }
567 }
568 }
569 if (ex != null)
570 rethrowException(ex);
571 }
572
573 /**
574 * Forks all tasks in the collection, returning when
575 * <code>isDone</code> holds for all of them. If any task
576 * encounters an exception, others may be cancelled. This method
577 * may be invoked only from within ForkJoinTask
578 * computations. Attempts to invoke in other contexts result in
579 * exceptions or errors possibly including ClassCastException.
580 * @param tasks the collection of tasks
581 * @throws NullPointerException if tasks or any element are null.
582 * @throws RuntimeException or Error if any task did so.
583 */
584 public static void invokeAll(Collection<? extends ForkJoinTask<?>> tasks) {
585 if (!(tasks instanceof List)) {
586 invokeAll(tasks.toArray(new ForkJoinTask[tasks.size()]));
587 return;
588 }
589 List<? extends ForkJoinTask<?>> ts =
590 (List<? extends ForkJoinTask<?>>)tasks;
591 Throwable ex = null;
592 int last = ts.size() - 1;
593 for (int i = last; i >= 0; --i) {
594 ForkJoinTask<?> t = ts.get(i);
595 if (t == null) {
596 if (ex == null)
597 ex = new NullPointerException();
598 }
599 else if (i != 0)
600 t.fork();
601 else {
602 t.quietlyInvoke();
603 if (ex == null)
604 ex = t.getException();
605 }
606 }
607 for (int i = 1; i <= last; ++i) {
608 ForkJoinTask<?> t = ts.get(i);
609 if (t != null) {
610 if (ex != null)
611 t.cancel(false);
612 else {
613 t.quietlyJoin();
614 if (ex == null)
615 ex = t.getException();
616 }
617 }
618 }
619 if (ex != null)
620 rethrowException(ex);
621 }
622
623 /**
624 * Returns true if the computation performed by this task has
625 * completed (or has been cancelled).
626 * @return true if this computation has completed
627 */
628 public final boolean isDone() {
629 return status < 0;
630 }
631
632 /**
633 * Returns true if this task was cancelled.
634 * @return true if this task was cancelled
635 */
636 public final boolean isCancelled() {
637 return (status & COMPLETION_MASK) == CANCELLED;
638 }
639
640 /**
641 * Asserts that the results of this task's computation will not be
642 * used. If a cancellation occurs before attempting to execute this
643 * task, then execution will be suppressed, <code>isCancelled</code>
644 * will report true, and <code>join</code> will result in a
645 * <code>CancellationException</code> being thrown. Otherwise, when
646 * cancellation races with completion, there are no guarantees
647 * about whether <code>isCancelled</code> will report true, whether
648 * <code>join</code> will return normally or via an exception, or
649 * whether these behaviors will remain consistent upon repeated
650 * invocation.
651 *
652 * <p>This method may be overridden in subclasses, but if so, must
653 * still ensure that these minimal properties hold. In particular,
654 * the cancel method itself must not throw exceptions.
655 *
656 * <p> This method is designed to be invoked by <em>other</em>
657 * tasks. To terminate the current task, you can just return or
658 * throw an unchecked exception from its computation method, or
659 * invoke <code>completeExceptionally</code>.
660 *
661 * @param mayInterruptIfRunning this value is ignored in the
662 * default implementation because tasks are not in general
663 * cancelled via interruption.
664 *
665 * @return true if this task is now cancelled
666 */
667 public boolean cancel(boolean mayInterruptIfRunning) {
668 setCompletion(CANCELLED);
669 return (status & COMPLETION_MASK) == CANCELLED;
670 }
671
672 /**
673 * Returns true if this task threw an exception or was cancelled
674 * @return true if this task threw an exception or was cancelled
675 */
676 public final boolean isCompletedAbnormally() {
677 return (status & COMPLETION_MASK) < NORMAL;
678 }
679
680 /**
681 * Returns the exception thrown by the base computation, or a
682 * CancellationException if cancelled, or null if none or if the
683 * method has not yet completed.
684 * @return the exception, or null if none
685 */
686 public final Throwable getException() {
687 int s = status & COMPLETION_MASK;
688 if (s >= NORMAL)
689 return null;
690 if (s == CANCELLED)
691 return new CancellationException();
692 return exceptionMap.get(this);
693 }
694
695 /**
696 * Completes this task abnormally, and if not already aborted or
697 * cancelled, causes it to throw the given exception upon
698 * <code>join</code> and related operations. This method may be used
699 * to induce exceptions in asynchronous tasks, or to force
700 * completion of tasks that would not otherwise complete. Its use
701 * in other situations is likely to be wrong. This method is
702 * overridable, but overridden versions must invoke <code>super</code>
703 * implementation to maintain guarantees.
704 *
705 * @param ex the exception to throw. If this exception is
706 * not a RuntimeException or Error, the actual exception thrown
707 * will be a RuntimeException with cause ex.
708 */
709 public void completeExceptionally(Throwable ex) {
710 setDoneExceptionally((ex instanceof RuntimeException) ||
711 (ex instanceof Error)? ex :
712 new RuntimeException(ex));
713 }
714
715 /**
716 * Completes this task, and if not already aborted or cancelled,
717 * returns the given value upon <code>join</code> and related
718 * operations. This method may be used to provide results for
719 * asynchronous tasks, or to provide alternative handling for
720 * tasks that would not otherwise complete normally. Its use in
721 * other situations is likely to be wrong. This method is
722 * overridable, but overridden versions must invoke <code>super</code>
723 * implementation to maintain guarantees.
724 *
725 * @param value the result value for this task.
726 */
727 public void complete(V value) {
728 try {
729 setRawResult(value);
730 } catch(Throwable rex) {
731 setDoneExceptionally(rex);
732 return;
733 }
734 setNormalCompletion();
735 }
736
737 public final V get() throws InterruptedException, ExecutionException {
738 ForkJoinWorkerThread w = getWorker();
739 if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
740 awaitDone(w, true);
741 return reportFutureResult();
742 }
743
744 public final V get(long timeout, TimeUnit unit)
745 throws InterruptedException, ExecutionException, TimeoutException {
746 ForkJoinWorkerThread w = getWorker();
747 if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke())
748 awaitDone(w, unit.toNanos(timeout));
749 return reportTimedFutureResult();
750 }
751
752 /**
753 * Possibly executes other tasks until this task is ready, then
754 * returns the result of the computation. This method may be more
755 * efficient than <code>join</code>, but is only applicable when
756 * there are no potential dependencies between continuation of the
757 * current task and that of any other task that might be executed
758 * while helping. (This usually holds for pure divide-and-conquer
759 * tasks). This method may be invoked only from within
760 * ForkJoinTask computations. Attempts to invoke in other contexts
761 * result in exceptions or errors possibly including ClassCastException.
762 * @return the computed result
763 */
764 public final V helpJoin() {
765 ForkJoinWorkerThread w = (ForkJoinWorkerThread)(Thread.currentThread());
766 if (status < 0 || !w.unpushTask(this) || !tryExec())
767 reportException(busyJoin(w));
768 return getRawResult();
769 }
770
771 /**
772 * Possibly executes other tasks until this task is ready. This
773 * method may be invoked only from within ForkJoinTask
774 * computations. Attempts to invoke in other contexts result in
775 * exceptions or errors possibly including ClassCastException.
776 */
777 public final void quietlyHelpJoin() {
778 if (status >= 0) {
779 ForkJoinWorkerThread w =
780 (ForkJoinWorkerThread)(Thread.currentThread());
781 if (!w.unpushTask(this) || !tryQuietlyInvoke())
782 busyJoin(w);
783 }
784 }
785
786 /**
787 * Joins this task, without returning its result or throwing an
788 * exception. This method may be useful when processing
789 * collections of tasks when some have been cancelled or otherwise
790 * known to have aborted.
791 */
792 public final void quietlyJoin() {
793 if (status >= 0) {
794 ForkJoinWorkerThread w = getWorker();
795 if (w == null || !w.unpushTask(this) || !tryQuietlyInvoke())
796 awaitDone(w, true);
797 }
798 }
799
800 /**
801 * Commences performing this task and awaits its completion if
802 * necessary, without returning its result or throwing an
803 * exception. This method may be useful when processing
804 * collections of tasks when some have been cancelled or otherwise
805 * known to have aborted.
806 */
807 public final void quietlyInvoke() {
808 if (status >= 0 && !tryQuietlyInvoke())
809 quietlyJoin();
810 }
811
812 /**
813 * Possibly executes tasks until the pool hosting the current task
814 * {@link ForkJoinPool#isQuiescent}. This method may be of use in
815 * designs in which many tasks are forked, but none are explicitly
816 * joined, instead executing them until all are processed.
817 */
818 public static void helpQuiesce() {
819 ((ForkJoinWorkerThread)(Thread.currentThread())).
820 helpQuiescePool();
821 }
822
823 /**
824 * Resets the internal bookkeeping state of this task, allowing a
825 * subsequent <code>fork</code>. This method allows repeated reuse of
826 * this task, but only if reuse occurs when this task has either
827 * never been forked, or has been forked, then completed and all
828 * outstanding joins of this task have also completed. Effects
829 * under any other usage conditions are not guaranteed, and are
830 * almost surely wrong. This method may be useful when executing
831 * pre-constructed trees of subtasks in loops.
832 */
833 public void reinitialize() {
834 if ((status & COMPLETION_MASK) == EXCEPTIONAL)
835 exceptionMap.remove(this);
836 status = 0;
837 }
838
839 /**
840 * Returns the pool hosting the current task execution, or null
841 * if this task is executing outside of any pool.
842 * @return the pool, or null if none.
843 */
844 public static ForkJoinPool getPool() {
845 Thread t = Thread.currentThread();
846 return ((t instanceof ForkJoinWorkerThread)?
847 ((ForkJoinWorkerThread)t).pool : null);
848 }
849
850 /**
851 * Tries to unschedule this task for execution. This method will
852 * typically succeed if this task is the most recently forked task
853 * by the current thread, and has not commenced executing in
854 * another thread. This method may be useful when arranging
855 * alternative local processing of tasks that could have been, but
856 * were not, stolen. This method may be invoked only from within
857 * ForkJoinTask computations. Attempts to invoke in other contexts
858 * result in exceptions or errors possibly including ClassCastException.
859 * @return true if unforked
860 */
861 public boolean tryUnfork() {
862 return ((ForkJoinWorkerThread)(Thread.currentThread())).unpushTask(this);
863 }
864
865 /**
866 * Returns an estimate of the number of tasks that have been
867 * forked by the current worker thread but not yet executed. This
868 * value may be useful for heuristic decisions about whether to
869 * fork other tasks.
870 * @return the number of tasks
871 */
872 public static int getQueuedTaskCount() {
873 return ((ForkJoinWorkerThread)(Thread.currentThread())).
874 getQueueSize();
875 }
876
877 /**
878 * Returns an estimate of how many more locally queued tasks are
879 * held by the current worker thread than there are other worker
880 * threads that might steal them. This value may be useful for
881 * heuristic decisions about whether to fork other tasks. In many
882 * usages of ForkJoinTasks, at steady state, each worker should
883 * aim to maintain a small constant surplus (for example, 3) of
884 * tasks, and to process computations locally if this threshold is
885 * exceeded.
886 * @return the surplus number of tasks, which may be negative
887 */
888 public static int getSurplusQueuedTaskCount() {
889 return ((ForkJoinWorkerThread)(Thread.currentThread()))
890 .getEstimatedSurplusTaskCount();
891 }
892
893 // Extension methods
894
895 /**
896 * Returns the result that would be returned by <code>join</code>,
897 * even if this task completed abnormally, or null if this task is
898 * not known to have been completed. This method is designed to
899 * aid debugging, as well as to support extensions. Its use in any
900 * other context is discouraged.
901 *
902 * @return the result, or null if not completed.
903 */
904 public abstract V getRawResult();
905
906 /**
907 * Forces the given value to be returned as a result. This method
908 * is designed to support extensions, and should not in general be
909 * called otherwise.
910 *
911 * @param value the value
912 */
913 protected abstract void setRawResult(V value);
914
915 /**
916 * Immediately performs the base action of this task. This method
917 * is designed to support extensions, and should not in general be
918 * called otherwise. The return value controls whether this task
919 * is considered to be done normally. It may return false in
920 * asynchronous actions that require explicit invocations of
921 * <code>complete</code> to become joinable. It may throw exceptions
922 * to indicate abnormal exit.
923 * @return true if completed normally
924 * @throws Error or RuntimeException if encountered during computation
925 */
926 protected abstract boolean exec();
927
928 /**
929 * Returns, but does not unschedule or execute, the task most
930 * recently forked by the current thread but not yet executed, if
931 * one is available. There is no guarantee that this task will
932 * actually be polled or executed next.
933 * This method is designed primarily to support extensions,
934 * and is unlikely to be useful otherwise.
935 * This method may be invoked only from within
936 * ForkJoinTask computations. Attempts to invoke in other contexts
937 * result in exceptions or errors, possibly including ClassCastException.
938 *
939 * @return the next task, or null if none are available
940 */
941 protected static ForkJoinTask<?> peekNextLocalTask() {
942 return ((ForkJoinWorkerThread)(Thread.currentThread())).peekTask();
943 }
944
945 /**
946 * Unschedules and returns, without executing, the task most
947 * recently forked by the current thread but not yet executed.
948 * This method is designed primarily to support extensions,
949 * and is unlikely to be useful otherwise.
950 * This method may be invoked only from within
951 * ForkJoinTask computations. Attempts to invoke in other contexts
952 * result in exceptions or errors, possibly including ClassCastException.
953 *
954 * @return the next task, or null if none are available
955 */
956 protected static ForkJoinTask<?> pollNextLocalTask() {
957 return ((ForkJoinWorkerThread)(Thread.currentThread())).popTask();
958 }
959
960 /**
961 * Unschedules and returns, without executing, the task most
962 * recently forked by the current thread but not yet executed, if
963 * one is available, or if not available, a task that was forked
964 * by some other thread, if available. Availability may be
965 * transient, so a <code>null</code> result does not necessarily
966 * imply quiescence of the pool this task is operating in.
967 * This method is designed primarily to support extensions,
968 * and is unlikely to be useful otherwise.
969 * This method may be invoked only from within
970 * ForkJoinTask computations. Attempts to invoke in other contexts
971 * result in exceptions or errors, possibly including ClassCastException.
972 *
973 * @return a task, or null if none are available
974 */
975 protected static ForkJoinTask<?> pollTask() {
976 return ((ForkJoinWorkerThread)(Thread.currentThread())).
977 pollTask();
978 }
979
980 // Serialization support
981
982 private static final long serialVersionUID = -7721805057305804111L;
983
984 /**
985 * Save the state to a stream.
986 *
987 * @serialData the current run status and the exception thrown
988 * during execution, or null if none.
989 * @param s the stream
990 */
991 private void writeObject(java.io.ObjectOutputStream s)
992 throws java.io.IOException {
993 s.defaultWriteObject();
994 s.writeObject(getException());
995 }
996
997 /**
998 * Reconstitute the instance from a stream.
999 * @param s the stream
1000 */
1001 private void readObject(java.io.ObjectInputStream s)
1002 throws java.io.IOException, ClassNotFoundException {
1003 s.defaultReadObject();
1004 status &= ~INTERNAL_SIGNAL_MASK; // clear internal signal counts
1005 status |= EXTERNAL_SIGNAL; // conservatively set external signal
1006 Object ex = s.readObject();
1007 if (ex != null)
1008 setDoneExceptionally((Throwable)ex);
1009 }
1010
1011 // Temporary Unsafe mechanics for preliminary release
1012
1013 static final Unsafe _unsafe;
1014 static final long statusOffset;
1015
1016 static {
1017 try {
1018 if (ForkJoinTask.class.getClassLoader() != null) {
1019 Field f = Unsafe.class.getDeclaredField("theUnsafe");
1020 f.setAccessible(true);
1021 _unsafe = (Unsafe)f.get(null);
1022 }
1023 else
1024 _unsafe = Unsafe.getUnsafe();
1025 statusOffset = _unsafe.objectFieldOffset
1026 (ForkJoinTask.class.getDeclaredField("status"));
1027 } catch (Exception ex) { throw new Error(ex); }
1028 }
1029
1030 }
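
The surplus heuristic described for getSurplusQueuedTaskCount, combined with tryUnfork, can be sketched as a small task. This is a minimal sketch and not part of this file. It assumes the java.util.concurrent versions of these classes that later shipped in the JDK (Java 8 or newer, for ForkJoinPool.commonPool()), which may differ from the jsr166y preview shown here. The class SurplusFib, the constant SURPLUS_THRESHOLD, and the helpers seqFib and fib are hypothetical names chosen for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example task: computes Fibonacci numbers, forking subtasks
// only while the current worker holds a small surplus of queued tasks.
class SurplusFib extends RecursiveTask<Long> {
    // Assumed threshold, following the "small constant surplus (for example, 3)" advice.
    static final int SURPLUS_THRESHOLD = 3;

    final int n;

    SurplusFib(int n) { this.n = n; }

    // Plain sequential fallback used when forking is not worthwhile.
    static long seqFib(int n) {
        long a = 0, b = 1;
        for (int i = 0; i < n; i++) {
            long t = a + b;
            a = b;
            b = t;
        }
        return a;
    }

    @Override
    protected Long compute() {
        if (n <= 1)
            return (long) n;
        // Enough local tasks are already queued for potential thieves:
        // process this computation locally instead of forking more.
        if (getSurplusQueuedTaskCount() > SURPLUS_THRESHOLD)
            return seqFib(n);
        SurplusFib left = new SurplusFib(n - 1);
        left.fork();
        long right = new SurplusFib(n - 2).compute();
        // If the forked task was not stolen, unschedule it and run it here;
        // otherwise wait for whichever thread took it.
        long l = left.tryUnfork() ? left.invoke() : left.join();
        return l + right;
    }

    // Convenience entry point that runs the task in the common pool.
    static long fib(int n) {
        return ForkJoinPool.commonPool().invoke(new SurplusFib(n));
    }

    public static void main(String[] args) {
        System.out.println(fib(25));
    }
}
```

The threshold of 3 is only the example value mentioned in the method comment; a suitable value depends on the workload.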