ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinTask.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinTask.java (file contents):
Revision 1.86 by dl, Tue May 6 17:31:01 2014 UTC vs.
Revision 1.87 by dl, Mon Jul 7 18:29:07 2014 UTC

# Line 268 | Line 268 | public abstract class ForkJoinTask<V> im
268      }
269  
270      /**
271 <     * Tries to set SIGNAL status unless already completed. Used by
272 <     * ForkJoinPool. Other variants are directly incorporated into
273 <     * externalAwaitDone etc.
274 <     *
275 <     * @return true if successful
276 <     */
277 <    final boolean trySetSignal() {
278 <        int s = status;
279 <        return s >= 0 && U.compareAndSwapInt(this, STATUS, s, s | SIGNAL);
271 >     * If not done, sets SIGNAL status and performs Object.wait(timeout).
272 >     * This task may or may not be done on exit. Ignores interrupts.
273 >     *
274 >     * @param timeout using Object.wait conventions.
275 >     */
276 >    final void internalWait(long timeout) {
277 >        int s;
278 >        if ((s = status) >= 0 && // force completer to issue notify
279 >            U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
280 >            synchronized (this) {
281 >                if (status >= 0)
282 >                    try { wait(timeout); } catch (InterruptedException ie) { }
283 >                else
284 >                    notifyAll();
285 >            }
286 >        }
287      }
288  
289      /**
# Line 284 | Line 291 | public abstract class ForkJoinTask<V> im
291       * @return status upon completion
292       */
293      private int externalAwaitDone() {
294 <        int s;
295 <        ForkJoinPool cp = ForkJoinPool.common;
296 <        if ((s = status) >= 0) {
297 <            if (cp != null) {
298 <                if (this instanceof CountedCompleter)
299 <                    s = cp.externalHelpComplete((CountedCompleter<?>)this, Integer.MAX_VALUE);
300 <                else if (cp.tryExternalUnpush(this))
301 <                    s = doExec();
302 <            }
303 <            if (s >= 0 && (s = status) >= 0) {
304 <                boolean interrupted = false;
305 <                do {
306 <                    if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
307 <                        synchronized (this) {
301 <                            if (status >= 0) {
302 <                                try {
303 <                                    wait();
304 <                                } catch (InterruptedException ie) {
305 <                                    interrupted = true;
306 <                                }
294 >        int s = ((this instanceof CountedCompleter) ? // try helping
295 >                 ForkJoinPool.common.externalHelpComplete(
296 >                     (CountedCompleter<?>)this, 0) :
297 >                 ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
298 >        if (s >= 0 && (s = status) >= 0) {
299 >            boolean interrupted = false;
300 >            do {
301 >                if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
302 >                    synchronized (this) {
303 >                        if (status >= 0) {
304 >                            try {
305 >                                wait(0L);
306 >                            } catch (InterruptedException ie) {
307 >                                interrupted = true;
308                              }
308                            else
309                                notifyAll();
309                          }
310 +                        else
311 +                            notifyAll();
312                      }
313 <                } while ((s = status) >= 0);
314 <                if (interrupted)
315 <                    Thread.currentThread().interrupt();
316 <            }
313 >                }
314 >            } while ((s = status) >= 0);
315 >            if (interrupted)
316 >                Thread.currentThread().interrupt();
317          }
318          return s;
319      }
# Line 322 | Line 323 | public abstract class ForkJoinTask<V> im
323       */
324      private int externalInterruptibleAwaitDone() throws InterruptedException {
325          int s;
325        ForkJoinPool cp = ForkJoinPool.common;
326          if (Thread.interrupted())
327              throw new InterruptedException();
328 <        if ((s = status) >= 0 && cp != null) {
329 <            if (this instanceof CountedCompleter)
330 <                cp.externalHelpComplete((CountedCompleter<?>)this, Integer.MAX_VALUE);
331 <            else if (cp.tryExternalUnpush(this))
332 <                doExec();
333 <        }
334 <        while ((s = status) >= 0) {
335 <            if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
336 <                synchronized (this) {
337 <                    if (status >= 0)
338 <                        wait();
339 <                    else
340 <                        notifyAll();
328 >        if ((s = status) >= 0 &&
329 >            (s = ((this instanceof CountedCompleter) ?
330 >                  ForkJoinPool.common.externalHelpComplete(
331 >                      (CountedCompleter<?>)this, 0) :
332 >                  ForkJoinPool.common.tryExternalUnpush(this) ? doExec() :
333 >                  0)) >= 0) {
334 >            while ((s = status) >= 0) {
335 >                if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
336 >                    synchronized (this) {
337 >                        if (status >= 0)
338 >                            wait(0L);
339 >                        else
340 >                            notifyAll();
341 >                    }
342                  }
343              }
344          }
# Line 357 | Line 358 | public abstract class ForkJoinTask<V> im
358              ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
359              (w = (wt = (ForkJoinWorkerThread)t).workQueue).
360              tryUnpush(this) && (s = doExec()) < 0 ? s :
361 <            wt.pool.awaitJoin(w, this) :
361 >            wt.pool.awaitJoin(w, this, 0L) :
362              externalAwaitDone();
363      }
364  
# Line 370 | Line 371 | public abstract class ForkJoinTask<V> im
371          int s; Thread t; ForkJoinWorkerThread wt;
372          return (s = doExec()) < 0 ? s :
373              ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
374 <            (wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue, this) :
374 >            (wt = (ForkJoinWorkerThread)t).pool.
375 >            awaitJoin(wt.workQueue, this, 0L) :
376              externalAwaitDone();
377      }
378  
# Line 988 | Line 990 | public abstract class ForkJoinTask<V> im
990       */
991      public final V get(long timeout, TimeUnit unit)
992          throws InterruptedException, ExecutionException, TimeoutException {
993 +        int s;
994 +        long nanos = unit.toNanos(timeout);
995          if (Thread.interrupted())
996              throw new InterruptedException();
997 <        // Messy in part because we measure in nanosecs, but wait in millisecs
998 <        int s; long ms;
999 <        long ns = unit.toNanos(timeout);
996 <        ForkJoinPool cp;
997 <        if ((s = status) >= 0 && ns > 0L) {
998 <            long deadline = System.nanoTime() + ns;
999 <            ForkJoinPool p = null;
1000 <            ForkJoinPool.WorkQueue w = null;
997 >        if ((s = status) >= 0 && nanos > 0L) {
998 >            long d = System.nanoTime() + nanos;
999 >            long deadline = (d == 0L)? 1L : d; // avoid 0
1000              Thread t = Thread.currentThread();
1001              if (t instanceof ForkJoinWorkerThread) {
1002                  ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
1003 <                p = wt.pool;
1005 <                w = wt.workQueue;
1006 <                p.helpJoinOnce(w, this); // no retries on failure
1007 <            }
1008 <            else if ((cp = ForkJoinPool.common) != null) {
1009 <                if (this instanceof CountedCompleter)
1010 <                    cp.externalHelpComplete((CountedCompleter<?>)this, Integer.MAX_VALUE);
1011 <                else if (cp.tryExternalUnpush(this))
1012 <                    doExec();
1003 >                s = wt.pool.awaitJoin(wt.workQueue, this, deadline);
1004              }
1005 <            boolean canBlock = false;
1006 <            boolean interrupted = false;
1007 <            try {
1008 <                while ((s = status) >= 0) {
1009 <                    if (w != null && w.qlock < 0)
1010 <                        cancelIgnoringExceptions(this);
1011 <                    else if (!canBlock) {
1012 <                        if (p == null || p.tryCompensate(p.ctl))
1013 <                            canBlock = true;
1014 <                    }
1015 <                    else {
1016 <                        if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L &&
1017 <                            U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
1018 <                            synchronized (this) {
1019 <                                if (status >= 0) {
1029 <                                    try {
1030 <                                        wait(ms);
1031 <                                    } catch (InterruptedException ie) {
1032 <                                        if (p == null)
1033 <                                            interrupted = true;
1034 <                                    }
1035 <                                }
1036 <                                else
1037 <                                    notifyAll();
1038 <                            }
1005 >            else if ((s = ((this instanceof CountedCompleter) ?
1006 >                           ForkJoinPool.common.externalHelpComplete(
1007 >                               (CountedCompleter<?>)this, 0) :
1008 >                           ForkJoinPool.common.tryExternalUnpush(this) ?
1009 >                           doExec() : 0)) >= 0) {
1010 >                long ns, ms; // measure in nanosecs, but wait in millisecs
1011 >                while ((s = status) >= 0 &&
1012 >                       (ns = deadline - System.nanoTime()) > 0L) {
1013 >                    if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L &&
1014 >                        U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
1015 >                        synchronized (this) {
1016 >                            if (status >= 0)
1017 >                                wait(ms); // OK to throw InterruptedException
1018 >                            else
1019 >                                notifyAll();
1020                          }
1040                        if ((s = status) < 0 || interrupted ||
1041                            (ns = deadline - System.nanoTime()) <= 0L)
1042                            break;
1021                      }
1022                  }
1045            } finally {
1046                if (p != null && canBlock)
1047                    p.incrementActiveCount();
1023              }
1049            if (interrupted)
1050                throw new InterruptedException();
1024          }
1025 +        if (s >= 0)
1026 +            s = status;
1027          if ((s &= DONE_MASK) != NORMAL) {
1028              Throwable ex;
1029              if (s == CANCELLED)

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines